1
0

Compare commits

...

262 Commits

Author SHA1 Message Date
Neil Johnson
35442efb75 0.99.3 2019-04-01 12:49:03 +00:00
Neil Johnson
ed1ce0333c convert rst link to md 2019-04-01 12:53:08 +01:00
Neil Johnson
6a69bf67db 0.99.3rc1 2019-03-27 10:24:24 +00:00
Neil Johnson
4aa914369b bump version 2019-03-27 10:23:03 +00:00
Andrew Morgan
bbd244c7b2 Support 3PID login in password providers (#4931)
Adds a new method, check_3pid_auth, which gives password providers
the chance to allow authentication with third-party identifiers such
as email or msisdn.
2019-03-26 17:48:30 +00:00
Amber Brown
903f04c21f Use the state event amount for userdir import batching, not room count (#4944) 2019-03-27 02:49:28 +11:00
Andrew Morgan
4a125be138 Make federation endpoints more tolerant of trailing slashes v2 (#4935)
Redo of https://github.com/matrix-org/synapse/pull/4840
2019-03-26 11:35:29 +00:00
Richard van der Hoff
8cbbedaa2b Fix ClientReplicationStreamProtocol.__str__ (#4929)
`__str__` depended on `self.addr`, which was absent from
ClientReplicationStreamProtocol, so attempting to call str on such an object
would raise an exception.

We can calculate the peer addr from the transport, so there is no need for addr
anyway.
2019-03-25 16:41:51 +00:00
Richard van der Hoff
9bde730ef8 Fix bug where read-receipts lost their timestamps (#4927)
Make sure that they are sent correctly over the replication stream.

Fixes: #4898
2019-03-25 16:38:05 +00:00
Richard van der Hoff
a54a44734f Use an explicit dbname for postgres connections in the tests. (#4928)
I don't have a database with the same name as my user, so leaving the database
name unset fails.

While we're at it, clear out some unused stuff in the test setup.
2019-03-25 16:36:56 +00:00
Richard van der Hoff
7105057cf2 Fix nginx example in ACME doc. (#4923) 2019-03-25 09:59:36 +00:00
Amber Brown
ac396a0d32 Refactor out state delta handling into its own class (#4917) 2019-03-25 20:37:08 +11:00
Erik Johnston
5fee9d8067 Merge pull request #4869 from matrix-org/erikj/yaml_load
Fix yaml warnings by using safe_load
2019-03-22 11:58:13 +00:00
Erik Johnston
d21a4d6be6 Newsfile 2019-03-22 10:24:58 +00:00
Erik Johnston
3677548a82 Use yaml safe_load 2019-03-22 10:20:17 +00:00
Richard van der Hoff
224783a73f Allow newsfragments to end with exclamation marks! (#4912) 2019-03-21 15:28:19 +00:00
Amber Brown
a68e00fca8 Some more porting to HomeserverTestCase and remove old RESTHelper (#4913) 2019-03-22 02:10:21 +11:00
Andrew Morgan
7bef97dfb7 Remove trailing slashes from outbound federation requests and retry on 400 (#4840)
As per #3622, we remove trailing slashes from outbound federation requests. However, to ensure that we remain backwards compatible with previous versions of Synapse, if we receive a HTTP 400 with `M_UNRECOGNIZED`, then we are likely talking to an older version of Synapse in which case we retry with a trailing slash appended to the request path.
2019-03-21 15:07:28 +00:00
Andrew Morgan
b41c2eaadc Clean up backoff_on_404 and metehod calls 2019-03-21 14:32:47 +00:00
Erik Johnston
01e6b405be Merge pull request #4908 from matrix-org/erikj/block_peek_on_blocked_rooms
Deny peeking into rooms that have been blocked
2019-03-21 14:07:17 +00:00
Richard van der Hoff
27813b4ca1 Update changelog.d/4908.bugfix
Co-Authored-By: erikjohnston <erikj@jki.re>
2019-03-21 14:05:59 +00:00
Colin W
ab4e4c6c2f Update Apache Setup To Remove Location Syntax (#4870)
This one should close #4841. Many thanks to @dev4223 for bringing it up and finding a solution.

Signed-off-by: Colin White
2019-03-21 14:05:56 +00:00
Erik Johnston
d3f640f0ac isort 2019-03-21 11:29:48 +00:00
Erik Johnston
017ed9d423 Newsfile 2019-03-21 11:26:47 +00:00
Erik Johnston
3959858eaa Merge pull request #4904 from matrix-org/erikj/fix_shutdown
Fixup shutdown room API
2019-03-21 11:24:42 +00:00
Erik Johnston
cd80cbffea Fix typo and add description 2019-03-21 11:24:04 +00:00
Erik Johnston
536a266520 Deny peeking into rooms that have been blocked 2019-03-21 11:20:13 +00:00
Erik Johnston
4a8a1ac962 Rejig testcase to make it more extensible 2019-03-21 11:02:11 +00:00
Erik Johnston
9c9e618b93 Remove debug 2019-03-21 10:58:56 +00:00
Erik Johnston
5c6f61f81c Add tests 2019-03-21 10:51:21 +00:00
Erik Johnston
3ecec5ede2 Fix upsert 2019-03-21 10:21:15 +00:00
Erik Johnston
09f991a63d Merge pull request #4896 from matrix-org/erikj/disable_room_directory
Add option to disable search room lists
2019-03-21 10:16:54 +00:00
Richard van der Hoff
a6f2d3053d Log requests which are simulated by the unit tests. (#4905)
Rather than stubbing out the access_log, make it actually log the requests,
which makes it a lot more obvious what is going on during tests.
2019-03-20 18:00:02 +00:00
Erik Johnston
cd62981a6a Revert spurious delete 2019-03-20 17:51:27 +00:00
Erik Johnston
8d8834d3e7 comment block_room 2019-03-20 17:49:56 +00:00
Erik Johnston
aa959a6c07 Use flags 2019-03-20 17:40:29 +00:00
Erik Johnston
7d47cc1305 Move requester check into assert_accepted_privacy_policy 2019-03-20 17:08:36 +00:00
Erik Johnston
30e69ff9b6 Newsfile 2019-03-20 16:56:55 +00:00
Erik Johnston
72a14860ab Gracefully handle failing to kick user 2019-03-20 16:54:21 +00:00
Erik Johnston
6b28890543 Log new room ID 2019-03-20 16:52:28 +00:00
Erik Johnston
74c46d81fa Only require consent for events with an associated request
There are a number of instances where a server or admin may puppet a
user to join/leave rooms, which we don't want to fail if the user has
not consented to the privacy policy. We fix this by adding a check to
test if the requester has an associated access_token, which is used as a
proxy to answer the question of whether the action is being done on
behalf of a real request from the user.
2019-03-20 16:50:23 +00:00
Erik Johnston
67d618e111 Allow blocking a room multiple times 2019-03-20 16:50:05 +00:00
Erik Johnston
263f2c9ce1 Merge pull request #4895 from matrix-org/erikj/disable_user_search
Add option to disable searching in the user dir
2019-03-20 16:47:15 +00:00
Amber Brown
4d53017432 Batching in the user directory import (#4900) 2019-03-21 03:06:36 +11:00
Richard van der Hoff
cdb8036161 Add a config option for torture-testing worker replication. (#4902)
Setting this to 50 or so makes a bunch of sytests fail in worker mode.
2019-03-20 16:04:35 +00:00
Richard van der Hoff
a902d13180 Batch up outgoing read-receipts to reduce federation traffic. (#4890)
Rate-limit outgoing read-receipts as per #4730.
2019-03-20 16:02:25 +00:00
Erik Johnston
3660d24ebe Add test 2019-03-20 15:16:36 +00:00
Erik Johnston
cc09685830 Add test 2019-03-20 14:53:44 +00:00
Erik Johnston
cd8c5b91ad Fix up sample config 2019-03-20 14:35:41 +00:00
Richard van der Hoff
ab20f85c59 Update synapse/config/user_directory.py
Co-Authored-By: erikjohnston <erikj@jki.re>
2019-03-20 14:33:11 +00:00
Erik Johnston
cc197a61a1 Disable publishing to room list when its disabled 2019-03-20 14:30:36 +00:00
Erik Johnston
2c90422146 Pull out config option 2019-03-20 14:25:58 +00:00
Erik Johnston
7529038e66 Return before we log 2019-03-20 14:25:28 +00:00
Erik Johnston
926f29ea6d Fix up config comments 2019-03-20 14:24:53 +00:00
Andrew Morgan
2150151abe kwargs doesn't like commas on calling funcs either. TIL 2019-03-20 14:13:32 +00:00
Andrew Morgan
bb52a2e653 lint 2019-03-20 14:08:57 +00:00
Andrew Morgan
cd36a1283b New test, fix issues 2019-03-20 14:00:39 +00:00
Andrew Morgan
c69df5d5d3 Fix comments. v0.99.2 -> v0.99.3 2019-03-20 11:27:18 +00:00
Andrew Morgan
551ea11559 Just return if not doing any trailing slash shennanigans 2019-03-20 11:07:36 +00:00
Andrew Morgan
94cb7939e4 Federation test fixed! 2019-03-20 10:50:44 +00:00
Erik Johnston
bf5876990f Newsfile 2019-03-19 17:10:54 +00:00
Erik Johnston
213c98c00a Add option to disable search room lists
This disables both local and remote room list searching.
2019-03-19 17:10:52 +00:00
Erik Johnston
855bf4658d Update sample config 2019-03-19 16:47:04 +00:00
Erik Johnston
0891202629 Newsfile 2019-03-19 16:41:57 +00:00
Erik Johnston
320667a479 Add option to disable searching in the user dir
We still populate it, as it can still be accessed via the admin API.
2019-03-19 16:40:19 +00:00
Erik Johnston
11f2125885 Merge pull request #4894 from matrix-org/erikj/postgres_tuning
Add note on tuning postgres
2019-03-19 16:21:24 +00:00
Erik Johnston
38ae23d5c2 Newsfile 2019-03-19 16:07:07 +00:00
Erik Johnston
b616a8717b Add note on tuning postgres 2019-03-19 16:05:32 +00:00
Erik Johnston
4aa0b707d2 Merge pull request #4879 from matrix-org/erikj/test_old_deps
Add py27-old test case to buildkite
2019-03-19 13:05:39 +00:00
Erik Johnston
b0e767f7bb Add comment back in 2019-03-19 12:51:32 +00:00
Richard van der Hoff
5cf00c9f60 Merge pull request #4889 from matrix-org/rav/test_real_config
Use a regular HomeServerConfig object for unit tests
2019-03-19 12:27:07 +00:00
Richard van der Hoff
7872638c31 Merge pull request #4888 from matrix-org/rav/fix_disabled_hs
Enforce hs_disabled_message correctly
2019-03-19 12:22:13 +00:00
Richard van der Hoff
b5d48560c7 Fix RegistrationTestCase
turns out this relies on there being a `user_consent_version` set.
2019-03-19 12:05:05 +00:00
Richard van der Hoff
07f057ac80 changelog 2019-03-19 11:44:43 +00:00
Richard van der Hoff
13bc1e0746 Use a regular HomeServerConfig object for unit tests
Rather than using a Mock for the homeserver config, use a genuine
HomeServerConfig object. This makes for a more realistic test, and means that
we don't have to keep remembering to add things to the mock config every time
we add a new config setting.
2019-03-19 11:44:43 +00:00
Richard van der Hoff
053c50bcb3 Fix resource limits tests
Make sure that we have a `server_notices_mxid` set, given that we are relying
on it.
2019-03-19 11:44:43 +00:00
Richard van der Hoff
45bb54a6c6 Fix registration test
* Set allow_guest_access = True, since we rely on it
* config doesn't have a `hostname` attribute; it is `server_name`
2019-03-19 11:44:43 +00:00
Richard van der Hoff
8c1774e821 Fix email test
The Mailer expects the config object to have `email_smtp_pass` and
`email_riot_base_url` attributes (and it won't by default, because the default
config impl doesn't set any of the attributes unless email_enable_notifs is
set).
2019-03-19 11:44:43 +00:00
Richard van der Hoff
45c4e19c74 Merge remote-tracking branch 'origin/develop' into HEAD 2019-03-19 11:43:46 +00:00
Neil Johnson
88f0675967 fix test_auto_create_auto_join_where_no_consent (#4886) 2019-03-19 11:38:59 +00:00
Richard van der Hoff
0dbfae03f9 Enforce hs_disabled_message correctly
Fixes a bug where hs_disabled_message was not enforced for 3pid-based requests
if there was no server_notices_mxid configured.
2019-03-19 11:30:54 +00:00
Erik Johnston
e9eeca1314 Fix user directory background update (#4887) 2019-03-19 22:13:53 +11:00
Erik Johnston
cfc5a442ac Update newsfile 2019-03-19 10:56:11 +00:00
Richard van der Hoff
d2a537ea60 Merge remote-tracking branch 'origin/master' into develop 2019-03-19 10:37:50 +00:00
Michael Kaye
9482a84c0a Repoint docs for federation (#4881) 2019-03-19 10:37:18 +00:00
Erik Johnston
4d25624ff6 Revert changes 2019-03-19 10:28:00 +00:00
Richard van der Hoff
fd463b4f5d Comment out most options in the generated config. (#4863)
Make it so that most options in the config are optional, and commented out in
the generated config.

The reasons this is a good thing are as follows:

* If we decide that we should change the default for an option, we can do so,
  and only those admins that have deliberately chosen to override that option
  will be stuck on the old setting.

* It moves us towards a point where we can get rid of the super-surprising
  feature of synapse where the default settings for the config come from the
  generated yaml.

* It makes setting up a test config for unit testing an order of magnitude
  easier (see forthcoming PR).

* It makes the generated config more consistent, and hopefully easier for users
  to understand.
2019-03-19 10:06:40 +00:00
Amber Brown
282c97327f Migrate the user directory initial population to a background task (#4864) 2019-03-19 04:50:24 +11:00
Andrew Morgan
a8ad39eec7 lint 2019-03-18 17:47:39 +00:00
Andrew Morgan
621e7f37f1 Better exception handling 2019-03-18 17:45:54 +00:00
Erik Johnston
00d97668bf Bring py27-old into line with other test envs 2019-03-18 17:45:49 +00:00
Erik Johnston
b6ac5e40a0 Add coverage to py27-old 2019-03-18 17:38:22 +00:00
Erik Johnston
9ef1107b33 Newsfile 2019-03-18 17:19:49 +00:00
Erik Johnston
45f97de657 Add py27-old test case to buildkite 2019-03-18 17:15:46 +00:00
Brendan Abolivier
651ad8bc96 Add ratelimiting on failed login attempts (#4865) 2019-03-18 12:57:20 +00:00
Brendan Abolivier
899e523d6d Add ratelimiting on login (#4821)
Add two ratelimiters on login (per-IP address and per-userID).
2019-03-15 17:46:16 +00:00
Richard van der Hoff
3b7ceb2c69 Merge pull request #4855 from matrix-org/rav/refactor_transaction_queue
Split TransactionQueue up
2019-03-15 12:32:11 +00:00
Richard van der Hoff
2dee441bdb Merge pull request #4852 from matrix-org/rav/move_rr_sending_to_worker
Move client receipt processing to federation sender worker.
2019-03-15 12:30:30 +00:00
Erik Johnston
b0fa3f6ff3 Merge pull request #4853 from matrix-org/erikj/worker_docker_ci
Allow passing --daemonize to workers
2019-03-15 10:35:38 +00:00
Luca Corbatto
a6d84190eb Add systemd setup that supports workers (#4662)
This setup is a way to manage workers with systemd. It does however not
require workers. You can use this setup without workers. You just have
to make sure that the homeserver is forking and writes its PID file
to the location the service is looking in.

The currently distributed setup in the debian package does not work in
conjunction with workers.

* Adds changelog

* Lets systemd handle the forking

Sets all services to `type=simple` and disables daemonizing on the
synapse side.

* Formats readme to 80 columns per line

* Allows for full restart of all workers

* Changes README to reflect the new setup

* Adds dot to end of changelog file

* Removes surplus word

Co-Authored-By: targodan <targodan@users.noreply.github.com>

* Adds missing word

Co-Authored-By: targodan <targodan@users.noreply.github.com>

* Fixes linebreak

Co-Authored-By: targodan <targodan@users.noreply.github.com>

* Fixes unit type
2019-03-15 09:51:46 +00:00
Richard van der Hoff
9ffadcdbad fix some typos in federate.md 2019-03-15 09:43:24 +00:00
Aaron Raimist
2fb4ff8c89 Add some stuff back to the .gitignore (#4843)
* Add some stuff back to the .gitignore

Signed-off-by: Aaron Raimist <aaron@raim.ist>

* Add changelog

Signed-off-by: Aaron Raimist <aaron@raim.ist>

* Reorder and remove old items from .gitignore

Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-03-15 09:22:29 +00:00
Erik Johnston
6861ce3fb1 Merge pull request #4859 from matrix-org/revert-4793-anoa/trailing_slashes
Revert "Make federation endpoints more tolerant of trailing slashes for some endpoints"
2019-03-14 14:31:40 +00:00
Erik Johnston
271cb1998b Revert "Make federation endpoints more tolerant of trailing slashes for some endpoints (#4793)"
This reverts commit 290552fd83.
2019-03-14 14:30:54 +00:00
Erik Johnston
9073cfc8bd Merge pull request #4846 from matrix-org/hawkowl/userdir-search
Improve searching in the userdir
2019-03-14 13:54:15 +00:00
Erik Johnston
9ad448c1e5 Correctly handle all command line options 2019-03-14 13:32:14 +00:00
Richard van der Hoff
ec3a59de50 changelog 2019-03-13 22:08:37 +00:00
Andrew Morgan
ecea5af491 Correct var name 2019-03-13 21:21:03 +00:00
Andrew Morgan
b2df0e8e2c receiving a 400 caused an exception. handle it 2019-03-13 21:08:10 +00:00
Andrew Morgan
9a2e22fd41 is this what purgatory feels like 2019-03-13 20:29:38 +00:00
Andrew Morgan
86c60bda15 i should have given up x3 2019-03-13 20:19:07 +00:00
Andrew Morgan
45524f2f5e i should have given up x2 2019-03-13 20:17:39 +00:00
Andrew Morgan
8d16ffaf7a i should have given up 2019-03-13 20:03:10 +00:00
Richard van der Hoff
02e23b36bc Rename and move the classes 2019-03-13 20:02:56 +00:00
Andrew Morgan
26f8e2d099 there comes a time when you should give up. but you dont 2019-03-13 19:49:08 +00:00
Andrew Morgan
5ca857ad84 as above 2019-03-13 19:35:23 +00:00
Andrew Morgan
7c0295f13c no kwargs today 2019-03-13 19:27:10 +00:00
Andrew Morgan
66cdb840a6 Or perhaps I was the one who was drunk 2019-03-13 19:18:25 +00:00
Andrew Morgan
bec313818c go home python, you're drunk 2019-03-13 19:10:56 +00:00
Andrew Morgan
c991e7aec7 Syntax checker is bork 2019-03-13 19:08:08 +00:00
Andrew Morgan
c2d848b80d Destructure again 2019-03-13 19:04:43 +00:00
Andrew Morgan
ee8ba397e8 Are you happy now 2019-03-13 18:48:31 +00:00
Andrew Morgan
9dd0e34679 Syntax test 2019-03-13 18:45:17 +00:00
Andrew Morgan
220607a618 Remove testing code 2019-03-13 18:43:40 +00:00
Andrew Morgan
660b77f362 Add missing docstring detail 2019-03-13 18:38:16 +00:00
Andrew Morgan
5526b054aa Fix syntax issues 2019-03-13 18:35:21 +00:00
Andrew Morgan
09626bfd39 Switch to wrapper function around _send_request 2019-03-13 18:26:06 +00:00
Richard van der Hoff
5d89a526f1 Factor per-destination stuff out of TransactionQueue
This is easier than having to have a million fields keyed on destination.
2019-03-13 17:48:29 +00:00
Erik Johnston
f5d57d4848 Newsfile 2019-03-13 17:38:58 +00:00
Erik Johnston
72bfaf746d Allow passing --daemonize to workers 2019-03-13 17:33:54 +00:00
Richard van der Hoff
6accbd25bc changelog 2019-03-13 17:24:10 +00:00
Richard van der Hoff
fdcad8eabd Move client receipt processing to federation sender worker.
This is mostly a prerequisite for #4730, but also fits with the general theme
of "move everything off the master that we possibly can".
2019-03-13 17:21:19 +00:00
Richard van der Hoff
eed7271b3b declare a ReadReceipt class
I'm going to use this in queues and things, so it'll be useful to give it more
of a structure.
2019-03-13 17:20:55 +00:00
Andrew Morgan
7998ca3a66 Document using a certificate with a full chain (#4849) 2019-03-13 15:26:29 +00:00
Andrew Morgan
7d053cfe10 Retry on 400:M_UNRECOGNIZED 2019-03-13 12:10:33 +00:00
Andrew Morgan
7e75d9644b Fix paranthesis indent 2019-03-13 11:15:23 +00:00
Andrew Morgan
cf301e37d8 Add workaround note 2019-03-13 11:14:43 +00:00
Neil Johnson
332b60ec68 Merge branch 'master' of github.com:matrix-org/synapse into develop 2019-03-12 17:15:21 +00:00
Neil Johnson
83193a9362 fix orphaned sentence 2019-03-12 16:57:17 +00:00
Andrew Morgan
d42c81d724 Transfer local user's push rules on room upgrade (#4838)
Transfer push rules (notifications) on room upgrade
2019-03-12 14:42:53 +00:00
Amber Brown
c0332d095f fixup 2019-03-13 01:30:54 +11:00
Andrew Morgan
97653ef1f4 Correct argument name 2019-03-12 14:30:26 +00:00
Neil Johnson
8b692bf7c2 Neilj/improved delegation doc 2 (#4832)
Improved federation configuration docs.  Specifically detailing  .well-known and SRV based delegation methods. 

Inspiration Valentin Lab <valentin.lab@kalysto.org> for https://github.com/matrix-org/synapse/pull/4781
2019-03-12 14:23:28 +00:00
Amber Brown
797b6a63fc fixup 2019-03-13 01:17:51 +11:00
Andrew Morgan
0ea8582f8b Cleaner way of implementing trailing slashes 2019-03-12 14:11:11 +00:00
Amber Brown
d306bd1b26 fixup 2019-03-12 22:38:01 +11:00
Amber Brown
81d9d1bee6 fixup 2019-03-12 22:28:48 +11:00
Amber Brown
6f5890b2fa fixup 2019-03-12 22:27:56 +11:00
Amber Brown
10480c4348 fixup 2019-03-12 21:47:14 +11:00
Amber Brown
8b618041ef fixup 2019-03-12 18:06:28 +11:00
Amber Brown
c980c7e31f use the old method 2019-03-12 17:51:14 +11:00
Andrew Morgan
4868b12029 and again 2019-03-11 18:22:26 +00:00
Aaron Raimist
8ea1b41a0e Clarify what registration_shared_secret allows for (#2885) (#4844)
* Clarify what registration_shared_secret allows for (#2885)

Signed-off-by: Aaron Raimist <aaron@raim.ist>

* Add changelog

Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-03-11 18:21:52 +00:00
Andrew Morgan
802cb5dcf0 Fix syntax error 2019-03-11 18:08:28 +00:00
Andrew Morgan
66f205e93d We're calling different functions now 2019-03-11 18:01:58 +00:00
Andrew Morgan
f18dca26da Merge branch 'develop' into anoa/trailing_slashes_client 2019-03-11 17:44:29 +00:00
Andrew Morgan
290552fd83 Make federation endpoints more tolerant of trailing slashes for some endpoints (#4793)
Server side of a solution towards #3622.
2019-03-11 17:44:03 +00:00
Erik Johnston
9c0e6e8e7c Merge pull request #4847 from matrix-org/erikj/fix_stacktrace_keyring
Correctly log expected errors when fetching server keys
2019-03-11 17:12:20 +00:00
Andrew Morgan
a8a028dfce Merge branch 'develop' into anoa/trailing_slashes_client 2019-03-11 17:07:31 +00:00
Erik Johnston
33dfd9a288 Newsfile 2019-03-11 14:13:58 +00:00
Erik Johnston
78c563b77c Correctly log expected errors when fetching server keys 2019-03-11 14:11:10 +00:00
Amber Brown
78a6b950b3 fix 2019-03-12 00:50:28 +11:00
Amber Brown
1b77bd69fb pep8 2019-03-12 00:39:12 +11:00
Amber Brown
41a5ba1682 changelog 2019-03-12 00:38:17 +11:00
Amber Brown
5ba8ceab4c fixes 2019-03-12 00:35:31 +11:00
Amber Brown
26eefca3b7 setup master 2019-03-11 21:16:10 +11:00
Amber Brown
8da22e2b53 master startup 2019-03-11 21:13:35 +11:00
Amber Brown
30a8deeb68 Merge remote-tracking branch 'origin/develop' into hawkowl/userdir-search 2019-03-11 21:12:30 +11:00
Amber Brown
4abc988c6a initial 2019-03-11 21:11:36 +11:00
Andrew Morgan
b61ac9660a Merge pull request #4839 from matrix-org/anoa/no_captcha_tests
Disable captcha registration by default in tests
2019-03-11 09:58:43 +00:00
Matthew Hodgson
2326e00bc4 fix incorrect encoding of filenames with spaces in (#2090)
fixes https://github.com/vector-im/riot-web/issues/3155
2019-03-11 09:53:45 +00:00
Erik Johnston
ca7f7d84f4 Merge pull request #4837 from matrix-org/erikj/optional_prev_state
Make `prev_state` field optional
2019-03-11 09:22:33 +00:00
Andrew Morgan
a5dd335cd8 lint 2019-03-08 18:25:59 +00:00
Andrew Morgan
f8740d57de Add changelog 2019-03-08 18:23:54 +00:00
Andrew Morgan
64ff11019e Retry certain federation requests on 404 2019-03-08 18:22:47 +00:00
Andrew Morgan
525dd02bbe Remove trailing slashes from outbound federation requests 2019-03-08 16:55:52 +00:00
Andrew Morgan
50924ee34a Add changelog 2019-03-08 16:49:16 +00:00
Erik Johnston
fe6c12e6cd Add comment to schema 2019-03-08 16:38:23 +00:00
Andrew Morgan
d6e0be92fe Disable captcha registration by default in tests 2019-03-08 15:49:38 +00:00
Erik Johnston
cac4723afe Newsfile 2019-03-08 15:08:34 +00:00
Erik Johnston
5536ddba75 Make prev_state field optional
The `prev_state` field on events is not specced and so synapse shouldn't
explode if an event is missing the field.

Fixes #4787
2019-03-08 15:05:32 +00:00
Erik Johnston
39e57f9728 Merge pull request #4814 from matrix-org/erikj/soft_fail_impl
Implement soft fail
2019-03-08 14:51:06 +00:00
Erik Johnston
8c4896668f Merge branch 'develop' of github.com:matrix-org/synapse into erikj/soft_fail_impl 2019-03-08 11:44:20 +00:00
Erik Johnston
0ff8163eae Factor out soft fail checks 2019-03-08 11:26:33 +00:00
Erik Johnston
469b66c102 Merge pull request #4829 from matrix-org/erikj/device_list_seen_updates
When re-syncing device lists reset the state
2019-03-08 09:54:54 +00:00
Erik Johnston
436b1c8be1 Merge pull request #4828 from matrix-org/erikj/debug_device_lists
Add some debug logging for device list handling
2019-03-07 17:02:58 +00:00
Erik Johnston
cef80da903 Newsfile 2019-03-07 16:18:02 +00:00
Erik Johnston
d42b41544a When re-syncing device lists reset the state
We keep track of what stream IDs we've seen so that we know what updates
we've handled or missed. If we re-sync we don't know if the updates
we've seen are included in the re-sync (there may be a race), so we
should reset the seen updates.
2019-03-07 16:04:24 +00:00
Erik Johnston
7404fb3cdb Newsfile 2019-03-07 16:00:09 +00:00
Erik Johnston
c633fc02d7 Add some debug logging for device list handling 2019-03-07 15:58:03 +00:00
Richard van der Hoff
fde26e47d6 Merge pull request #4824 from matrix-org/rav/docker_docs
Document the `generate` option for the docker image.
2019-03-07 14:12:44 +00:00
Richard van der Hoff
ba7a6807c8 Debian package: fix warning during preconfiguration. (#4823) 2019-03-07 14:12:27 +00:00
Richard van der Hoff
32471d63b7 Update example_log_config.yaml (#4820) 2019-03-07 14:12:10 +00:00
Richard van der Hoff
685704536f Fix check-newsfragment for debian-only changes. (#4825) 2019-03-07 14:03:05 +00:00
Amber Brown
f6135d06cf Rewrite userdir to be faster (#4537) 2019-03-07 01:22:53 -08:00
Richard van der Hoff
5580616235 Document the generate option for the docker image. 2019-03-07 07:35:42 +00:00
Richard van der Hoff
6e4931aa19 Debian package: fix warning during preconfiguration. 2019-03-07 07:18:06 +00:00
Matthew Hodgson
8f4b9f5210 Reword the sample config header to be less scary (#4801) 2019-03-07 07:09:01 +00:00
Travis Ralston
06cbf79fe3 Merge pull request #4779 from matrix-org/travis/icons
Use static locations for Riot icons
2019-03-06 15:11:41 -07:00
Richard van der Hoff
898378c9b5 Update changelog.d/4779.misc
Co-Authored-By: turt2live <travpc@gmail.com>
2019-03-06 14:29:56 -07:00
Erik Johnston
1f85c2c0ce Merge pull request #4818 from matrix-org/erikj/prefill_client_ips
Prefill client IPs cache on workers
2019-03-06 19:17:39 +00:00
Erik Johnston
366877c579 Update changelog 2019-03-06 19:04:52 +00:00
Erik Johnston
0a6e716600 Merge pull request #4815 from matrix-org/erikj/docstrings
Add docstrings from matrix-org-hotfixes
2019-03-06 18:59:28 +00:00
Erik Johnston
c665b637de Merge pull request #4816 from matrix-org/erikj/4422_debug
Port #4422 debug logging from hotfixes
2019-03-06 18:59:12 +00:00
Erik Johnston
ff7bd29ea9 Merge pull request #4817 from matrix-org/erikj/shutdown_room_message
Send message after room has been shutdown
2019-03-06 18:59:05 +00:00
Richard van der Hoff
b70ea3fa78 Add zwsp in bug report template (#4811)
Inserts a zero-width space in the `-->` which isn't supposed to close a
comment. This used to be here but it got lost in
d86826277d.
2019-03-06 17:55:59 +00:00
Erik Johnston
7791c5194e Newsfile 2019-03-06 17:40:51 +00:00
Erik Johnston
face0c5b3c Prefill client IPs cache on workers 2019-03-06 17:39:32 +00:00
Erik Johnston
03dce32019 Newsfile 2019-03-06 17:38:19 +00:00
Erik Johnston
b879870b2d Send message after room has been shutdown
Currently the explanation message is sent to the abuse room before any
users are forced joined, which means it tends to get lost in the backlog
of joins.

So instead we send the message *after* we've forced joined everyone.
2019-03-06 17:35:11 +00:00
Erik Johnston
4238f63545 Newsfile 2019-03-06 17:32:48 +00:00
Erik Johnston
8b7790e68f Port #4422 debug logging from hotfixes 2019-03-06 17:29:15 +00:00
Erik Johnston
9c50074c21 Newsfile 2019-03-06 17:24:53 +00:00
Erik Johnston
6d13bdec91 Add docstrings from matrix-org-hotfixes 2019-03-06 17:24:26 +00:00
Erik Johnston
4c473ba088 Newsfile 2019-03-06 16:24:03 +00:00
Erik Johnston
a9de04be72 Implement soft fail 2019-03-06 16:22:16 +00:00
Andrew Morgan
7b8a157b79 Merge pull request #4792 from matrix-org/anoa/replication_tokens
Support batch updates in the worker sender
2019-03-06 15:48:29 +00:00
Brendan Abolivier
20dd3403ee Merge pull request #4804 from matrix-org/babolivier/ratelimit_registration_improvements
Improve ratelimit on registration
2019-03-06 11:40:10 +00:00
Brendan Abolivier
6f3cde8b25 Make registration ratelimiter separate from the main events one 2019-03-06 11:02:42 +00:00
Brendan Abolivier
f4195f4118 Revert "Split ratelimiters in two (one for events, one for registration)"
This reverts commit d7dbad3526.
2019-03-06 10:55:22 +00:00
Brendan Abolivier
6fcecb4859 Add changelog 2019-03-05 18:55:29 +00:00
Brendan Abolivier
d7dbad3526 Split ratelimiters in two (one for events, one for registration) 2019-03-05 18:41:27 +00:00
Brendan Abolivier
c23e8c3333 Update sample config 2019-03-05 18:03:48 +00:00
Brendan Abolivier
067ce795c0 Move settings from registration to ratelimiting in config file 2019-03-05 18:03:14 +00:00
Erik Johnston
16c8b4ecbd Merge pull request #4772 from jbweston/jbweston/server-version-api
Add 'server_version' endpoint to admin API
2019-03-05 16:31:00 +00:00
Brendan Abolivier
a4c3a361b7 Add rate-limiting on registration (#4735)
* Rate-limiting for registration

* Add unit test for registration rate limiting

* Add config parameters for rate limiting on auth endpoints

* Doc

* Fix doc of rate limiting function

Co-Authored-By: babolivier <contact@brendanabolivier.com>

* Incorporate review

* Fix config parsing

* Fix linting errors

* Set default config for auth rate limiting

* Fix tests

* Add changelog

* Advance reactor instead of mocked clock

* Move parameters to registration specific config and give them more sensible default values

* Remove unused config options

* Don't mock the rate limiter un MAU tests

* Rename _register_with_store into register_with_store

* Make CI happy

* Remove unused import

* Update sample config

* Fix ratelimiting test for py2

* Add non-guest test
2019-03-05 14:25:33 +00:00
Andrew Morgan
b9f6163092 Simplify token replication logic 2019-03-05 13:58:30 +00:00
Andrew Morgan
3887e0cd80 Merge pull request #4795 from matrix-org/anoa/configinatoractoring
Remove reference to that no longer exists in README
2019-03-05 09:47:14 +00:00
Andrew Morgan
ae90531036 Merge pull request #4794 from matrix-org/anoa/erroneous_dollahs
Remove unnecessary dollar signs
2019-03-05 09:46:59 +00:00
Erik Johnston
b050a10871 Merge pull request #4699 from matrix-org/erikj/stop_fed_not_in_room
Stop backpaginating when events not visible
2019-03-05 09:32:33 +00:00
Erik Johnston
9e8bca5667 Merge pull request #4799 from matrix-org/rav/clean_up_replication_code
Clean ups in replication notifier
2019-03-05 09:19:48 +00:00
Erik Johnston
aa06d26ae0 clarify comments 2019-03-05 09:16:35 +00:00
Erik Johnston
c3c542bb4a Merge pull request #4796 from matrix-org/erikj/factor_out_e2e_keys
Allow /keys/{changes,query} API to run on worker
2019-03-05 09:06:25 +00:00
Richard van der Hoff
eaa9f43603 changelog 2019-03-04 19:01:19 +00:00
Richard van der Hoff
c7325776a7 Remove redundant PreserveLoggingContext
Both (!) things that register as replication listeners do the right thing wrt
logcontexts, so this is redundant.
2019-03-04 18:31:18 +00:00
Erik Johnston
00b0e8b7df Newsfile 2019-03-04 18:30:07 +00:00
Erik Johnston
bfa7d46a10 Allow /keys/{changes,query} API to run on worker 2019-03-04 18:30:01 +00:00
Erik Johnston
157e5a8f27 Split DeviceHandler into master and worker 2019-03-04 18:29:26 +00:00
Richard van der Hoff
daa10e3e66 Remove unused wait_for_replication method
I guess this was used once? It's not now, anyway.
2019-03-04 18:27:32 +00:00
Erik Johnston
a84b8d56c2 Fixup slave stores 2019-03-04 18:04:57 +00:00
Erik Johnston
0d2d046709 Fix missing null guard 2019-03-04 16:04:04 +00:00
Andrew Morgan
be18073692 Add changelog 2019-03-04 16:01:17 +00:00
Andrew Morgan
d3f270f06a Remove reference to that no longer exists in README 2019-03-04 15:59:43 +00:00
Andrew Morgan
3a438c24a6 Add changelog 2019-03-04 15:28:22 +00:00
Andrew Morgan
336de1d45b Remove unnecessary dollar signs
A dollar sign is already appended to the end of each PATH, so there's
no need to add one in the PATH declaration as well.
2019-03-04 15:25:12 +00:00
Andrew Morgan
fe7bd23a85 Clean up logic and add comments 2019-03-04 15:08:15 +00:00
Erik Johnston
d1523aed6b Only check history visibility when filtering
When filtering events to send to server we check more than just history
visibility. However when deciding whether to backfill or not we only
care about the history visibility.
2019-03-04 14:43:42 +00:00
Andrew Morgan
9f7cdf3da1 Clearer branching, fix missing list clear 2019-03-04 14:36:52 +00:00
Andrew Morgan
0bc50fb60a Add changelog 2019-03-04 14:05:16 +00:00
Andrew Morgan
5f0c449dd5 Prevent replication wedging 2019-03-04 14:03:18 +00:00
Erik Johnston
8b63fe4c26 s/get_forward_events/get_successor_events/ 2019-03-04 11:56:03 +00:00
Erik Johnston
fbc047f2a5 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/stop_fed_not_in_room 2019-03-04 11:54:58 +00:00
Joseph Weston
144cbfd650 add API documentation
Signed-off-by: Joseph Weston <joseph@weston.cloud>
2019-03-02 03:07:04 +01:00
Travis Ralston
13c18853b9 Changelog 2019-03-01 15:15:00 -07:00
Travis Ralston
4dfbae18fe Use static locations for Riot icons
See https://github.com/vector-im/riot-web/issues/9009
2019-03-01 15:06:21 -07:00
Joseph Weston
d3dcb64501 Add changelog and AUTHORS file entry
Signed-off-by: Joseph Weston <joseph@weston.cloud>
2019-03-01 10:44:40 +01:00
Joseph Weston
1e8388b311 Add 'server_version' endpoint to admin API
This is required because the 'Server' HTTP header is not always
passed through proxies.
2019-03-01 09:56:58 +01:00
Joseph Weston
b136ee10df Import 'admin' module rather than 'register_servlets' directly
We will later need also to import 'register_servlets' from the
'login' module, so we un-pollute the namespace now to keep the
logical changes separate.
2019-03-01 09:46:25 +01:00
Erik Johnston
71ef5fc411 Update newsfile to have a full stop 2019-02-27 13:22:23 +00:00
Erik Johnston
b183fef9ac Update comments 2019-02-27 13:06:10 +00:00
Erik Johnston
56f4ece778 Newsfile 2019-02-20 18:15:13 +00:00
Erik Johnston
71b625d808 Stop backpaginating when events not visible 2019-02-20 18:14:12 +00:00
167 changed files with 6250 additions and 3329 deletions

View File

@@ -90,6 +90,17 @@ steps:
image: "python:3.7"
propagate-environment: true
- command:
- "python -m pip install tox"
- "tox -e py27-old,codecov"
label: ":python: 2.7 / SQLite / Old Deps"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:2.7"
propagate-environment: true
- label: ":python: 2.7 / :postgres: 9.4"
env:
TRIAL_FLAGS: "-j 4"

View File

@@ -4,9 +4,9 @@ about: Create a report to help us improve
---
<!--
<!--
**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**:
**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**:
You will likely get better support more quickly if you ask in ** #matrix:matrix.org ** ;)
@@ -17,7 +17,7 @@ the necessary data to fix your issue.
You can also preview your report before submitting it. You may remove sections
that aren't relevant to your particular case.
Text between <!-- and --> marks will be invisible in the report.
Text between <!-- and --> marks will be invisible in the report.
-->
@@ -31,7 +31,7 @@ Text between <!-- and --> marks will be invisible in the report.
- that reproduce the bug
- using hyphens as bullet points
<!--
<!--
Describe how what happens differs from what you expected.
If you can identify any relevant log snippets from _homeserver.log_, please include
@@ -48,8 +48,8 @@ those (please be careful to remove any personal or private data). Please surroun
If not matrix.org:
<!--
What version of Synapse is running?
<!--
What version of Synapse is running?
You can find the Synapse version by inspecting the server headers (replace matrix.org with
your own homeserver domain):
$ curl -v https://matrix.org/_matrix/client/versions 2>&1 | grep "Server:"

10
.gitignore vendored
View File

@@ -12,11 +12,15 @@ _trial_temp/
_trial_temp*/
# stuff that is likely to exist when you run a server locally
/*.db
/*.log
/*.log.config
/*.pid
/*.signing.key
/*.tls.crt
/*.tls.key
/uploads
/env/
/homeserver*.yaml
/media_store/
/uploads
# IDEs
/.idea/

View File

@@ -69,3 +69,6 @@ Serban Constantin <serban.constantin at gmail dot com>
Jason Robinson <jasonr at matrix.org>
* Minor fixes
Joseph Weston <joseph at weston.cloud>
+ Add admin API for querying HS version

View File

@@ -1,3 +1,102 @@
Synapse 0.99.3 (2019-04-01)
===========================
No significant changes.
Synapse 0.99.3rc1 (2019-03-27)
==============================
Features
--------
- The user directory has been rewritten to make it faster, with less chance of falling behind on a large server. ([\#4537](https://github.com/matrix-org/synapse/issues/4537), [\#4846](https://github.com/matrix-org/synapse/issues/4846), [\#4864](https://github.com/matrix-org/synapse/issues/4864), [\#4887](https://github.com/matrix-org/synapse/issues/4887), [\#4900](https://github.com/matrix-org/synapse/issues/4900), [\#4944](https://github.com/matrix-org/synapse/issues/4944))
- Add configurable rate limiting to the /register endpoint. ([\#4735](https://github.com/matrix-org/synapse/issues/4735), [\#4804](https://github.com/matrix-org/synapse/issues/4804))
- Move server key queries to federation reader. ([\#4757](https://github.com/matrix-org/synapse/issues/4757))
- Add support for /account/3pid REST endpoint to client_reader worker. ([\#4759](https://github.com/matrix-org/synapse/issues/4759))
- Add an endpoint to the admin API for querying the server version. Contributed by Joseph Weston. ([\#4772](https://github.com/matrix-org/synapse/issues/4772))
- Include a default configuration file in the 'docs' directory. ([\#4791](https://github.com/matrix-org/synapse/issues/4791), [\#4801](https://github.com/matrix-org/synapse/issues/4801))
- Synapse is now permissive about trailing slashes on some of its federation endpoints, allowing zero or more to be present. ([\#4793](https://github.com/matrix-org/synapse/issues/4793))
- Add support for /keys/query and /keys/changes REST endpoints to client_reader worker. ([\#4796](https://github.com/matrix-org/synapse/issues/4796))
- Add checks to incoming events over federation for events evading auth (aka "soft fail"). ([\#4814](https://github.com/matrix-org/synapse/issues/4814))
- Add configurable rate limiting to the /login endpoint. ([\#4821](https://github.com/matrix-org/synapse/issues/4821), [\#4865](https://github.com/matrix-org/synapse/issues/4865))
- Remove trailing slashes from certain outbound federation requests. Retry if receiving a 404. Context: #3622. ([\#4840](https://github.com/matrix-org/synapse/issues/4840))
- Allow passing --daemonize flags to workers in the same way as with master. ([\#4853](https://github.com/matrix-org/synapse/issues/4853))
- Batch up outgoing read-receipts to reduce federation traffic. ([\#4890](https://github.com/matrix-org/synapse/issues/4890), [\#4927](https://github.com/matrix-org/synapse/issues/4927))
- Add option to disable searching the user directory. ([\#4895](https://github.com/matrix-org/synapse/issues/4895))
- Add option to disable searching of local and remote public room lists. ([\#4896](https://github.com/matrix-org/synapse/issues/4896))
- Add ability for password providers to login/register a user via 3PID (email, phone). ([\#4931](https://github.com/matrix-org/synapse/issues/4931))
Bugfixes
--------
- Fix a bug where media with spaces in the name would get a corrupted name. ([\#2090](https://github.com/matrix-org/synapse/issues/2090))
- Fix attempting to paginate in rooms where server cannot see any events, to avoid unnecessarily pulling in lots of redacted events. ([\#4699](https://github.com/matrix-org/synapse/issues/4699))
- 'event_id' is now a required parameter in federated state requests, as per the matrix spec. ([\#4740](https://github.com/matrix-org/synapse/issues/4740))
- Fix tightloop over connecting to replication server. ([\#4749](https://github.com/matrix-org/synapse/issues/4749))
- Fix parsing of Content-Disposition headers on remote media requests and URL previews. ([\#4763](https://github.com/matrix-org/synapse/issues/4763))
- Fix incorrect log about not persisting duplicate state event. ([\#4776](https://github.com/matrix-org/synapse/issues/4776))
- Fix v4v6 option in HAProxy example config. Contributed by Flakebi. ([\#4790](https://github.com/matrix-org/synapse/issues/4790))
- Handle batch updates in worker replication protocol. ([\#4792](https://github.com/matrix-org/synapse/issues/4792))
- Fix bug where we didn't correctly throttle sending of USER_IP commands over replication. ([\#4818](https://github.com/matrix-org/synapse/issues/4818))
- Fix potential race in handling missing updates in device list updates. ([\#4829](https://github.com/matrix-org/synapse/issues/4829))
- Fix bug where synapse expected an un-specced `prev_state` field on state events. ([\#4837](https://github.com/matrix-org/synapse/issues/4837))
- Transfer a user's notification settings (push rules) on room upgrade. ([\#4838](https://github.com/matrix-org/synapse/issues/4838))
- fix test_auto_create_auto_join_where_no_consent. ([\#4886](https://github.com/matrix-org/synapse/issues/4886))
- Fix a bug where hs_disabled_message was sometimes not correctly enforced. ([\#4888](https://github.com/matrix-org/synapse/issues/4888))
- Fix bug in shutdown room admin API where it would fail if a user in the room hadn't consented to the privacy policy. ([\#4904](https://github.com/matrix-org/synapse/issues/4904))
- Fix bug where blocked world-readable rooms were still peekable. ([\#4908](https://github.com/matrix-org/synapse/issues/4908))
Internal Changes
----------------
- Add a systemd setup that supports synapse workers. Contributed by Luca Corbatto. ([\#4662](https://github.com/matrix-org/synapse/issues/4662))
- Change from TravisCI to Buildkite for CI. ([\#4752](https://github.com/matrix-org/synapse/issues/4752))
- When presence is disabled don't send over replication. ([\#4757](https://github.com/matrix-org/synapse/issues/4757))
- Minor docstring fixes for MatrixFederationAgent. ([\#4765](https://github.com/matrix-org/synapse/issues/4765))
- Optimise EDU transmission for the federation_sender worker. ([\#4770](https://github.com/matrix-org/synapse/issues/4770))
- Update test_typing to use HomeserverTestCase. ([\#4771](https://github.com/matrix-org/synapse/issues/4771))
- Update URLs for riot.im icons and logos in the default notification templates. ([\#4779](https://github.com/matrix-org/synapse/issues/4779))
- Removed unnecessary $ from some federation endpoint path regexes. ([\#4794](https://github.com/matrix-org/synapse/issues/4794))
- Remove link to deleted title in README. ([\#4795](https://github.com/matrix-org/synapse/issues/4795))
- Clean up read-receipt handling. ([\#4797](https://github.com/matrix-org/synapse/issues/4797))
- Add some debug about processing read receipts. ([\#4798](https://github.com/matrix-org/synapse/issues/4798))
- Clean up some replication code. ([\#4799](https://github.com/matrix-org/synapse/issues/4799))
- Add some docstrings. ([\#4815](https://github.com/matrix-org/synapse/issues/4815))
- Add debug logger to try and track down #4422. ([\#4816](https://github.com/matrix-org/synapse/issues/4816))
- Make shutdown API send explanation message to room after users have been forced joined. ([\#4817](https://github.com/matrix-org/synapse/issues/4817))
- Update example_log_config.yaml. ([\#4820](https://github.com/matrix-org/synapse/issues/4820))
- Document the `generate` option for the docker image. ([\#4824](https://github.com/matrix-org/synapse/issues/4824))
- Fix check-newsfragment for debian-only changes. ([\#4825](https://github.com/matrix-org/synapse/issues/4825))
- Add some debug logging for device list updates to help with #4828. ([\#4828](https://github.com/matrix-org/synapse/issues/4828))
- Improve federation documentation, specifically .well-known support. Many thanks to @vaab. ([\#4832](https://github.com/matrix-org/synapse/issues/4832))
- Disable captcha registration by default in unit tests. ([\#4839](https://github.com/matrix-org/synapse/issues/4839))
- Add stuff back to the .gitignore. ([\#4843](https://github.com/matrix-org/synapse/issues/4843))
- Clarify what registration_shared_secret allows for. ([\#4844](https://github.com/matrix-org/synapse/issues/4844))
- Correctly log expected errors when fetching server keys. ([\#4847](https://github.com/matrix-org/synapse/issues/4847))
- Update install docs to explicitly state a full-chain (not just the top-level) TLS certificate must be provided to Synapse. This caused some people's Synapse ports to appear correct in a browser but still (rightfully so) upset the federation tester. ([\#4849](https://github.com/matrix-org/synapse/issues/4849))
- Move client read-receipt processing to federation sender worker. ([\#4852](https://github.com/matrix-org/synapse/issues/4852))
- Refactor federation TransactionQueue. ([\#4855](https://github.com/matrix-org/synapse/issues/4855))
- Comment out most options in the generated config. ([\#4863](https://github.com/matrix-org/synapse/issues/4863))
- Fix yaml library warnings by using safe_load. ([\#4869](https://github.com/matrix-org/synapse/issues/4869))
- Update Apache setup to remove location syntax. Thanks to @cwmke! ([\#4870](https://github.com/matrix-org/synapse/issues/4870))
- Reinstate test case that runs unit tests against oldest supported dependencies. ([\#4879](https://github.com/matrix-org/synapse/issues/4879))
- Update link to federation docs. ([\#4881](https://github.com/matrix-org/synapse/issues/4881))
- fix test_auto_create_auto_join_where_no_consent. ([\#4886](https://github.com/matrix-org/synapse/issues/4886))
- Use a regular HomeServerConfig object for unit tests rater than a Mock. ([\#4889](https://github.com/matrix-org/synapse/issues/4889))
- Add some notes about tuning postgres for larger deployments. ([\#4895](https://github.com/matrix-org/synapse/issues/4895))
- Add a config option for torture-testing worker replication. ([\#4902](https://github.com/matrix-org/synapse/issues/4902))
- Log requests which are simulated by the unit tests. ([\#4905](https://github.com/matrix-org/synapse/issues/4905))
- Allow newsfragments to end with exclamation marks. Exciting! ([\#4912](https://github.com/matrix-org/synapse/issues/4912))
- Refactor some more tests to use HomeserverTestCase. ([\#4913](https://github.com/matrix-org/synapse/issues/4913))
- Refactor out the state deltas portion of the user directory store and handler. ([\#4917](https://github.com/matrix-org/synapse/issues/4917))
- Fix nginx example in ACME doc. ([\#4923](https://github.com/matrix-org/synapse/issues/4923))
- Use an explicit dbname for postgres connections in the tests. ([\#4928](https://github.com/matrix-org/synapse/issues/4928))
- Fix `ClientReplicationStreamProtocol.__str__()`. ([\#4929](https://github.com/matrix-org/synapse/issues/4929))
Synapse 0.99.2 (2019-03-01)
===========================

View File

@@ -71,7 +71,8 @@ set this to the hostname of your server. For a more production-ready setup, you
will probably want to specify your domain (`example.com`) rather than a
matrix-specific hostname here (in the same way that your email address is
probably `user@example.com` rather than `user@email.example.com`) - but
doing so may require more advanced setup. - see [Setting up Federation](README.rst#setting-up-federation). Beware that the server name cannot be changed later.
doing so may require more advanced setup: see [Setting up Federation](docs/federate.md).
Beware that the server name cannot be changed later.
This command will generate you a config file that you can then customise, but it will
also generate a set of keys for you. These keys will allow your Home Server to
@@ -374,9 +375,16 @@ To configure Synapse to expose an HTTPS port, you will need to edit
* You will also need to uncomment the `tls_certificate_path` and
`tls_private_key_path` lines under the `TLS` section. You can either
point these settings at an existing certificate and key, or you can
enable Synapse's built-in ACME (Let's Encrypt) support. Instructions
for having Synapse automatically provision and renew federation
certificates through ACME can be found at [ACME.md](docs/ACME.md).
enable Synapse's built-in ACME (Let's Encrypt) support. Instructions
for having Synapse automatically provision and renew federation
certificates through ACME can be found at [ACME.md](docs/ACME.md). If you
are using your own certificate, be sure to use a `.pem` file that includes
the full certificate chain including any intermediate certificates (for
instance, if using certbot, use `fullchain.pem` as your certificate, not
`cert.pem`).
For those of you upgrading your TLS certificate in readiness for Synapse 1.0,
please take a look at [our guide](docs/MSC1711_certificates_FAQ.md#configuring-certificates-for-compatibility-with-synapse-100).
## Registering a user
@@ -402,8 +410,8 @@ This process uses a setting `registration_shared_secret` in
`homeserver.yaml`, which is shared between Synapse itself and the
`register_new_matrix_user` script. It doesn't matter what it is (a random
value is generated by `--generate-config`), but it should be kept secret, as
anyone with knowledge of it can register users on your server even if
`enable_registration` is `false`.
anyone with knowledge of it can register users, including admin accounts,
on your server even if `enable_registration` is `false`.
## Setting up a TURN server

View File

@@ -80,7 +80,10 @@ Thanks for using Matrix!
Synapse Installation
====================
For details on how to install synapse, see `<INSTALL.md>`_.
.. _federation:
* For details on how to install synapse, see `<INSTALL.md>`_.
* For specific details on how to configure Synapse for federation see `docs/federate.md <docs/federate.md>`_
Connecting to Synapse from a client
@@ -93,13 +96,13 @@ Unless you are running a test instance of Synapse on your local machine, in
general, you will need to enable TLS support before you can successfully
connect from a client: see `<INSTALL.md#tls-certificates>`_.
An easy way to get started is to login or register via Riot at
https://riot.im/app/#/login or https://riot.im/app/#/register respectively.
An easy way to get started is to login or register via Riot at
https://riot.im/app/#/login or https://riot.im/app/#/register respectively.
You will need to change the server you are logging into from ``matrix.org``
and instead specify a Homeserver URL of ``https://<server_name>:8448``
(or just ``https://<server_name>`` if you are using a reverse proxy).
(Leave the identity server as the default - see `Identity servers`_.)
If you prefer to use another client, refer to our
and instead specify a Homeserver URL of ``https://<server_name>:8448``
(or just ``https://<server_name>`` if you are using a reverse proxy).
(Leave the identity server as the default - see `Identity servers`_.)
If you prefer to use another client, refer to our
`client breakdown <https://matrix.org/docs/projects/clients-matrix>`_.
If all goes well you should at least be able to log in, create a room, and
@@ -117,9 +120,9 @@ recommended to also set up CAPTCHA - see `<docs/CAPTCHA_SETUP.rst>`_.)
Once ``enable_registration`` is set to ``true``, it is possible to register a
user via `riot.im <https://riot.im/app/#/register>`_ or other Matrix clients.
Your new user name will be formed partly from the ``server_name`` (see
`Configuring synapse`_), and partly from a localpart you specify when you
create the account. Your name will take the form of::
Your new user name will be formed partly from the ``server_name``, and partly
from a localpart you specify when you create the account. Your name will take
the form of::
@localpart:my.domain.name
@@ -151,56 +154,6 @@ server on the same domain.
See https://github.com/vector-im/riot-web/issues/1977 and
https://developer.github.com/changes/2014-04-25-user-content-security for more details.
Troubleshooting
===============
Running out of File Handles
---------------------------
If synapse runs out of filehandles, it typically fails badly - live-locking
at 100% CPU, and/or failing to accept new TCP connections (blocking the
connecting client). Matrix currently can legitimately use a lot of file handles,
thanks to busy rooms like #matrix:matrix.org containing hundreds of participating
servers. The first time a server talks in a room it will try to connect
simultaneously to all participating servers, which could exhaust the available
file descriptors between DNS queries & HTTPS sockets, especially if DNS is slow
to respond. (We need to improve the routing algorithm used to be better than
full mesh, but as of June 2017 this hasn't happened yet).
If you hit this failure mode, we recommend increasing the maximum number of
open file handles to be at least 4096 (assuming a default of 1024 or 256).
This is typically done by editing ``/etc/security/limits.conf``
Separately, Synapse may leak file handles if inbound HTTP requests get stuck
during processing - e.g. blocked behind a lock or talking to a remote server etc.
This is best diagnosed by matching up the 'Received request' and 'Processed request'
log lines and looking for any 'Processed request' lines which take more than
a few seconds to execute. Please let us know at #synapse:matrix.org if
you see this failure mode so we can help debug it, however.
Help!! Synapse eats all my RAM!
-------------------------------
Synapse's architecture is quite RAM hungry currently - we deliberately
cache a lot of recent room data and metadata in RAM in order to speed up
common requests. We'll improve this in future, but for now the easiest
way to either reduce the RAM usage (at the risk of slowing things down)
is to set the almost-undocumented ``SYNAPSE_CACHE_FACTOR`` environment
variable. The default is 0.5, which can be decreased to reduce RAM usage
in memory constrained enviroments, or increased if performance starts to
degrade.
Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant
improvement in overall amount, and especially in terms of giving back RAM
to the OS. To use it, the library must simply be put in the LD_PRELOAD
environment variable when launching Synapse. On Debian, this can be done
by installing the ``libjemalloc1`` package and adding this line to
``/etc/default/matrix-synapse``::
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
This can make a significant difference on Python 2.7 - it's unclear how
much of an improvement it provides on Python 3.x.
Upgrading an existing Synapse
=============================
@@ -211,100 +164,19 @@ versions of synapse.
.. _UPGRADE.rst: UPGRADE.rst
.. _federation:
Setting up Federation
=====================
Federation is the process by which users on different servers can participate
in the same room. For this to work, those other servers must be able to contact
yours to send messages.
The ``server_name`` in your ``homeserver.yaml`` file determines the way that
other servers will reach yours. By default, they will treat it as a hostname
and try to connect to port 8448. This is easy to set up and will work with the
default configuration, provided you set the ``server_name`` to match your
machine's public DNS hostname, and give Synapse a TLS certificate which is
valid for your ``server_name``.
For a more flexible configuration, you can set up a DNS SRV record. This allows
you to run your server on a machine that might not have the same name as your
domain name. For example, you might want to run your server at
``synapse.example.com``, but have your Matrix user-ids look like
``@user:example.com``. (A SRV record also allows you to change the port from
the default 8448).
To use a SRV record, first create your SRV record and publish it in DNS. This
should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
<synapse.server.name>``. The DNS record should then look something like::
$ dig -t srv _matrix._tcp.example.com
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
Note that the server hostname cannot be an alias (CNAME record): it has to point
directly to the server hosting the synapse instance.
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
its user-ids, by setting ``server_name``::
python -m synapse.app.homeserver \
--server-name <yourdomain.com> \
--config-path homeserver.yaml \
--generate-config
python -m synapse.app.homeserver --config-path homeserver.yaml
If you've already generated the config file, you need to edit the ``server_name``
in your ``homeserver.yaml`` file. If you've already started Synapse and a
database has been created, you will have to recreate the database.
If all goes well, you should be able to `connect to your server with a client`__,
and then join a room via federation. (Try ``#matrix-dev:matrix.org`` as a first
step. "Matrix HQ"'s sheer size and activity level tends to make even the
largest boxes pause for thought.)
.. __: `Connecting to Synapse from a client`_
Troubleshooting
---------------
You can use the `federation tester <https://matrix.org/federationtester>`_ to
check if your homeserver is all set.
The typical failure mode with federation is that when you try to join a room,
it is rejected with "401: Unauthorized". Generally this means that other
servers in the room couldn't access yours. (Joining a room over federation is a
complicated dance which requires connections in both directions).
So, things to check are:
* If you are not using a SRV record, check that your ``server_name`` (the part
of your user-id after the ``:``) matches your hostname, and that port 8448 on
that hostname is reachable from outside your network.
* If you *are* using a SRV record, check that it matches your ``server_name``
(it should be ``_matrix._tcp.<server_name>``), and that the port and hostname
it specifies are reachable from outside your network.
Another common problem is that people on other servers can't join rooms that
you invite them to. This can be caused by an incorrectly-configured reverse
proxy: see `<docs/reverse_proxy.rst>`_ for instructions on how to correctly
configure a reverse proxy.
Running a Demo Federation of Synapses
-------------------------------------
If you want to get up and running quickly with a trio of homeservers in a
private federation, there is a script in the ``demo`` directory. This is mainly
useful just for development purposes. See `<demo/README>`_.
Using PostgreSQL
================
As of Synapse 0.9, `PostgreSQL <https://www.postgresql.org>`_ is supported as an
alternative to the `SQLite <https://sqlite.org/>`_ database that Synapse has
traditionally used for convenience and simplicity.
Synapse offers two database engines:
* `SQLite <https://sqlite.org/>`_
* `PostgreSQL <https://www.postgresql.org>`_
The advantages of Postgres include:
By default Synapse uses SQLite in and doing so trades performance for convenience.
SQLite is only recommended in Synapse for testing purposes or for servers with
light workloads.
Almost all installations should opt to use PostreSQL. Advantages include:
* significant performance improvements due to the superior threading and
caching model, smarter query optimiser
@@ -440,3 +312,54 @@ sphinxcontrib-napoleon::
Building internal API documentation::
python setup.py build_sphinx
Troubleshooting
===============
Running out of File Handles
---------------------------
If synapse runs out of file handles, it typically fails badly - live-locking
at 100% CPU, and/or failing to accept new TCP connections (blocking the
connecting client). Matrix currently can legitimately use a lot of file handles,
thanks to busy rooms like #matrix:matrix.org containing hundreds of participating
servers. The first time a server talks in a room it will try to connect
simultaneously to all participating servers, which could exhaust the available
file descriptors between DNS queries & HTTPS sockets, especially if DNS is slow
to respond. (We need to improve the routing algorithm used to be better than
full mesh, but as of March 2019 this hasn't happened yet).
If you hit this failure mode, we recommend increasing the maximum number of
open file handles to be at least 4096 (assuming a default of 1024 or 256).
This is typically done by editing ``/etc/security/limits.conf``
Separately, Synapse may leak file handles if inbound HTTP requests get stuck
during processing - e.g. blocked behind a lock or talking to a remote server etc.
This is best diagnosed by matching up the 'Received request' and 'Processed request'
log lines and looking for any 'Processed request' lines which take more than
a few seconds to execute. Please let us know at #synapse:matrix.org if
you see this failure mode so we can help debug it, however.
Help!! Synapse eats all my RAM!
-------------------------------
Synapse's architecture is quite RAM hungry currently - we deliberately
cache a lot of recent room data and metadata in RAM in order to speed up
common requests. We'll improve this in the future, but for now the easiest
way to either reduce the RAM usage (at the risk of slowing things down)
is to set the almost-undocumented ``SYNAPSE_CACHE_FACTOR`` environment
variable. The default is 0.5, which can be decreased to reduce RAM usage
in memory constrained enviroments, or increased if performance starts to
degrade.
Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant
improvement in overall amount, and especially in terms of giving back RAM
to the OS. To use it, the library must simply be put in the LD_PRELOAD
environment variable when launching Synapse. On Debian, this can be done
by installing the ``libjemalloc1`` package and adding this line to
``/etc/default/matrix-synapse``::
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
This can make a significant difference on Python 2.7 - it's unclear how
much of an improvement it provides on Python 3.x.

View File

@@ -1 +0,0 @@
'event_id' is now a required parameter in federated state requests, as per the matrix spec.

View File

@@ -1 +0,0 @@
Fix tightloop over connecting to replication server.

View File

@@ -1 +0,0 @@
Change from TravisCI to Buildkite for CI.

View File

@@ -1 +0,0 @@
Move server key queries to federation reader.

View File

@@ -1 +0,0 @@
When presence is disabled don't send over replication.

View File

@@ -1 +0,0 @@
Add support for /account/3pid REST endpoint to client_reader worker.

View File

@@ -1 +0,0 @@
Fix parsing of Content-Disposition headers on remote media requests and URL previews.

View File

@@ -1 +0,0 @@
Minor docstring fixes for MatrixFederationAgent.

View File

@@ -1 +0,0 @@
Optimise EDU transmission for the federation_sender worker.

View File

@@ -1 +0,0 @@
Update test_typing to use HomeserverTestCase.

View File

@@ -1 +0,0 @@
Fix incorrect log about not persisting duplicate state event.

View File

@@ -1 +0,0 @@
Fix v4v6 option in HAProxy example config. Contributed by Flakebi.

View File

@@ -1 +0,0 @@
Include a default configuration file in the 'docs' directory.

View File

@@ -1 +0,0 @@
Clean up read-receipt handling.

View File

@@ -1 +0,0 @@
Add some debug about processing read receipts.

View File

@@ -19,6 +19,7 @@ handlers:
# example output to console
console:
class: logging.StreamHandler
formatter: fmt
filters: [context]
# example output to file - to enable, edit 'root' config below.
@@ -29,7 +30,7 @@ handlers:
maxBytes: 100000000
backupCount: 3
filters: [context]
encoding: utf8
root:
level: INFO

View File

@@ -0,0 +1,150 @@
# Setup Synapse with Workers and Systemd
This is a setup for managing synapse with systemd including support for
managing workers. It provides a `matrix-synapse`, as well as a
`matrix-synapse-worker@` service for any workers you require. Additionally to
group the required services it sets up a `matrix.target`. You can use this to
automatically start any bot- or bridge-services. More on this in
[Bots and Bridges](#bots-and-bridges).
See the folder [system](system) for any service and target files.
The folder [workers](workers) contains an example configuration for the
`federation_reader` worker. Pay special attention to the name of the
configuration file. In order to work with the `matrix-synapse-worker@.service`
service, it needs to have the exact same name as the worker app.
This setup expects neither the homeserver nor any workers to fork. Forking is
handled by systemd.
## Setup
1. Adjust your matrix configs. Make sure that the worker config files have the
exact same name as the worker app. Compare `matrix-synapse-worker@.service` for
why. You can find an example worker config in the [workers](workers) folder. See
below for relevant settings in the `homeserver.yaml`.
2. Copy the `*.service` and `*.target` files in [system](system) to
`/etc/systemd/system`.
3. `systemctl enable matrix-synapse.service` this adds the homeserver
app to the `matrix.target`
4. *Optional.* `systemctl enable
matrix-synapse-worker@federation_reader.service` this adds the federation_reader
app to the `matrix-synapse.service`
5. *Optional.* Repeat step 4 for any additional workers you require.
6. *Optional.* Add any bots or bridges by enabling them.
7. Start all matrix related services via `systemctl start matrix.target`
8. *Optional.* Enable autostart of all matrix related services on system boot
via `systemctl enable matrix.target`
## Usage
After you have setup you can use the following commands to manage your synapse
installation:
```
# Start matrix-synapse, all workers and any enabled bots or bridges.
systemctl start matrix.target
# Restart matrix-synapse and all workers (not necessarily restarting bots
# or bridges, see "Bots and Bridges")
systemctl restart matrix-synapse.service
# Stop matrix-synapse and all workers (not necessarily restarting bots
# or bridges, see "Bots and Bridges")
systemctl stop matrix-synapse.service
# Restart a specific worker (i. e. federation_reader), the homeserver is
# unaffected by this.
systemctl restart matrix-synapse-worker@federation_reader.service
# Add a new worker (assuming all configs are setup already)
systemctl enable matrix-synapse-worker@federation_writer.service
systemctl restart matrix-synapse.service
```
## The Configs
Make sure the `worker_app` is set in the `homeserver.yaml` and it does not fork.
```
worker_app: synapse.app.homeserver
daemonize: false
```
None of the workers should fork, as forking is handled by systemd. Hence make
sure this is present in all worker config files.
```
worker_daemonize: false
```
The config files of all workers are expected to be located in
`/etc/matrix-synapse/workers`. If you want to use a different location you have
to edit the provided `*.service` files accordingly.
## Bots and Bridges
Most bots and bridges do not care if the homeserver goes down or is restarted.
Depending on the implementation this may crash them though. So look up the docs
or ask the community of the specific bridge or bot you want to run to make sure
you choose the correct setup.
Whichever configuration you choose, after the setup the following will enable
automatically starting (and potentially restarting) your bot/bridge with the
`matrix.target`.
```
systemctl enable <yourBotOrBridgeName>.service
```
**Note** that from an inactive synapse the bots/bridges will only be started with
synapse if you start the `matrix.target`, not if you start the
`matrix-synapse.service`. This is on purpose. Think of `matrix-synapse.service`
as *just* synapse, but `matrix.target` being anything matrix related, including
synapse and any and all enabled bots and bridges.
### Start with synapse but ignore synapse going down
If the bridge can handle shutdowns of the homeserver you'll want to install the
service in the `matrix.target` and optionally add a
`After=matrix-synapse.service` dependency to have the bot/bridge start after
synapse on starting everything.
In this case the service file should look like this.
```
[Unit]
# ...
# Optional, this will only ensure that if you start everything, synapse will
# be started before the bot/bridge will be started.
After=matrix-synapse.service
[Service]
# ...
[Install]
WantedBy=matrix.target
```
### Stop/restart when synapse stops/restarts
If the bridge can't handle shutdowns of the homeserver you'll still want to
install the service in the `matrix.target` but also have to specify the
`After=matrix-synapse.service` *and* `BindsTo=matrix-synapse.service`
dependencies to have the bot/bridge stop/restart with synapse.
In this case the service file should look like this.
```
[Unit]
# ...
# Mandatory
After=matrix-synapse.service
BindsTo=matrix-synapse.service
[Service]
# ...
[Install]
WantedBy=matrix.target
```

View File

@@ -0,0 +1,17 @@
[Unit]
Description=Synapse Matrix Worker
After=matrix-synapse.service
BindsTo=matrix-synapse.service
[Service]
Type=simple
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.%i --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
[Install]
WantedBy=matrix-synapse.service

View File

@@ -0,0 +1,16 @@
[Unit]
Description=Synapse Matrix Homeserver
[Service]
Type=simple
User=matrix-synapse
WorkingDirectory=/var/lib/matrix-synapse
EnvironmentFile=/etc/default/matrix-synapse
ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=3
[Install]
WantedBy=matrix.target

View File

@@ -0,0 +1,7 @@
[Unit]
Description=Contains matrix services like synapse, bridges and bots
After=network.target
AllowIsolate=no
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,14 @@
worker_app: synapse.app.federation_reader
worker_replication_host: 127.0.0.1
worker_replication_port: 9092
worker_replication_http_port: 9093
worker_listeners:
- type: http
port: 8011
resources:
- names: [federation]
worker_daemonize: false
worker_log_config: /etc/matrix-synapse/federation-reader-log.yaml

10
debian/changelog vendored
View File

@@ -1,3 +1,13 @@
matrix-synapse-py3 (0.99.3) stable; urgency=medium
[ Richard van der Hoff ]
* Fix warning during preconfiguration. (Fixes: #4819)
[ Synapse Packaging team ]
* New synapse release 0.99.3.
-- Synapse Packaging team <packages@matrix.org> Mon, 01 Apr 2019 12:48:21 +0000
matrix-synapse-py3 (0.99.2) stable; urgency=medium
* Fix overwriting of config settings on upgrade.

View File

@@ -5,7 +5,11 @@ set -e
. /usr/share/debconf/confmodule
# try to update the debconf db according to whatever is in the config files
/opt/venvs/matrix-synapse/lib/manage_debconf.pl read || true
#
# note that we may get run during preconfiguration, in which case the script
# will not yet be installed.
[ -x /opt/venvs/matrix-synapse/lib/manage_debconf.pl ] && \
/opt/venvs/matrix-synapse/lib/manage_debconf.pl read
db_input high matrix-synapse/server-name || true
db_input high matrix-synapse/report-stats || true

View File

@@ -28,7 +28,7 @@ with your postgres database.
docker run \
-d \
--name synapse \
-v ${DATA_PATH}:/data \
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
matrixdotorg/synapse:latest
@@ -87,10 +87,15 @@ Global settings:
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
then customize it manually. No other environment variable is required.
then customize it manually: see [Generating a config
file](#generating-a-config-file).
Otherwise, a dynamic configuration file will be used. The following environment
variables are available for configuration:
Otherwise, a dynamic configuration file will be used.
### Environment variables used to build a dynamic configuration file
The following environment variables are used to build the configuration file
when ``SYNAPSE_CONFIG_PATH`` is not set.
* ``SYNAPSE_SERVER_NAME`` (mandatory), the server public hostname.
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
@@ -143,3 +148,31 @@ Mail server specific values (will not send emails if not set):
any.
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail
server if any.
### Generating a config file
It is possible to generate a basic configuration file for use with
`SYNAPSE_CONFIG_PATH` using the `generate` commandline option. You will need to
specify values for `SYNAPSE_CONFIG_PATH`, `SYNAPSE_SERVER_NAME` and
`SYNAPSE_REPORT_STATS`, and mount a docker volume to store the data on. For
example:
```
docker run -it --rm
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
matrixdotorg/synapse:latest generate
```
This will generate a `homeserver.yaml` in (typically)
`/var/lib/docker/volumes/synapse-data/_data`, which you can then customise and
use with:
```
docker run -d --name synapse \
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
matrixdotorg/synapse:latest
```

View File

@@ -1,7 +1,12 @@
# This file is a reference to the configuration options which can be set in
# homeserver.yaml.
# The config is maintained as an up-to-date snapshot of the default
# homeserver.yaml configuration generated by Synapse.
#
# Note that it is not quite ready to be used as-is. If you are starting from
# scratch, it is easier to generate the config files following the instructions
# in INSTALL.md.
# It is intended to act as a reference for the default configuration,
# helping admins keep track of new options and other changes, and compare
# their configs with the current default. As such, many of the actual
# config values shown are placeholders.
#
# It is *not* intended to be copied and used as the basis for a real
# homeserver.yaml. Instead, if you are starting from scratch, please generate
# a fresh config using Synapse by following the instructions in INSTALL.md.

View File

@@ -67,7 +67,7 @@ For nginx users, add the following line to your existing `server` block:
```
location /.well-known/acme-challenge {
proxy_pass http://localhost:8009/;
proxy_pass http://localhost:8009;
}
```

View File

@@ -0,0 +1,22 @@
Version API
===========
This API returns the running Synapse version and the Python version
on which Synapse is being run. This is useful when a Synapse instance
is behind a proxy that does not forward the 'Server' header (which also
contains Synapse version information).
The api is::
GET /_matrix/client/r0/admin/server_version
including an ``access_token`` of a server admin.
It returns a JSON body like the following:
.. code:: json
{
"server_version": "0.99.2rc1 (b=develop, abcdef123)",
"python_version": "3.6.8"
}

123
docs/federate.md Normal file
View File

@@ -0,0 +1,123 @@
Setting up Federation
=====================
Federation is the process by which users on different servers can participate
in the same room. For this to work, those other servers must be able to contact
yours to send messages.
The ``server_name`` configured in the Synapse configuration file (often
``homeserver.yaml``) defines how resources (users, rooms, etc.) will be
identified (eg: ``@user:example.com``, ``#room:example.com``). By
default, it is also the domain that other servers will use to
try to reach your server (via port 8448). This is easy to set
up and will work provided you set the ``server_name`` to match your
machine's public DNS hostname, and provide Synapse with a TLS certificate
which is valid for your ``server_name``.
Once you have completed the steps necessary to federate, you should be able to
join a room via federation. (A good place to start is ``#synapse:matrix.org`` - a
room for Synapse admins.)
## Delegation
For a more flexible configuration, you can have ``server_name``
resources (eg: ``@user:example.com``) served by a different host and
port (eg: ``synapse.example.com:443``). There are two ways to do this:
- adding a ``/.well-known/matrix/server`` URL served on ``https://example.com``.
- adding a DNS ``SRV`` record in the DNS zone of domain
``example.com``.
Without configuring delegation, the matrix federation will
expect to find your server via ``example.com:8448``. The following methods
allow you retain a `server_name` of `example.com` so that your user IDs, room
aliases, etc continue to look like `*:example.com`, whilst having your
federation traffic routed to a different server.
### .well-known delegation
To use this method, you need to be able to alter the
``server_name`` 's https server to serve the ``/.well-known/matrix/server``
URL. Having an active server (with a valid TLS certificate) serving your
``server_name`` domain is out of the scope of this documentation.
The URL ``https://<server_name>/.well-known/matrix/server`` should
return a JSON structure containing the key ``m.server`` like so:
{
"m.server": "<synapse.server.name>[:<yourport>]"
}
In our example, this would mean that URL ``https://example.com/.well-known/matrix/server``
should return:
{
"m.server": "synapse.example.com:443"
}
Note, specifying a port is optional. If a port is not specified an SRV lookup
is performed, as described below. If the target of the
delegation does not have an SRV record, then the port defaults to 8448.
Most installations will not need to configure .well-known. However, it can be
useful in cases where the admin is hosting on behalf of someone else and
therefore cannot gain access to the necessary certificate. With .well-known,
federation servers will check for a valid TLS certificate for the delegated
hostname (in our example: ``synapse.example.com``).
.well-known support first appeared in Synapse v0.99.0. To federate with older
servers you may need to additionally configure SRV delegation. Alternatively,
encourage the server admin in question to upgrade :).
### DNS SRV delegation
To use this delegation method, you need to have write access to your
``server_name`` 's domain zone DNS records (in our example it would be
``example.com`` DNS zone).
This method requires the target server to provide a
valid TLS certificate for the original ``server_name``.
You need to add a SRV record in your ``server_name`` 's DNS zone with
this format:
_matrix._tcp.<yourdomain.com> <ttl> IN SRV <priority> <weight> <port> <synapse.server.name>
In our example, we would need to add this SRV record in the
``example.com`` DNS zone:
_matrix._tcp.example.com. 3600 IN SRV 10 5 443 synapse.example.com.
Once done and set up, you can check the DNS record with ``dig -t srv
_matrix._tcp.<server_name>``. In our example, we would expect this:
$ dig -t srv _matrix._tcp.example.com
_matrix._tcp.example.com. 3600 IN SRV 10 0 443 synapse.example.com.
Note that the target of a SRV record cannot be an alias (CNAME record): it has to point
directly to the server hosting the synapse instance.
## Troubleshooting
You can use the [federation tester](
<https://matrix.org/federationtester>) to check if your homeserver is
configured correctly. Alternatively try the [JSON API used by the federation tester](https://matrix.org/federationtester/api/report?server_name=DOMAIN).
Note that you'll have to modify this URL to replace ``DOMAIN`` with your
``server_name``. Hitting the API directly provides extra detail.
The typical failure mode for federation is that when the server tries to join
a room, it is rejected with "401: Unauthorized". Generally this means that other
servers in the room could not access yours. (Joining a room over federation is
a complicated dance which requires connections in both directions).
Another common problem is that people on other servers can't join rooms that
you invite them to. This can be caused by an incorrectly-configured reverse
proxy: see [reverse_proxy.rst](<reverse_proxy.rst>) for instructions on how to correctly
configure a reverse proxy.
## Running a Demo Federation of Synapses
If you want to get up and running quickly with a trio of homeservers in a
private federation, there is a script in the ``demo`` directory. This is mainly
useful just for development purposes. See [demo/README](<../demo/README>).

View File

@@ -75,6 +75,20 @@ Password auth provider classes may optionally provide the following methods.
result from the ``/login`` call (including ``access_token``, ``device_id``,
etc.)
``someprovider.check_3pid_auth``\(*medium*, *address*, *password*)
This method, if implemented, is called when a user attempts to register or
log in with a third party identifier, such as email. It is passed the
medium (ex. "email"), an address (ex. "jdoe@example.com") and the user's
password.
The method should return a Twisted ``Deferred`` object, which resolves to
a ``str`` containing the user's (canonical) User ID if authentication was
successful, and ``None`` if not.
As with ``check_auth``, the ``Deferred`` may alternatively resolve to a
``(user_id, callback)`` tuple.
``someprovider.check_password``\(*user_id*, *password*)
This method provides a simpler interface than ``get_supported_login_types``

View File

@@ -49,6 +49,24 @@ As with Debian/Ubuntu, postgres support depends on the postgres python connector
export PATH=/usr/pgsql-9.4/bin/:$PATH
pip install psycopg2
Tuning Postgres
===============
The default settings should be fine for most deployments. For larger scale
deployments tuning some of the settings is recommended, details of which can be
found at https://wiki.postgresql.org/wiki/Tuning_Your_PostgreSQL_Server.
In particular, we've found tuning the following values helpful for performance:
- ``shared_buffers``
- ``effective_cache_size``
- ``work_mem``
- ``maintenance_work_mem``
- ``autovacuum_work_mem``
Note that the appropriate values for those fields depend on the amount of free
memory the database host has available.
Synapse config
==============
@@ -129,8 +147,8 @@ Once that has completed, change the synapse config to point at the PostgreSQL
database configuration file ``homeserver-postgres.yaml``::
./synctl stop
mv homeserver.yaml homeserver-old-sqlite.yaml
mv homeserver-postgres.yaml homeserver.yaml
mv homeserver.yaml homeserver-old-sqlite.yaml
mv homeserver-postgres.yaml homeserver.yaml
./synctl start
Synapse should now be running against PostgreSQL.

View File

@@ -18,7 +18,7 @@ servers do not necessarily need to connect to your server via the same server
name or port. Indeed, clients will use port 443 by default, whereas servers
default to port 8448. Where these are different, we refer to the 'client port'
and the 'federation port'. See `Setting up federation
<../README.rst#setting-up-federation>`_ for more details of the algorithm used for
<federate.md>`_ for more details of the algorithm used for
federation connections.
Let's assume that we expect clients to connect to our server at
@@ -69,20 +69,16 @@ Let's assume that we expect clients to connect to our server at
SSLEngine on
ServerName matrix.example.com;
<Location /_matrix>
ProxyPass http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse http://127.0.0.1:8008/_matrix
</Location>
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
</VirtualHost>
<VirtualHost *:8448>
SSLEngine on
ServerName example.com;
<Location /_matrix>
ProxyPass http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse http://127.0.0.1:8008/_matrix
</Location>
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
</VirtualHost>
* HAProxy::

View File

@@ -1,9 +1,14 @@
# This file is a reference to the configuration options which can be set in
# homeserver.yaml.
# The config is maintained as an up-to-date snapshot of the default
# homeserver.yaml configuration generated by Synapse.
#
# Note that it is not quite ready to be used as-is. If you are starting from
# scratch, it is easier to generate the config files following the instructions
# in INSTALL.md.
# It is intended to act as a reference for the default configuration,
# helping admins keep track of new options and other changes, and compare
# their configs with the current default. As such, many of the actual
# config values shown are placeholders.
#
# It is *not* intended to be copied and used as the basis for a real
# homeserver.yaml. Instead, if you are starting from scratch, please generate
# a fresh config using Synapse by following the instructions in INSTALL.md.
## Server ##
@@ -58,11 +63,11 @@ pid_file: DATADIR/homeserver.pid
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
#
soft_file_limit: 0
#soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
#
use_presence: true
#use_presence: false
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
#
@@ -241,6 +246,11 @@ listeners:
# See 'ACME support' below to enable auto-provisioning this certificate via
# Let's Encrypt.
#
# If supplying your own, be sure to use a `.pem` file that includes the
# full certificate chain including any intermediate certificates (for
# instance, if using certbot, use `fullchain.pem` as your certificate,
# not `cert.pem`).
#
#tls_certificate_path: "CONFDIR/SERVERNAME.tls.crt"
# PEM-encoded private key for TLS
@@ -349,7 +359,8 @@ database:
database: "DATADIR/homeserver.db"
# Number of events to cache in memory.
event_cache_size: "10K"
#
#event_cache_size: 10K
## Logging ##
@@ -363,35 +374,77 @@ log_config: "CONFDIR/SERVERNAME.log.config"
# Number of messages a client can send per second
#
rc_messages_per_second: 0.2
#rc_messages_per_second: 0.2
# Number of message a client can send before being throttled
#
rc_message_burst_count: 10.0
#rc_message_burst_count: 10.0
# Ratelimiting settings for registration and login.
#
# Each ratelimiting configuration is made of two parameters:
# - per_second: number of requests a client can send per second.
# - burst_count: number of requests a client can send before being throttled.
#
# Synapse currently uses the following configurations:
# - one for registration that ratelimits registration requests based on the
# client's IP address.
# - one for login that ratelimits login requests based on the client's IP
# address.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into, based on the amount of failed login
# attempts for this account.
#
# The defaults are as shown below.
#
#rc_registration:
# per_second: 0.17
# burst_count: 3
#
#rc_login:
# address:
# per_second: 0.17
# burst_count: 3
# account:
# per_second: 0.17
# burst_count: 3
# failed_attempts:
# per_second: 0.17
# burst_count: 3
# The federation window size in milliseconds
#
federation_rc_window_size: 1000
#federation_rc_window_size: 1000
# The number of federation requests from a single server in a window
# before the server will delay processing the request.
#
federation_rc_sleep_limit: 10
#federation_rc_sleep_limit: 10
# The duration in milliseconds to delay processing events from
# remote servers by if they go over the sleep limit.
#
federation_rc_sleep_delay: 500
#federation_rc_sleep_delay: 500
# The maximum number of concurrent federation requests allowed
# from a single server
#
federation_rc_reject_limit: 50
#federation_rc_reject_limit: 50
# The number of federation requests to concurrently process from a
# single server
#
federation_rc_concurrent: 3
#federation_rc_concurrent: 3
# Target outgoing federation transaction frequency for sending read-receipts,
# per-room.
#
# If we end up trying to send out more read-receipts, they will get buffered up
# into fewer transactions.
#
#federation_rr_transactions_per_room_per_second: 50
@@ -420,11 +473,11 @@ uploads_path: "DATADIR/uploads"
# The largest allowed upload size in bytes
#
max_upload_size: "10M"
#max_upload_size: 10M
# Maximum number of pixels that will be thumbnailed
#
max_image_pixels: "32M"
#max_image_pixels: 32M
# Whether to generate new thumbnails on the fly to precisely match
# the resolution requested by the client. If true then whenever
@@ -432,32 +485,32 @@ max_image_pixels: "32M"
# generate a new thumbnail. If false the server will pick a thumbnail
# from a precalculated list.
#
dynamic_thumbnails: false
#dynamic_thumbnails: false
# List of thumbnails to precalculate when an image is uploaded.
#
thumbnail_sizes:
- width: 32
height: 32
method: crop
- width: 96
height: 96
method: crop
- width: 320
height: 240
method: scale
- width: 640
height: 480
method: scale
- width: 800
height: 600
method: scale
#thumbnail_sizes:
# - width: 32
# height: 32
# method: crop
# - width: 96
# height: 96
# method: crop
# - width: 320
# height: 240
# method: scale
# - width: 640
# height: 480
# method: scale
# - width: 800
# height: 600
# method: scale
# Is the preview URL API enabled? If enabled, you *must* specify
# an explicit url_preview_ip_range_blacklist of IPs that the spider is
# denied from accessing.
#
url_preview_enabled: False
#url_preview_enabled: false
# List of IP address CIDR ranges that the URL preview spider is denied
# from accessing. There are no defaults: you must explicitly
@@ -522,8 +575,8 @@ url_preview_enabled: False
# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
# The largest allowed URL preview spidering size in bytes
max_spider_size: "10M"
#
#max_spider_size: 10M
## Captcha ##
@@ -531,23 +584,25 @@ max_spider_size: "10M"
# This Home Server's ReCAPTCHA public key.
#
recaptcha_public_key: "YOUR_PUBLIC_KEY"
#recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key.
#
recaptcha_private_key: "YOUR_PRIVATE_KEY"
#recaptcha_private_key: "YOUR_PRIVATE_KEY"
# Enables ReCaptcha checks when registering, preventing signup
# unless a captcha is answered. Requires a valid ReCaptcha
# public/private key.
#
enable_registration_captcha: False
#enable_registration_captcha: false
# A secret key used to bypass the captcha test entirely.
#
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
#
#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
## TURN ##
@@ -568,7 +623,7 @@ recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
# How long generated TURN credentials last
#
turn_user_lifetime: "1h"
#turn_user_lifetime: 1h
# Whether guests should be allowed to use the TURN server.
# This defaults to True, otherwise VoIP will be unreliable for guests.
@@ -576,13 +631,17 @@ turn_user_lifetime: "1h"
# connect to arbitrary endpoints without having first signed up for a
# valid account (e.g. by passing a CAPTCHA).
#
turn_allow_guests: True
#turn_allow_guests: True
## Registration ##
#
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users.
enable_registration: False
#
#enable_registration: false
# The user must provide all of the below types of 3PID when registering.
#
@@ -593,7 +652,7 @@ enable_registration: False
# Explicitly disable asking for MSISDNs from the registration
# flow (overrides registrations_require_3pid if MSISDNs are set as required)
#
#disable_msisdn_registration: True
#disable_msisdn_registration: true
# Mandate that users are only allowed to associate certain formats of
# 3PIDs with accounts on this server.
@@ -606,8 +665,8 @@ enable_registration: False
# - medium: msisdn
# pattern: '\+44'
# If set, allows registration by anyone who also has the shared
# secret, even if registration is otherwise disabled.
# If set, allows registration of standard or admin accounts by anyone who
# has the shared secret, even if registration is otherwise disabled.
#
# registration_shared_secret: <PRIVATE STRING>
@@ -617,13 +676,13 @@ enable_registration: False
# N.B. that increasing this will exponentially increase the time required
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
#
bcrypt_rounds: 12
#bcrypt_rounds: 12
# Allows users to register as guests without a password/email/etc, and
# participate in rooms hosted on this server which have been made
# accessible to anonymous users.
#
allow_guest_access: False
#allow_guest_access: false
# The identity server which we suggest that clients should use when users log
# in on this server.
@@ -639,9 +698,9 @@ allow_guest_access: False
# Also defines the ID server which will be called when an account is
# deactivated (one will be picked arbitrarily).
#
trusted_third_party_id_servers:
- matrix.org
- vector.im
#trusted_third_party_id_servers:
# - matrix.org
# - vector.im
# Users who register on this homeserver will automatically be joined
# to these rooms
@@ -655,14 +714,14 @@ trusted_third_party_id_servers:
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
autocreate_auto_join_rooms: true
#autocreate_auto_join_rooms: true
## Metrics ###
# Enable collection and rendering of performance metrics
#
enable_metrics: False
#enable_metrics: False
# Enable sentry integration
# NOTE: While attempts are made to ensure that the logs don't contain
@@ -682,22 +741,24 @@ enable_metrics: False
# A list of event types that will be included in the room_invite_state
#
room_invite_state_types:
- "m.room.join_rules"
- "m.room.canonical_alias"
- "m.room.avatar"
- "m.room.encryption"
- "m.room.name"
#room_invite_state_types:
# - "m.room.join_rules"
# - "m.room.canonical_alias"
# - "m.room.avatar"
# - "m.room.encryption"
# - "m.room.name"
# A list of application service config file to use
# A list of application service config files to use
#
app_service_config_files: []
#app_service_config_files:
# - app_service_1.yaml
# - app_service_2.yaml
# Whether or not to track application service IP addresses. Implicitly
# Uncomment to enable tracking of application service IP addresses. Implicitly
# enables MAU tracking for application service users.
#
track_appservice_user_ips: False
#track_appservice_user_ips: True
# a secret which is used to sign access tokens. If none is specified,
@@ -708,7 +769,7 @@ track_appservice_user_ips: False
# Used to enable access token expiration.
#
expire_access_token: False
#expire_access_token: False
# a secret which is used to calculate HMACs for form values, to stop
# falsification of values. Must be specified for the User Consent
@@ -737,17 +798,16 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
# Determines how quickly servers will query to check which keys
# are still valid.
#
key_refresh_interval: "1d" # 1 Day.
#key_refresh_interval: 1d
# The trusted servers to download signing keys from.
#
perspectives:
servers:
"matrix.org":
verify_keys:
"ed25519:auto":
key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
#perspectives:
# servers:
# "matrix.org":
# verify_keys:
# "ed25519:auto":
# key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
# Enable SAML2 for registration and login. Uses pysaml2.
@@ -812,14 +872,15 @@ perspectives:
# algorithm: "HS256"
# Enable password for login.
#
password_config:
enabled: true
# Uncomment to disable password login
#
#enabled: false
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
#pepper: ""
#
#pepper: "EVEN_MORE_SECRET"
@@ -888,9 +949,9 @@ password_config:
# example_option: 'things'
# Whether to allow non server admins to create groups on this server
# Uncomment to allow non-server-admin users to create groups on this server
#
enable_group_creation: false
#enable_group_creation: true
# If enabled, non server admins can only create groups with local parts
# starting with this prefix
@@ -901,6 +962,10 @@ enable_group_creation: false
# User Directory configuration
#
# 'enabled' defines whether users can search the user directory. If
# false then empty responses are returned to all queries. Defaults to
# true.
#
# 'search_all_users' defines whether to search all users visible to your HS
# when searching the user directory, rather than limiting to users visible
# in public rooms. Defaults to false. If you set it True, you'll have to run
@@ -908,6 +973,7 @@ enable_group_creation: false
# on your database to tell it to rebuild the user_directory search indexes.
#
#user_directory:
# enabled: true
# search_all_users: false
@@ -983,6 +1049,12 @@ enable_group_creation: false
# Uncomment to disable searching the public room list. When disabled
# blocks searching local and remote room lists for local and remote
# users by always returning an empty list for all queries.
#
#enable_room_list_search: false
# The `alias_creation` option controls who's allowed to create aliases
# on this server.
#

View File

@@ -225,6 +225,8 @@ following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
^/_matrix/client/(api/v1|r0|unstable)/login$
^/_matrix/client/(api/v1|r0|unstable)/account/3pid$
^/_matrix/client/(api/v1|r0|unstable)/keys/query$
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
Additionally, the following REST endpoints can be handled, but all requests must
be routed to the same instance::

View File

@@ -7,14 +7,12 @@ set -e
# make sure that origin/develop is up to date
git remote set-branches --add origin develop
git fetch --depth=1 origin develop
UPSTREAM=origin/develop
git fetch origin develop
# if there are changes in the debian directory, check that the debian changelog
# has been updated
if ! git diff --quiet $UPSTREAM... -- debian; then
if git diff --quiet $UPSTREAM... -- debian/changelog; then
if ! git diff --quiet FETCH_HEAD... -- debian; then
if git diff --quiet FETCH_HEAD... -- debian/changelog; then
echo "Updates to debian directory, but no update to the changelog." >&2
exit 1
fi
@@ -22,7 +20,7 @@ fi
# if there are changes *outside* the debian directory, check that the
# newsfragments have been updated.
if git diff --name-only $UPSTREAM... | grep -qv '^develop/'; then
if git diff --name-only FETCH_HEAD... | grep -qv '^debian/'; then
tox -e check-newsfragment
fi
@@ -31,10 +29,10 @@ echo "--------------------------"
echo
# check that any new newsfiles on this branch end with a full stop.
for f in `git diff --name-only $UPSTREAM... -- changelog.d`; do
for f in `git diff --name-only FETCH_HEAD... -- changelog.d`; do
lastchar=`tr -d '\n' < $f | tail -c 1`
if [ $lastchar != '.' ]; then
echo -e "\e[31mERROR: newsfragment $f does not end with a '.'\e[39m" >&2
if [ $lastchar != '.' -a $lastchar != '!' ]; then
echo -e "\e[31mERROR: newsfragment $f does not end with a '.' or '!'\e[39m" >&2
exit 1
fi
done

View File

@@ -76,7 +76,7 @@ def rows_v2(server, json):
def main():
config = yaml.load(open(sys.argv[1]))
config = yaml.safe_load(open(sys.argv[1]))
valid_until = int(time.time() / (3600 * 24)) * 1000 * 3600 * 24
server_name = config["server_name"]

View File

@@ -27,4 +27,4 @@ try:
except ImportError:
pass
__version__ = "0.99.2"
__version__ = "0.99.3"

View File

@@ -621,13 +621,13 @@ class Auth(object):
Returns:
True if the the sender is allowed to redact the target event if the
target event was created by them.
target event was created by them.
False if the sender is allowed to redact the target event with no
further checks.
further checks.
Raises:
AuthError if the event sender is definitely not allowed to redact
the target event.
the target event.
"""
return event_auth.check_redaction(room_version, event, auth_events)
@@ -743,9 +743,9 @@ class Auth(object):
Returns:
Deferred[tuple[str, str|None]]: Resolves to the current membership of
the user in the room and the membership event ID of the user. If
the user is not in the room and never has been, then
`(Membership.JOIN, None)` is returned.
the user in the room and the membership event ID of the user. If
the user is not in the room and never has been, then
`(Membership.JOIN, None)` is returned.
"""
try:
@@ -777,20 +777,22 @@ class Auth(object):
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
MAU cohort
threepid(dict|None): If present, checks for presence against configured
reserved threepid. Used in cases where the user is trying register
with a MAU blocked server, normally they would be rejected but their
threepid is on the reserved list. user_id and
threepid should never be set at the same time.
reserved threepid. Used in cases where the user is trying register
with a MAU blocked server, normally they would be rejected but their
threepid is on the reserved list. user_id and
threepid should never be set at the same time.
"""
# Never fail an auth check for the server notices users or support user
# This can be a problem where event creation is prohibited due to blocking
is_support = yield self.store.is_support_user(user_id)
if user_id == self.hs.config.server_notices_mxid or is_support:
return
if user_id is not None:
if user_id == self.hs.config.server_notices_mxid:
return
if (yield self.store.is_support_user(user_id)):
return
if self.hs.config.hs_disabled:
raise ResourceLimitError(

View File

@@ -14,6 +14,8 @@
import collections
from synapse.api.errors import LimitExceededError
class Ratelimiter(object):
"""
@@ -23,12 +25,13 @@ class Ratelimiter(object):
def __init__(self):
self.message_counts = collections.OrderedDict()
def send_message(self, user_id, time_now_s, msg_rate_hz, burst_count, update=True):
"""Can the user send a message?
def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True):
"""Can the entity (e.g. user or IP address) perform the action?
Args:
user_id: The user sending a message.
key: The key we should use when rate limiting. Can be a user ID
(when sending events), an IP address, etc.
time_now_s: The time now.
msg_rate_hz: The long term number of messages a user can send in a
rate_hz: The long term number of messages a user can send in a
second.
burst_count: How many messages the user can send before being
limited.
@@ -41,10 +44,10 @@ class Ratelimiter(object):
"""
self.prune_message_counts(time_now_s)
message_count, time_start, _ignored = self.message_counts.get(
user_id, (0., time_now_s, None),
key, (0., time_now_s, None),
)
time_delta = time_now_s - time_start
sent_count = message_count - time_delta * msg_rate_hz
sent_count = message_count - time_delta * rate_hz
if sent_count < 0:
allowed = True
time_start = time_now_s
@@ -56,13 +59,13 @@ class Ratelimiter(object):
message_count += 1
if update:
self.message_counts[user_id] = (
message_count, time_start, msg_rate_hz
self.message_counts[key] = (
message_count, time_start, rate_hz
)
if msg_rate_hz > 0:
if rate_hz > 0:
time_allowed = (
time_start + (message_count - burst_count + 1) / msg_rate_hz
time_start + (message_count - burst_count + 1) / rate_hz
)
if time_allowed < time_now_s:
time_allowed = time_now_s
@@ -72,12 +75,22 @@ class Ratelimiter(object):
return allowed, time_allowed
def prune_message_counts(self, time_now_s):
for user_id in list(self.message_counts.keys()):
message_count, time_start, msg_rate_hz = (
self.message_counts[user_id]
for key in list(self.message_counts.keys()):
message_count, time_start, rate_hz = (
self.message_counts[key]
)
time_delta = time_now_s - time_start
if message_count - time_delta * msg_rate_hz > 0:
if message_count - time_delta * rate_hz > 0:
break
else:
del self.message_counts[user_id]
del self.message_counts[key]
def ratelimit(self, key, time_now_s, rate_hz, burst_count, update=True):
allowed, time_allowed = self.can_do_action(
key, time_now_s, rate_hz, burst_count, update
)
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now_s)),
)

View File

@@ -63,12 +63,13 @@ def start_worker_reactor(appname, config):
start_reactor(
appname,
config.soft_file_limit,
config.gc_thresholds,
config.worker_pid_file,
config.worker_daemonize,
config.worker_cpu_affinity,
logger,
soft_file_limit=config.soft_file_limit,
gc_thresholds=config.gc_thresholds,
pid_file=config.worker_pid_file,
daemonize=config.worker_daemonize,
cpu_affinity=config.worker_cpu_affinity,
print_pidfile=config.print_pidfile,
logger=logger,
)
@@ -79,6 +80,7 @@ def start_reactor(
pid_file,
daemonize,
cpu_affinity,
print_pidfile,
logger,
):
""" Run the reactor in the main process
@@ -93,6 +95,7 @@ def start_reactor(
pid_file (str): name of pid file to write to if daemonize is True
daemonize (bool): true to run the reactor in a background process
cpu_affinity (int|None): cpu affinity mask
print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
"""
@@ -124,6 +127,9 @@ def start_reactor(
reactor.run()
if daemonize:
if print_pidfile:
print(pid_file)
daemon = Daemonize(
app=appname,
pid=pid_file,

View File

@@ -33,9 +33,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
@@ -49,6 +53,7 @@ from synapse.rest.client.v1.room import (
RoomStateRestServlet,
)
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -61,6 +66,10 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
SlavedPushRuleStore,
SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
@@ -98,6 +107,8 @@ class ClientReaderServer(HomeServer):
RegisterRestServlet(self).register(resource)
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,

View File

@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
"""
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
run_as_background_process(
"process_receipts_for_federation", self._on_new_receipts, rows,
)
@defer.inlineCallbacks
def _on_new_receipts(self, rows):
"""
Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
receipt.data,
)
yield self.federation_sender.send_read_receipt(receipt_info)
@defer.inlineCallbacks
def update_token(self, token):
try:

View File

@@ -376,6 +376,7 @@ def setup(config_options):
logger.info("Database prepared in %s.", config.database_config['name'])
hs.setup()
hs.setup_master()
@defer.inlineCallbacks
def do_acme():
@@ -636,17 +637,15 @@ def run(hs):
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
print(hs.config.pid_file)
_base.start_reactor(
"synapse-homeserver",
hs.config.soft_file_limit,
hs.config.gc_thresholds,
hs.config.pid_file,
hs.config.daemonize,
hs.config.cpu_affinity,
logger,
soft_file_limit=hs.config.soft_file_limit,
gc_thresholds=hs.config.gc_thresholds,
pid_file=hs.config.pid_file,
daemonize=hs.config.daemonize,
cpu_affinity=hs.config.cpu_affinity,
print_pidfile=hs.config.print_pidfile,
logger=logger,
)

View File

@@ -137,7 +137,7 @@ class Config(object):
@staticmethod
def read_config_file(file_path):
with open(file_path) as file_stream:
return yaml.load(file_stream)
return yaml.safe_load(file_stream)
def invoke_all(self, name, *args, **kargs):
results = []
@@ -214,14 +214,20 @@ class Config(object):
" Defaults to the directory containing the last config file",
)
obj = cls()
obj.invoke_all("add_arguments", config_parser)
config_args = config_parser.parse_args(argv)
config_files = find_config_files(search_paths=config_args.config_path)
obj = cls()
obj.read_config_files(
config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
obj.invoke_all("read_arguments", config_args)
return obj
@classmethod
@@ -312,7 +318,7 @@ class Config(object):
)
config_file.write(config_str)
config = yaml.load(config_str)
config = yaml.safe_load(config_str)
obj.invoke_all("generate_files", config)
print(
@@ -384,7 +390,7 @@ class Config(object):
server_name=server_name,
generate_secrets=False,
)
config = yaml.load(config_string)
config = yaml.safe_load(config_string)
config.pop("log_config")
config.update(specified_config)
@@ -399,7 +405,10 @@ class Config(object):
self.invoke_all("generate_files", config)
return
self.invoke_all("read_config", config)
self.parse_config_dict(config)
def parse_config_dict(self, config_dict):
self.invoke_all("read_config", config_dict)
def find_config_files(search_paths):

View File

@@ -34,10 +34,10 @@ class ApiConfig(Config):
# A list of event types that will be included in the room_invite_state
#
room_invite_state_types:
- "{JoinRules}"
- "{CanonicalAlias}"
- "{RoomAvatar}"
- "{RoomEncryption}"
- "{Name}"
#room_invite_state_types:
# - "{JoinRules}"
# - "{CanonicalAlias}"
# - "{RoomAvatar}"
# - "{RoomEncryption}"
# - "{Name}"
""".format(**vars(EventTypes))

View File

@@ -37,14 +37,16 @@ class AppServiceConfig(Config):
def default_config(cls, **kwargs):
return """\
# A list of application service config file to use
# A list of application service config files to use
#
app_service_config_files: []
#app_service_config_files:
# - app_service_1.yaml
# - app_service_2.yaml
# Whether or not to track application service IP addresses. Implicitly
# Uncomment to enable tracking of application service IP addresses. Implicitly
# enables MAU tracking for application service users.
#
track_appservice_user_ips: False
#track_appservice_user_ips: True
"""
@@ -66,7 +68,7 @@ def load_appservices(hostname, config_files):
try:
with open(config_file, 'r') as f:
appservice = _load_appservice(
hostname, yaml.load(f), config_file
hostname, yaml.safe_load(f), config_file
)
if appservice.id in seen_ids:
raise ConfigError(

View File

@@ -18,11 +18,16 @@ from ._base import Config
class CaptchaConfig(Config):
def read_config(self, config):
self.recaptcha_private_key = config["recaptcha_private_key"]
self.recaptcha_public_key = config["recaptcha_public_key"]
self.enable_registration_captcha = config["enable_registration_captcha"]
self.recaptcha_private_key = config.get("recaptcha_private_key")
self.recaptcha_public_key = config.get("recaptcha_public_key")
self.enable_registration_captcha = config.get(
"enable_registration_captcha", False
)
self.captcha_bypass_secret = config.get("captcha_bypass_secret")
self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
self.recaptcha_siteverify_api = config.get(
"recaptcha_siteverify_api",
"https://www.recaptcha.net/recaptcha/api/siteverify",
)
def default_config(self, **kwargs):
return """\
@@ -31,21 +36,23 @@ class CaptchaConfig(Config):
# This Home Server's ReCAPTCHA public key.
#
recaptcha_public_key: "YOUR_PUBLIC_KEY"
#recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key.
#
recaptcha_private_key: "YOUR_PRIVATE_KEY"
#recaptcha_private_key: "YOUR_PRIVATE_KEY"
# Enables ReCaptcha checks when registering, preventing signup
# unless a captcha is answered. Requires a valid ReCaptcha
# public/private key.
#
enable_registration_captcha: False
#enable_registration_captcha: false
# A secret key used to bypass the captcha test entirely.
#
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
#
#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
"""

View File

@@ -60,7 +60,8 @@ class DatabaseConfig(Config):
database: "%(database_path)s"
# Number of events to cache in memory.
event_cache_size: "10K"
#
#event_cache_size: 10K
""" % locals()
def read_arguments(self, args):

View File

@@ -23,9 +23,9 @@ class GroupsConfig(Config):
def default_config(self, **kwargs):
return """\
# Whether to allow non server admins to create groups on this server
# Uncomment to allow non-server-admin users to create groups on this server
#
enable_group_creation: false
#enable_group_creation: true
# If enabled, non server admins can only create groups with local parts
# starting with this prefix

View File

@@ -38,15 +38,26 @@ logger = logging.getLogger(__name__)
class KeyConfig(Config):
def read_config(self, config):
self.signing_key = self.read_signing_key(config["signing_key_path"])
# the signing key can be specified inline or in a separate file
if "signing_key" in config:
self.signing_key = read_signing_keys([config["signing_key"]])
else:
self.signing_key = self.read_signing_key(config["signing_key_path"])
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})
)
self.key_refresh_interval = self.parse_duration(
config["key_refresh_interval"]
config.get("key_refresh_interval", "1d"),
)
self.perspectives = self.read_perspectives(
config["perspectives"]
config.get("perspectives", {}).get("servers", {
"matrix.org": {"verify_keys": {
"ed25519:auto": {
"key": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw",
}
}}
})
)
self.macaroon_secret_key = config.get(
@@ -88,7 +99,7 @@ class KeyConfig(Config):
# Used to enable access token expiration.
#
expire_access_token: False
#expire_access_token: False
# a secret which is used to calculate HMACs for form values, to stop
# falsification of values. Must be specified for the User Consent
@@ -117,21 +128,21 @@ class KeyConfig(Config):
# Determines how quickly servers will query to check which keys
# are still valid.
#
key_refresh_interval: "1d" # 1 Day.
#key_refresh_interval: 1d
# The trusted servers to download signing keys from.
#
perspectives:
servers:
"matrix.org":
verify_keys:
"ed25519:auto":
key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
#perspectives:
# servers:
# "matrix.org":
# verify_keys:
# "ed25519:auto":
# key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
""" % locals()
def read_perspectives(self, perspectives_config):
def read_perspectives(self, perspectives_servers):
servers = {}
for server_name, server_config in perspectives_config["servers"].items():
for server_name, server_config in perspectives_servers.items():
for key_id, key_data in server_config["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]

View File

@@ -195,7 +195,7 @@ def setup_logging(config, use_worker_options=False):
else:
def load_log_config():
with open(log_config, 'r') as f:
logging.config.dictConfig(yaml.load(f))
logging.config.dictConfig(yaml.safe_load(f))
def sighup(*args):
# it might be better to use a file watcher or something for this.

View File

@@ -24,7 +24,7 @@ MISSING_SENTRY = (
class MetricsConfig(Config):
def read_config(self, config):
self.enable_metrics = config["enable_metrics"]
self.enable_metrics = config.get("enable_metrics", False)
self.report_stats = config.get("report_stats", None)
self.metrics_port = config.get("metrics_port")
self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
@@ -48,7 +48,7 @@ class MetricsConfig(Config):
# Enable collection and rendering of performance metrics
#
enable_metrics: False
#enable_metrics: False
# Enable sentry integration
# NOTE: While attempts are made to ensure that the logs don't contain

View File

@@ -22,16 +22,21 @@ class PasswordConfig(Config):
def read_config(self, config):
password_config = config.get("password_config", {})
if password_config is None:
password_config = {}
self.password_enabled = password_config.get("enabled", True)
self.password_pepper = password_config.get("pepper", "")
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Enable password for login.
#
return """\
password_config:
enabled: true
# Uncomment to disable password login
#
#enabled: false
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
#pepper: ""
#
#pepper: "EVEN_MORE_SECRET"
"""

View File

@@ -15,17 +15,36 @@
from ._base import Config
class RateLimitConfig(object):
def __init__(self, config):
self.per_second = config.get("per_second", 0.17)
self.burst_count = config.get("burst_count", 3.0)
class RatelimitConfig(Config):
def read_config(self, config):
self.rc_messages_per_second = config["rc_messages_per_second"]
self.rc_message_burst_count = config["rc_message_burst_count"]
self.rc_messages_per_second = config.get("rc_messages_per_second", 0.2)
self.rc_message_burst_count = config.get("rc_message_burst_count", 10.0)
self.federation_rc_window_size = config["federation_rc_window_size"]
self.federation_rc_sleep_limit = config["federation_rc_sleep_limit"]
self.federation_rc_sleep_delay = config["federation_rc_sleep_delay"]
self.federation_rc_reject_limit = config["federation_rc_reject_limit"]
self.federation_rc_concurrent = config["federation_rc_concurrent"]
self.rc_registration = RateLimitConfig(config.get("rc_registration", {}))
rc_login_config = config.get("rc_login", {})
self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {}))
self.rc_login_account = RateLimitConfig(rc_login_config.get("account", {}))
self.rc_login_failed_attempts = RateLimitConfig(
rc_login_config.get("failed_attempts", {}),
)
self.federation_rc_window_size = config.get("federation_rc_window_size", 1000)
self.federation_rc_sleep_limit = config.get("federation_rc_sleep_limit", 10)
self.federation_rc_sleep_delay = config.get("federation_rc_sleep_delay", 500)
self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50)
self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3)
self.federation_rr_transactions_per_room_per_second = config.get(
"federation_rr_transactions_per_room_per_second", 50,
)
def default_config(self, **kwargs):
return """\
@@ -33,33 +52,75 @@ class RatelimitConfig(Config):
# Number of messages a client can send per second
#
rc_messages_per_second: 0.2
#rc_messages_per_second: 0.2
# Number of message a client can send before being throttled
#
rc_message_burst_count: 10.0
#rc_message_burst_count: 10.0
# Ratelimiting settings for registration and login.
#
# Each ratelimiting configuration is made of two parameters:
# - per_second: number of requests a client can send per second.
# - burst_count: number of requests a client can send before being throttled.
#
# Synapse currently uses the following configurations:
# - one for registration that ratelimits registration requests based on the
# client's IP address.
# - one for login that ratelimits login requests based on the client's IP
# address.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into.
# - one for login that ratelimits login requests based on the account the
# client is attempting to log into, based on the amount of failed login
# attempts for this account.
#
# The defaults are as shown below.
#
#rc_registration:
# per_second: 0.17
# burst_count: 3
#
#rc_login:
# address:
# per_second: 0.17
# burst_count: 3
# account:
# per_second: 0.17
# burst_count: 3
# failed_attempts:
# per_second: 0.17
# burst_count: 3
# The federation window size in milliseconds
#
federation_rc_window_size: 1000
#federation_rc_window_size: 1000
# The number of federation requests from a single server in a window
# before the server will delay processing the request.
#
federation_rc_sleep_limit: 10
#federation_rc_sleep_limit: 10
# The duration in milliseconds to delay processing events from
# remote servers by if they go over the sleep limit.
#
federation_rc_sleep_delay: 500
#federation_rc_sleep_delay: 500
# The maximum number of concurrent federation requests allowed
# from a single server
#
federation_rc_reject_limit: 50
#federation_rc_reject_limit: 50
# The number of federation requests to concurrently process from a
# single server
#
federation_rc_concurrent: 3
#federation_rc_concurrent: 3
# Target outgoing federation transaction frequency for sending read-receipts,
# per-room.
#
# If we end up trying to send out more read-receipts, they will get buffered up
# into fewer transactions.
#
#federation_rr_transactions_per_room_per_second: 50
"""

View File

@@ -24,7 +24,7 @@ class RegistrationConfig(Config):
def read_config(self, config):
self.enable_registration = bool(
strtobool(str(config["enable_registration"]))
strtobool(str(config.get("enable_registration", False)))
)
if "disable_registration" in config:
self.enable_registration = not bool(
@@ -36,7 +36,10 @@ class RegistrationConfig(Config):
self.registration_shared_secret = config.get("registration_shared_secret")
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
self.trusted_third_party_id_servers = config.get(
"trusted_third_party_id_servers",
["matrix.org", "vector.im"],
)
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
@@ -64,9 +67,13 @@ class RegistrationConfig(Config):
return """\
## Registration ##
#
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users.
enable_registration: False
#
#enable_registration: false
# The user must provide all of the below types of 3PID when registering.
#
@@ -77,7 +84,7 @@ class RegistrationConfig(Config):
# Explicitly disable asking for MSISDNs from the registration
# flow (overrides registrations_require_3pid if MSISDNs are set as required)
#
#disable_msisdn_registration: True
#disable_msisdn_registration: true
# Mandate that users are only allowed to associate certain formats of
# 3PIDs with accounts on this server.
@@ -90,8 +97,8 @@ class RegistrationConfig(Config):
# - medium: msisdn
# pattern: '\\+44'
# If set, allows registration by anyone who also has the shared
# secret, even if registration is otherwise disabled.
# If set, allows registration of standard or admin accounts by anyone who
# has the shared secret, even if registration is otherwise disabled.
#
%(registration_shared_secret)s
@@ -101,13 +108,13 @@ class RegistrationConfig(Config):
# N.B. that increasing this will exponentially increase the time required
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
#
bcrypt_rounds: 12
#bcrypt_rounds: 12
# Allows users to register as guests without a password/email/etc, and
# participate in rooms hosted on this server which have been made
# accessible to anonymous users.
#
allow_guest_access: False
#allow_guest_access: false
# The identity server which we suggest that clients should use when users log
# in on this server.
@@ -123,9 +130,9 @@ class RegistrationConfig(Config):
# Also defines the ID server which will be called when an account is
# deactivated (one will be picked arbitrarily).
#
trusted_third_party_id_servers:
- matrix.org
- vector.im
#trusted_third_party_id_servers:
# - matrix.org
# - vector.im
# Users who register on this homeserver will automatically be joined
# to these rooms
@@ -139,7 +146,7 @@ class RegistrationConfig(Config):
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
autocreate_auto_join_rooms: true
#autocreate_auto_join_rooms: true
""" % locals()
def add_arguments(self, parser):

View File

@@ -19,6 +19,36 @@ from synapse.util.module_loader import load_module
from ._base import Config, ConfigError
DEFAULT_THUMBNAIL_SIZES = [
{
"width": 32,
"height": 32,
"method": "crop",
}, {
"width": 96,
"height": 96,
"method": "crop",
}, {
"width": 320,
"height": 240,
"method": "scale",
}, {
"width": 640,
"height": 480,
"method": "scale",
}, {
"width": 800,
"height": 600,
"method": "scale"
},
]
THUMBNAIL_SIZE_YAML = """\
# - width: %(width)i
# height: %(height)i
# method: %(method)s
"""
MISSING_NETADDR = (
"Missing netaddr library. This is required for URL preview API."
)
@@ -77,9 +107,9 @@ def parse_thumbnail_requirements(thumbnail_sizes):
class ContentRepositoryConfig(Config):
def read_config(self, config):
self.max_upload_size = self.parse_size(config["max_upload_size"])
self.max_image_pixels = self.parse_size(config["max_image_pixels"])
self.max_spider_size = self.parse_size(config["max_spider_size"])
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
self.media_store_path = self.ensure_directory(config["media_store_path"])
@@ -139,9 +169,9 @@ class ContentRepositoryConfig(Config):
)
self.uploads_path = self.ensure_directory(config["uploads_path"])
self.dynamic_thumbnails = config["dynamic_thumbnails"]
self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements(
config["thumbnail_sizes"]
config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES),
)
self.url_preview_enabled = config.get("url_preview_enabled", False)
if self.url_preview_enabled:
@@ -178,6 +208,13 @@ class ContentRepositoryConfig(Config):
def default_config(self, data_dir_path, **kwargs):
media_store = os.path.join(data_dir_path, "media_store")
uploads_path = os.path.join(data_dir_path, "uploads")
formatted_thumbnail_sizes = "".join(
THUMBNAIL_SIZE_YAML % s for s in DEFAULT_THUMBNAIL_SIZES
)
# strip final NL
formatted_thumbnail_sizes = formatted_thumbnail_sizes[:-1]
return r"""
# Directory where uploaded images and attachments are stored.
#
@@ -204,11 +241,11 @@ class ContentRepositoryConfig(Config):
# The largest allowed upload size in bytes
#
max_upload_size: "10M"
#max_upload_size: 10M
# Maximum number of pixels that will be thumbnailed
#
max_image_pixels: "32M"
#max_image_pixels: 32M
# Whether to generate new thumbnails on the fly to precisely match
# the resolution requested by the client. If true then whenever
@@ -216,32 +253,18 @@ class ContentRepositoryConfig(Config):
# generate a new thumbnail. If false the server will pick a thumbnail
# from a precalculated list.
#
dynamic_thumbnails: false
#dynamic_thumbnails: false
# List of thumbnails to precalculate when an image is uploaded.
#
thumbnail_sizes:
- width: 32
height: 32
method: crop
- width: 96
height: 96
method: crop
- width: 320
height: 240
method: scale
- width: 640
height: 480
method: scale
- width: 800
height: 600
method: scale
#thumbnail_sizes:
%(formatted_thumbnail_sizes)s
# Is the preview URL API enabled? If enabled, you *must* specify
# an explicit url_preview_ip_range_blacklist of IPs that the spider is
# denied from accessing.
#
url_preview_enabled: False
#url_preview_enabled: false
# List of IP address CIDR ranges that the URL preview spider is denied
# from accessing. There are no defaults: you must explicitly
@@ -306,6 +329,6 @@ class ContentRepositoryConfig(Config):
# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
# The largest allowed URL preview spidering size in bytes
max_spider_size: "10M"
#
#max_spider_size: 10M
""" % locals()

View File

@@ -20,6 +20,10 @@ from ._base import Config, ConfigError
class RoomDirectoryConfig(Config):
def read_config(self, config):
self.enable_room_list_search = config.get(
"enable_room_list_search", True,
)
alias_creation_rules = config.get("alias_creation_rules")
if alias_creation_rules is not None:
@@ -54,6 +58,12 @@ class RoomDirectoryConfig(Config):
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Uncomment to disable searching the public room list. When disabled
# blocks searching local and remote room lists for local and remote
# users by always returning an empty list for all queries.
#
#enable_room_list_search: false
# The `alias_creation` option controls who's allowed to create aliases
# on this server.
#

View File

@@ -64,7 +64,7 @@ class SAML2Config(Config):
}
def default_config(self, config_dir_path, server_name, **kwargs):
return """
return """\
# Enable SAML2 for registration and login. Uses pysaml2.
#
# `sp_config` is the configuration for the pysaml2 Service Provider.

View File

@@ -45,7 +45,7 @@ class ServerConfig(Config):
self.pid_file = self.abspath(config.get("pid_file"))
self.web_client_location = config.get("web_client_location", None)
self.soft_file_limit = config["soft_file_limit"]
self.soft_file_limit = config.get("soft_file_limit", 0)
self.daemonize = config.get("daemonize")
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
@@ -126,6 +126,11 @@ class ServerConfig(Config):
self.public_baseurl += '/'
self.start_pushers = config.get("start_pushers", True)
# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
# sending out any replication updates.
self.replication_torture_level = config.get("replication_torture_level")
self.listeners = []
for listener in config.get("listeners", []):
if not isinstance(listener.get("port", None), int):
@@ -307,11 +312,11 @@ class ServerConfig(Config):
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
#
soft_file_limit: 0
#soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
#
use_presence: true
#use_presence: false
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
#

View File

@@ -181,6 +181,11 @@ class TlsConfig(Config):
# See 'ACME support' below to enable auto-provisioning this certificate via
# Let's Encrypt.
#
# If supplying your own, be sure to use a `.pem` file that includes the
# full certificate chain including any intermediate certificates (for
# instance, if using certbot, use `fullchain.pem` as your certificate,
# not `cert.pem`).
#
#tls_certificate_path: "%(tls_certificate_path)s"
# PEM-encoded private key for TLS

View File

@@ -22,9 +22,13 @@ class UserDirectoryConfig(Config):
"""
def read_config(self, config):
self.user_directory_search_enabled = True
self.user_directory_search_all_users = False
user_directory_config = config.get("user_directory", None)
if user_directory_config:
self.user_directory_search_enabled = (
user_directory_config.get("enabled", True)
)
self.user_directory_search_all_users = (
user_directory_config.get("search_all_users", False)
)
@@ -33,6 +37,10 @@ class UserDirectoryConfig(Config):
return """
# User Directory configuration
#
# 'enabled' defines whether users can search the user directory. If
# false then empty responses are returned to all queries. Defaults to
# true.
#
# 'search_all_users' defines whether to search all users visible to your HS
# when searching the user directory, rather than limiting to users visible
# in public rooms. Defaults to false. If you set it True, you'll have to run
@@ -40,5 +48,6 @@ class UserDirectoryConfig(Config):
# on your database to tell it to rebuild the user_directory search indexes.
#
#user_directory:
# enabled: true
# search_all_users: false
"""

View File

@@ -22,7 +22,9 @@ class VoipConfig(Config):
self.turn_shared_secret = config.get("turn_shared_secret")
self.turn_username = config.get("turn_username")
self.turn_password = config.get("turn_password")
self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
self.turn_user_lifetime = self.parse_duration(
config.get("turn_user_lifetime", "1h"),
)
self.turn_allow_guests = config.get("turn_allow_guests", True)
def default_config(self, **kwargs):
@@ -45,7 +47,7 @@ class VoipConfig(Config):
# How long generated TURN credentials last
#
turn_user_lifetime: "1h"
#turn_user_lifetime: 1h
# Whether guests should be allowed to use the TURN server.
# This defaults to True, otherwise VoIP will be unreliable for guests.
@@ -53,5 +55,5 @@ class VoipConfig(Config):
# connect to arbitrary endpoints without having first signed up for a
# valid account (e.g. by passing a CAPTCHA).
#
turn_allow_guests: True
#turn_allow_guests: True
"""

View File

@@ -28,7 +28,7 @@ class WorkerConfig(Config):
if self.worker_app == "synapse.app.homeserver":
self.worker_app = None
self.worker_listeners = config.get("worker_listeners")
self.worker_listeners = config.get("worker_listeners", [])
self.worker_daemonize = config.get("worker_daemonize")
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
@@ -48,6 +48,17 @@ class WorkerConfig(Config):
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
self.worker_cpu_affinity = config.get("worker_cpu_affinity")
# This option is really only here to support `--manhole` command line
# argument.
manhole = config.get("worker_manhole")
if manhole:
self.worker_listeners.append({
"port": manhole,
"bind_addresses": ["127.0.0.1"],
"type": "manhole",
"tls": False,
})
if self.worker_listeners:
for listener in self.worker_listeners:
bind_address = listener.pop("bind_address", None)
@@ -57,3 +68,18 @@ class WorkerConfig(Config):
bind_addresses.append(bind_address)
elif not bind_addresses:
bind_addresses.append('')
def read_arguments(self, args):
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
# on workers so we also have to override them when command line options
# are specified.
if args.daemonize is not None:
self.worker_daemonize = args.daemonize
if args.log_config is not None:
self.worker_log_config = args.log_config
if args.log_file is not None:
self.worker_log_file = args.log_file
if args.manhole is not None:
self.worker_manhole = args.worker_manhole

View File

@@ -686,9 +686,9 @@ def _handle_key_deferred(verify_request):
try:
with PreserveLoggingContext():
_, key_id, verify_key = yield verify_request.deferred
except (IOError, RequestSendFailed) as e:
except KeyLookupError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
"Failed to download keys for %s: %s %s",
server_name, type(e).__name__, str(e),
)
raise SynapseError(

View File

@@ -77,6 +77,20 @@ class _EventInternalMetadata(object):
"""
return getattr(self, "recheck_redaction", False)
def is_soft_failed(self):
"""Whether the event has been soft failed.
Soft failed events should be handled as usual, except:
1. They should not go down sync or event streams, or generally
sent to clients.
2. They should not be added to the forward extremities (and
therefore not to current state).
Returns:
bool
"""
return getattr(self, "soft_failed", False)
def _event_dict_property(key):
# We want to be able to use hasattr with the event dict properties.
@@ -127,7 +141,6 @@ class EventBase(object):
origin = _event_dict_property("origin")
origin_server_ts = _event_dict_property("origin_server_ts")
prev_events = _event_dict_property("prev_events")
prev_state = _event_dict_property("prev_state")
redacts = _event_dict_property("redacts")
room_id = _event_dict_property("room_id")
sender = _event_dict_property("sender")

View File

@@ -46,7 +46,7 @@ logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(object):
"""A drop in replacement for TransactionQueue"""
"""A drop in replacement for FederationSender"""
def __init__(self, hs):
self.server_name = hs.hostname
@@ -154,13 +154,13 @@ class FederationRemoteSendQueue(object):
del self.device_messages[key]
def notify_new_events(self, current_id):
"""As per TransactionQueue"""
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""As per TransactionQueue"""
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
@@ -183,8 +183,17 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_read_receipt(self, receipt):
"""As per FederationSender
Args:
receipt (synapse.types.ReadReceipt):
"""
# nothing to do here: the replication listener will handle it.
pass
def send_presence(self, states):
"""As per TransactionQueue
"""As per FederationSender
Args:
states (list(UserPresenceState))
@@ -201,7 +210,7 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
"""As per TransactionQueue"""
"""As per FederationSender"""
pos = self._next_pos()
self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
@@ -439,7 +448,7 @@ def process_rows_for_federation(transaction_queue, rows):
transaction queue ready for sending to the relevant homeservers.
Args:
transaction_queue (TransactionQueue)
transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
"""

View File

@@ -0,0 +1,465 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from six import itervalues
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import logcontext
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
"Total number of PDUs queued for sending across all destinations",
)
class FederationSender(object):
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
self._transaction_manager = TransactionManager(hs)
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: dict[str, PerDestinationQueue]
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: sum(
1 for d in self._per_destination_queues.values()
if d.transmission_loop_running
),
)
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
),
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
),
)
self._order = 1
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room = {
} # type: dict[str, set[PerDestinationQueue]]
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
)
def _get_per_destination_queue(self, destination):
"""Get or create a PerDestinationQueue for the given destination
Args:
destination (str): server_name of remote server
Returns:
PerDestinationQueue
"""
queue = self._per_destination_queues.get(destination)
if not queue:
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
self._per_destination_queues[destination] = queue
return queue
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
"""
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
return
# fire off a processing loop in the background
run_as_background_process(
"process_event_queue_for_federation",
self._process_event_queue_loop,
)
@defer.inlineCallbacks
def _process_event_queue_loop(self):
try:
self._is_processing = True
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
if not events and next_token >= self._last_poked_id:
break
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=event.prev_event_ids(),
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
destinations = set(destinations)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in itervalues(events_by_room)
],
consumeErrors=True
))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
order = self._order
self._order += 1
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)
@defer.inlineCallbacks
def send_read_receipt(self, receipt):
"""Send a RR to any other servers in the room
Args:
receipt (synapse.types.ReadReceipt): receipt to be sent
"""
# Some background on the rate-limiting going on here.
#
# It turns out that if we attempt to send out RRs as soon as we get them from
# a client, then we end up trying to do several hundred Hz of federation
# transactions. (The number of transactions scales as O(N^2) on the size of a
# room, since in a large room we have both more RRs coming in, and more servers
# to send them to.)
#
# This leads to a lot of CPU load, and we end up getting behind. The solution
# currently adopted is as follows:
#
# The first receipt in a given room is sent out immediately, at time T0. Any
# further receipts are, in theory, batched up for N seconds, where N is calculated
# based on the number of servers in the room to achieve a transaction frequency
# of around 50Hz. So, for example, if there were 100 servers in the room, then
# N would be 100 / 50Hz = 2 seconds.
#
# Then, after T+N, we flush out any receipts that have accumulated, and restart
# the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
# we stop the cycle and go back to the start.
#
# However, in practice, it is often possible to flush out receipts earlier: in
# particular, if we are sending a transaction to a given server anyway (for
# example, because we have a PDU or a RR in another room to send), then we may
# as well send out all of the pending RRs for that server. So it may be that
# by the time we get to T+N, we don't actually have any RRs left to send out.
# Nevertheless we continue to buffer up RRs for the room in question until we
# reach the point that no RRs arrive between timer ticks.
#
# For even more background, see https://github.com/matrix-org/synapse/issues/4730.
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(room_id)
domains = [d for d in domains if d != self.server_name]
if not domains:
return
queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(
room_id
)
# if there is no flush yet scheduled, we will send out these receipts with
# immediate flushes, and schedule the next flush for this room.
if queues_pending_flush is not None:
logger.debug("Queuing receipt for: %r", domains)
else:
logger.debug("Sending receipt to: %r", domains)
self._schedule_rr_flush_for_room(room_id, len(domains))
for domain in domains:
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
# if there is already a RR flush pending for this room, then make sure this
# destination is registered for the flush
if queues_pending_flush is not None:
queues_pending_flush.add(queue)
else:
queue.flush_read_receipts_for_room(room_id)
def _schedule_rr_flush_for_room(self, room_id, n_domains):
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()
def _flush_rrs_for_room(self, room_id):
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)
if not queues:
# no more RRs arrived for this room; we are done.
return
# schedule the next flush
self._schedule_rr_flush_for_room(room_id, len(queues))
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""Construct an Edu object, and queue it for sending
Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
self.send_edu(edu, key)
def send_edu(self, edu, key):
"""Queue an EDU for sending
Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
"""
queue = self._get_per_destination_queue(edu.destination)
if key:
queue.send_keyed_edu(edu, key)
else:
queue.send_edu(edu)
def send_device_messages(self, destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
def get_current_token(self):
return 0

View File

@@ -0,0 +1,378 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
)
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
logger = logging.getLogger(__name__)
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
)
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
)
class PerDestinationQueue(object):
"""
Manages the per-destination transmission queues.
Args:
hs (synapse.HomeServer):
transaction_sender (TransactionManager):
destination (str): the server_name of the destination that we are managing
transmission for.
"""
def __init__(self, hs, transaction_manager, destination):
self._server_name = hs.hostname
self._clock = hs.get_clock()
self._store = hs.get_datastore()
self._transaction_manager = transaction_manager
self._destination = destination
self.transmission_loop_running = False
# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: list[tuple[EventBase, int]]
self._pending_edus = [] # type: list[Edu]
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self._last_device_stream_id = 0
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
def __str__(self):
return "PerDestinationQueue[%s]" % self._destination
def pending_pdu_count(self):
return len(self._pending_pdus)
def pending_edu_count(self):
return (
len(self._pending_edus)
+ len(self._pending_presence)
+ len(self._pending_edus_keyed)
)
def send_pdu(self, pdu, order):
"""Add a PDU to the queue, and start the transmission loop if neccessary
Args:
pdu (EventBase): pdu to send
order (int):
"""
self._pending_pdus.append((pdu, order))
self.attempt_new_transaction()
def send_presence(self, states):
"""Add presence updates to the queue. Start the transmission loop if neccessary.
Args:
states (iterable[UserPresenceState]): presence to send
"""
self._pending_presence.update({
state.user_id: state for state in states
})
self.attempt_new_transaction()
def queue_read_receipt(self, receipt):
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
(see flush_read_receipts_for_room)
Args:
receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
"""
self._pending_rrs.setdefault(
receipt.room_id, {},
).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {
"event_ids": receipt.event_ids,
"data": receipt.data,
}
def flush_read_receipts_for_room(self, room_id):
# if we don't have any read-receipts for this room, it may be that we've already
# sent them out, so we don't need to flush.
if room_id not in self._pending_rrs:
return
self._rrs_pending_flush = True
self.attempt_new_transaction()
def send_keyed_edu(self, edu, key):
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
def send_edu(self, edu):
self._pending_edus.append(edu)
self.attempt_new_transaction()
def attempt_new_transaction(self):
"""Try to start a new transaction to this destination
If there is already a transaction in progress to this destination,
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
# list of (pending_pdu, deferred, order)
if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
self._destination
)
return
logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
)
@defer.inlineCallbacks
def _transaction_transmission_loop(self):
pending_pdus = []
try:
self.transmission_loop_running = True
# This will throw if we wouldn't retry. We do this here so we fail
# quickly, but we will later check this again in the http client,
# hence why we throw the result away.
yield get_retry_limiter(self._destination, self._clock, self._store)
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages()
)
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.
pending_pdus = self._pending_pdus
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
pending_edus = []
pending_edus.extend(self._get_rr_edus(force_flush=False))
# We can only include at most 100 EDUs per transactions
pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
pending_edus.extend(
self._pending_edus_keyed.values()
)
self._pending_edus_keyed = {}
pending_edus.extend(device_message_edus)
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination, len(pending_pdus))
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < 100:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now",
self._destination,
datetime.datetime.fromtimestamp(
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
logger.warning(
"TX [%s] Received %d response to transaction: %s",
self._destination, e.code, e,
)
except RequestSendFailed as e:
logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
self._destination)
except Exception:
logger.exception(
"TX [%s] Failed to send transaction",
self._destination,
)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
self._destination)
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
def _get_rr_edus(self, force_flush):
if not self._pending_rrs:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
edu = Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.receipt",
content=self._pending_rrs,
)
self._pending_rrs = {}
self._rrs_pending_flush = False
yield edu
def _pop_pending_edus(self, limit):
pending_edus = self._pending_edus
pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
return pending_edus
@defer.inlineCallbacks
def _get_new_device_messages(self):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.direct_to_device",
content=content,
)
for content in contents
]
last_device_list = self._last_device_list_stream_id
now_stream_id, results = yield self._store.get_devices_by_remote(
self._destination, last_device_list
)
edus.extend(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.device_list_update",
content=content,
)
for content in results
)
defer.returnValue((edus, stream_id, now_stream_id))

View File

@@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
class TransactionManager(object):
"""Helper class which handles building and sending transactions
shared between PerDestinationQueue objects
"""
def __init__(self, hs):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self._store = hs.get_datastore()
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d)",
destination, txn_id,
len(pdus),
len(edus),
)
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
self._next_txn_id += 1
yield self._transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
raise e
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
yield self._transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
)
success = False
defer.returnValue(success)

View File

@@ -1,716 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from six import itervalues
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
)
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
sent_transactions_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from .persistence import TransactionActions
from .units import Edu, Transaction
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
"Total number of PDUs queued for sending across all destinations",
)
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
)
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
)
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
a time for a given destination.
It batches pending PDUs into single transactions.
"""
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.transaction_actions = TransactionActions(self.store)
self.transport_layer = hs.get_federation_transport_client()
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
# Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are
# done
self.pending_transactions = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: len(self.pending_transactions),
)
# Is a mapping from destination -> list of
# tuple(pending pdus, deferred, order)
self.pending_pdus_by_dest = pdus = {}
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
# Map of destination -> user_id -> UserPresenceState of pending presence
# to be sent to each destinations
self.pending_presence_by_dest = presence = {}
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(map(len, pdus.values())),
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
+ sum(map(len, edus_keyed.values()))
),
)
# destination -> stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self.last_device_stream_id_by_dest = {}
# destination -> stream_id of last successfully sent device list
# update.
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
self._order = 1
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
"""
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
return
# fire off a processing loop in the background
run_as_background_process(
"process_event_queue_for_federation",
self._process_event_queue_loop,
)
@defer.inlineCallbacks
def _process_event_queue_loop(self):
try:
self._is_processing = True
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
if not events and next_token >= self._last_poked_id:
break
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=event.prev_event_ids(),
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
destinations = set(destinations)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in itervalues(events_by_room)
],
consumeErrors=True
))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
order = self._order
self._order += 1
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, order)
)
self._attempt_new_transaction(destination)
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
self.pending_presence_by_dest.setdefault(
destination, {}
).update({
state.user_id: state for state in states
})
self._attempt_new_transaction(destination)
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""Construct an Edu object, and queue it for sending
Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
self.send_edu(edu, key)
def send_edu(self, edu, key):
"""Queue an EDU for sending
Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
"""
if key:
self.pending_edus_keyed_by_dest.setdefault(
edu.destination, {}
)[(edu.edu_type, key)] = edu
else:
self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)
self._attempt_new_transaction(edu.destination)
def send_device_messages(self, destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
return
self._attempt_new_transaction(destination)
def get_current_token(self):
return 0
def _attempt_new_transaction(self, destination):
"""Try to start a new transaction to this destination
If there is already a transaction in progress to this destination,
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
Args:
destination (str):
Returns:
None
"""
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
destination
)
return
logger.debug("TX [%s] Starting transaction loop", destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
destination,
)
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
pending_pdus = []
try:
self.pending_transactions[destination] = 1
# This will throw if we wouldn't retry. We do this here so we fail
# quickly, but we will later check this again in the http client,
# hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store)
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# pending_transactions flag.
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transactions
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
if leftover_pdus:
self.pending_pdus_by_dest[destination] = leftover_pdus
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transactions
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
if leftover_edus:
self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = (
device_stream_id
)
return
# END CRITICAL SECTION
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus,
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now",
destination,
datetime.datetime.fromtimestamp(
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
logger.warning(
"TX [%s] Received %d response to transaction: %s",
destination, e.code, e,
)
except RequestSendFailed as e:
logger.warning("TX [%s] Failed to send transaction: %s", destination, e)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
except Exception:
logger.exception(
"TX [%s] Failed to send transaction",
destination,
)
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
to_device_stream_id = self.store.get_to_device_stream_token()
contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
destination, last_device_stream_id, to_device_stream_id
)
edus = [
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.direct_to_device",
content=content,
)
for content in contents
]
last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
now_stream_id, results = yield self.store.get_devices_by_remote(
destination, last_device_list
)
edus.extend(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.device_list_update",
content=content,
)
for content in results
)
defer.returnValue((edus, stream_id, now_stream_id))
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d)",
destination, txn_id,
len(pdus),
len(edus),
)
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
raise e
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
yield self.transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
)
success = False
defer.returnValue(success)

View File

@@ -51,9 +51,10 @@ class TransportLayerClient(object):
logger.debug("get_room_state dest=%s, room=%s",
destination, room_id)
path = _create_v1_path("/state/%s/", room_id)
path = _create_v1_path("/state/%s", room_id)
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
try_trailing_slash_on_400=True,
)
@log_function
@@ -73,9 +74,10 @@ class TransportLayerClient(object):
logger.debug("get_room_state_ids dest=%s, room=%s",
destination, room_id)
path = _create_v1_path("/state_ids/%s/", room_id)
path = _create_v1_path("/state_ids/%s", room_id)
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
try_trailing_slash_on_400=True,
)
@log_function
@@ -95,8 +97,11 @@ class TransportLayerClient(object):
logger.debug("get_pdu dest=%s, event_id=%s",
destination, event_id)
path = _create_v1_path("/event/%s/", event_id)
return self.client.get_json(destination, path=path, timeout=timeout)
path = _create_v1_path("/event/%s", event_id)
return self.client.get_json(
destination, path=path, timeout=timeout,
try_trailing_slash_on_400=True,
)
@log_function
def backfill(self, destination, room_id, event_tuples, limit):
@@ -121,7 +126,7 @@ class TransportLayerClient(object):
# TODO: raise?
return
path = _create_v1_path("/backfill/%s/", room_id)
path = _create_v1_path("/backfill/%s", room_id)
args = {
"v": event_tuples,
@@ -132,6 +137,7 @@ class TransportLayerClient(object):
destination,
path=path,
args=args,
try_trailing_slash_on_400=True,
)
@defer.inlineCallbacks
@@ -167,7 +173,7 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
path = _create_v1_path("/send/%s/", transaction.transaction_id)
path = _create_v1_path("/send/%s", transaction.transaction_id)
response = yield self.client.put_json(
transaction.destination,
@@ -176,6 +182,7 @@ class TransportLayerClient(object):
json_data_callback=json_data_callback,
long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
try_trailing_slash_on_400=True,
)
defer.returnValue(response)
@@ -959,7 +966,7 @@ def _create_v1_path(path, *args):
Example:
_create_v1_path("/event/%s/", event_id)
_create_v1_path("/event/%s", event_id)
Args:
path (str): String template for the path
@@ -980,7 +987,7 @@ def _create_v2_path(path, *args):
Example:
_create_v2_path("/event/%s/", event_id)
_create_v2_path("/event/%s", event_id)
Args:
path (str): String template for the path

View File

@@ -312,7 +312,7 @@ class BaseFederationServlet(object):
class FederationSendServlet(BaseFederationServlet):
PATH = "/send/(?P<transaction_id>[^/]*)/"
PATH = "/send/(?P<transaction_id>[^/]*)/?"
def __init__(self, handler, server_name, **kwargs):
super(FederationSendServlet, self).__init__(
@@ -378,7 +378,7 @@ class FederationSendServlet(BaseFederationServlet):
class FederationEventServlet(BaseFederationServlet):
PATH = "/event/(?P<event_id>[^/]*)/"
PATH = "/event/(?P<event_id>[^/]*)/?"
# This is when someone asks for a data item for a given server data_id pair.
def on_GET(self, origin, content, query, event_id):
@@ -386,7 +386,7 @@ class FederationEventServlet(BaseFederationServlet):
class FederationStateServlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/"
PATH = "/state/(?P<context>[^/]*)/?"
# This is when someone asks for all data for a given context.
def on_GET(self, origin, content, query, context):
@@ -398,7 +398,7 @@ class FederationStateServlet(BaseFederationServlet):
class FederationStateIdsServlet(BaseFederationServlet):
PATH = "/state_ids/(?P<room_id>[^/]*)/"
PATH = "/state_ids/(?P<room_id>[^/]*)/?"
def on_GET(self, origin, content, query, room_id):
return self.handler.on_state_ids_request(
@@ -409,7 +409,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/(?P<context>[^/]*)/"
PATH = "/backfill/(?P<context>[^/]*)/?"
def on_GET(self, origin, content, query, context):
versions = [x.decode('ascii') for x in query[b"v"]]
@@ -759,7 +759,7 @@ class FederationVersionServlet(BaseFederationServlet):
class FederationGroupsProfileServlet(BaseFederationServlet):
"""Get/set the basic profile of a group on behalf of a user
"""
PATH = "/groups/(?P<group_id>[^/]*)/profile$"
PATH = "/groups/(?P<group_id>[^/]*)/profile"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, group_id):
@@ -787,7 +787,7 @@ class FederationGroupsProfileServlet(BaseFederationServlet):
class FederationGroupsSummaryServlet(BaseFederationServlet):
PATH = "/groups/(?P<group_id>[^/]*)/summary$"
PATH = "/groups/(?P<group_id>[^/]*)/summary"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, group_id):
@@ -805,7 +805,7 @@ class FederationGroupsSummaryServlet(BaseFederationServlet):
class FederationGroupsRoomsServlet(BaseFederationServlet):
"""Get the rooms in a group on behalf of a user
"""
PATH = "/groups/(?P<group_id>[^/]*)/rooms$"
PATH = "/groups/(?P<group_id>[^/]*)/rooms"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, group_id):
@@ -823,7 +823,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet):
class FederationGroupsAddRoomsServlet(BaseFederationServlet):
"""Add/remove room from group
"""
PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)$"
PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, room_id):
@@ -855,7 +855,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
"""
PATH = (
"/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)"
"/config/(?P<config_key>[^/]*)$"
"/config/(?P<config_key>[^/]*)"
)
@defer.inlineCallbacks
@@ -874,7 +874,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
class FederationGroupsUsersServlet(BaseFederationServlet):
"""Get the users in a group on behalf of a user
"""
PATH = "/groups/(?P<group_id>[^/]*)/users$"
PATH = "/groups/(?P<group_id>[^/]*)/users"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, group_id):
@@ -892,7 +892,7 @@ class FederationGroupsUsersServlet(BaseFederationServlet):
class FederationGroupsInvitedUsersServlet(BaseFederationServlet):
"""Get the users that have been invited to a group
"""
PATH = "/groups/(?P<group_id>[^/]*)/invited_users$"
PATH = "/groups/(?P<group_id>[^/]*)/invited_users"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, group_id):
@@ -910,7 +910,7 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet):
class FederationGroupsInviteServlet(BaseFederationServlet):
"""Ask a group server to invite someone to the group
"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$"
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -928,7 +928,7 @@ class FederationGroupsInviteServlet(BaseFederationServlet):
class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
"""Accept an invitation from the group server
"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite$"
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -945,7 +945,7 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
class FederationGroupsJoinServlet(BaseFederationServlet):
"""Attempt to join a group
"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -962,7 +962,7 @@ class FederationGroupsJoinServlet(BaseFederationServlet):
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
"""Leave or kick a user from the group
"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$"
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -980,7 +980,7 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet):
class FederationGroupsLocalInviteServlet(BaseFederationServlet):
"""A group server has invited a local user
"""
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$"
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -997,7 +997,7 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet):
class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
"""A group server has removed a local user
"""
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$"
PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -1014,7 +1014,7 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
"""A group or user's server renews their attestation
"""
PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)$"
PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
@@ -1037,7 +1037,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
PATH = (
"/groups/(?P<group_id>[^/]*)/summary"
"(/categories/(?P<category_id>[^/]+))?"
"/rooms/(?P<room_id>[^/]*)$"
"/rooms/(?P<room_id>[^/]*)"
)
@defer.inlineCallbacks
@@ -1080,7 +1080,7 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet):
"""Get all categories for a group
"""
PATH = (
"/groups/(?P<group_id>[^/]*)/categories/$"
"/groups/(?P<group_id>[^/]*)/categories/?"
)
@defer.inlineCallbacks
@@ -1100,7 +1100,7 @@ class FederationGroupsCategoryServlet(BaseFederationServlet):
"""Add/remove/get a category in a group
"""
PATH = (
"/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$"
"/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)"
)
@defer.inlineCallbacks
@@ -1150,7 +1150,7 @@ class FederationGroupsRolesServlet(BaseFederationServlet):
"""Get roles in a group
"""
PATH = (
"/groups/(?P<group_id>[^/]*)/roles/$"
"/groups/(?P<group_id>[^/]*)/roles/?"
)
@defer.inlineCallbacks
@@ -1170,7 +1170,7 @@ class FederationGroupsRoleServlet(BaseFederationServlet):
"""Add/remove/get a role in a group
"""
PATH = (
"/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$"
"/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)"
)
@defer.inlineCallbacks
@@ -1226,7 +1226,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
PATH = (
"/groups/(?P<group_id>[^/]*)/summary"
"(/roles/(?P<role_id>[^/]+))?"
"/users/(?P<user_id>[^/]*)$"
"/users/(?P<user_id>[^/]*)"
)
@defer.inlineCallbacks
@@ -1269,7 +1269,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
"""Get roles in a group
"""
PATH = (
"/get_groups_publicised$"
"/get_groups_publicised"
)
@defer.inlineCallbacks
@@ -1284,7 +1284,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
"""Sets whether a group is joinable without an invite or knock
"""
PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, group_id):

View File

@@ -93,9 +93,9 @@ class BaseHandler(object):
messages_per_second = self.hs.config.rc_messages_per_second
burst_count = self.hs.config.rc_message_burst_count
allowed, time_allowed = self.ratelimiter.send_message(
allowed, time_allowed = self.ratelimiter.can_do_action(
user_id, time_now,
msg_rate_hz=messages_per_second,
rate_hz=messages_per_second,
burst_count=burst_count,
update=update,
)
@@ -165,6 +165,7 @@ class BaseHandler(object):
member_event.room_id,
"leave",
ratelimit=False,
require_consent=False,
)
except Exception as e:
logger.exception("Error kicking guest user: %s" % (e,))

View File

@@ -35,6 +35,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
login_types.append(t)
self._supported_login_types = login_types
self._account_ratelimiter = Ratelimiter()
self._failed_attempts_ratelimiter = Ratelimiter()
self._clock = self.hs.get_clock()
@defer.inlineCallbacks
def validate_user_via_ui_auth(self, requester, request_body, clientip):
"""
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
Returns:
defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
Raises:
LimitExceededError if the ratelimiter's login requests count for this
user is too high too proceed.
"""
self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None:
defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request
LoginError if there was an authentication problem.
LimitExceededError if the ratelimiter's login requests count for this
user is too high too proceed.
"""
if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
username, self.hs.hostname
).to_string()
self.ratelimit_login_per_account(qualified_user_id)
login_type = login_submission.get("type")
known_login_type = False
@@ -715,14 +730,57 @@ class AuthHandler(BaseHandler):
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
# unknown username or invalid password. We raise a 403 here, but note
# that if we're doing user-interactive login, it turns all LoginErrors
# into a 401 anyway.
# unknown username or invalid password.
self._failed_attempts_ratelimiter.ratelimit(
qualified_user_id.lower(), time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=True,
)
# We raise a 403 here, but note that if we're doing user-interactive
# login, it turns all LoginErrors into a 401 anyway.
raise LoginError(
403, "Invalid password",
errcode=Codes.FORBIDDEN
)
@defer.inlineCallbacks
def check_password_provider_3pid(self, medium, address, password):
"""Check if a password provider is able to validate a thirdparty login
Args:
medium (str): The medium of the 3pid (ex. email).
address (str): The address of the 3pid (ex. jdoe@example.com).
password (str): The password of the user.
Returns:
Deferred[(str|None, func|None)]: A tuple of `(user_id,
callback)`. If authentication is successful, `user_id` is a `str`
containing the authenticated, canonical user ID. `callback` is
then either a function to be later run after the server has
completed login/registration, or `None`. If authentication was
unsuccessful, `user_id` and `callback` are both `None`.
"""
for provider in self.password_providers:
if hasattr(provider, "check_3pid_auth"):
# This function is able to return a deferred that either
# resolves None, meaning authentication failure, or upon
# success, to a str (which is the user_id) or a tuple of
# (user_id, callback_func), where callback_func should be run
# after we've finished everything else
result = yield provider.check_3pid_auth(
medium, address, password,
)
if result:
# Check if the return value is a str or a tuple
if isinstance(result, str):
# If it's a str, set callback function to None
result = (result, None)
defer.returnValue(result)
defer.returnValue((None, None))
@defer.inlineCallbacks
def _check_local_password(self, user_id, password):
"""Authenticate a user against the local password database.
@@ -734,7 +792,12 @@ class AuthHandler(BaseHandler):
user_id (unicode): complete @user:id
password (unicode): the provided password
Returns:
(unicode) the canonical_user_id, or None if unknown user / bad password
Deferred[unicode] the canonical_user_id, or Deferred[None] if
unknown user/bad password
Raises:
LimitExceededError if the ratelimiter's login requests count for this
user is too high too proceed.
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -763,6 +826,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
self.ratelimit_login_per_account(user_id)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@@ -934,6 +998,33 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
def ratelimit_login_per_account(self, user_id):
"""Checks whether the process must be stopped because of ratelimiting.
Checks against two ratelimiters: the generic one for login attempts per
account and the one specific to failed attempts.
Args:
user_id (unicode): complete @user:id
Raises:
LimitExceededError if one of the ratelimiters' login requests count
for this user is too high too proceed.
"""
self._failed_attempts_ratelimiter.ratelimit(
user_id.lower(), time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
update=False,
)
self._account_ratelimiter.ratelimit(
user_id.lower(), time_now_s=self._clock.time(),
rate_hz=self.hs.config.rc_login_account.per_second,
burst_count=self.hs.config.rc_login_account.burst_count,
update=True,
)
@attr.s
class MacaroonGenerator(object):

View File

@@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler):
room_id,
"leave",
ratelimit=False,
require_consent=False,
)
except Exception:
logger.exception(

View File

@@ -37,13 +37,185 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
class DeviceHandler(BaseHandler):
class DeviceWorkerHandler(BaseHandler):
def __init__(self, hs):
super(DeviceHandler, self).__init__(hs)
super(DeviceWorkerHandler, self).__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
Retrieve the given user's devices
Args:
user_id (str):
Returns:
defer.Deferred: list[dict[str, X]]: info on each device
"""
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(
user_id, device_id=None
)
devices = list(device_map.values())
for device in devices:
_update_device_from_client_ips(device, ips)
defer.returnValue(devices)
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
Args:
user_id (str):
device_id (str):
Returns:
defer.Deferred: dict[str, X]: info on the device
Raises:
errors.NotFoundError: if the device was not found
"""
try:
device = yield self.store.get_device(user_id, device_id)
except errors.StoreError:
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(
user_id, device_id,
)
_update_device_from_client_ips(device, ips)
defer.returnValue(device)
@measure_func("device.get_user_ids_changed")
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
Args:
user_id (str)
from_token (StreamToken)
"""
now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id)
# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
from_token.device_list_key
)
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
member_events = yield self.store.get_membership_changes_for_user(
user_id, from_token.room_key, now_room_key,
)
rooms_changed.update(event.room_id for event in member_events)
stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key
).stream
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
current_state_ids = yield self.store.get_current_state_ids(room_id)
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
continue
# Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering
)
except errors.StoreError:
# we have purged the stream_ordering index since the stream
# ordering: treat it the same as a new room
event_ids = []
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
if not current_member_id:
continue
# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
for state_dict in itervalues(prev_state_ids):
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
break
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in itervalues(prev_state_ids):
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
if state_key != user_id:
possibly_changed.add(state_key)
break
if possibly_changed or possibly_left:
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
possibly_joined = possibly_changed & users_who_share_room
possibly_left = (possibly_changed | possibly_left) - users_who_share_room
else:
possibly_joined = []
possibly_left = []
defer.returnValue({
"changed": list(possibly_joined),
"left": list(possibly_left),
})
class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs):
super(DeviceHandler, self).__init__(hs)
self.federation_sender = hs.get_federation_sender()
self._edu_updater = DeviceListEduUpdater(hs, self)
@@ -103,52 +275,6 @@ class DeviceHandler(BaseHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
Retrieve the given user's devices
Args:
user_id (str):
Returns:
defer.Deferred: list[dict[str, X]]: info on each device
"""
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(
user_id, device_id=None
)
devices = list(device_map.values())
for device in devices:
_update_device_from_client_ips(device, ips)
defer.returnValue(devices)
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
Args:
user_id (str):
device_id (str):
Returns:
defer.Deferred: dict[str, X]: info on the device
Raises:
errors.NotFoundError: if the device was not found
"""
try:
device = yield self.store.get_device(user_id, device_id)
except errors.StoreError:
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(
user_id, device_id,
)
_update_device_from_client_ips(device, ips)
defer.returnValue(device)
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -276,6 +402,12 @@ class DeviceHandler(BaseHandler):
user_id, device_ids, list(hosts)
)
for device_id in device_ids:
logger.debug(
"Notifying about update %r/%r, ID: %r", user_id, device_id,
position,
)
room_ids = yield self.store.get_rooms_for_user(user_id)
yield self.notifier.on_new_event(
@@ -283,130 +415,10 @@ class DeviceHandler(BaseHandler):
)
if hosts:
logger.info("Sending device list update notif to: %r", hosts)
logger.info("Sending device list update notif for %r to: %r", user_id, hosts)
for host in hosts:
self.federation_sender.send_device_messages(host)
@measure_func("device.get_user_ids_changed")
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
Args:
user_id (str)
from_token (StreamToken)
"""
now_token = yield self.hs.get_event_sources().get_current_token()
room_ids = yield self.store.get_rooms_for_user(user_id)
# First we check if any devices have changed
changed = yield self.store.get_user_whose_devices_changed(
from_token.device_list_key
)
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
member_events = yield self.store.get_membership_changes_for_user(
user_id, from_token.room_key, now_token.room_key
)
rooms_changed.update(event.room_id for event in member_events)
stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key
).stream
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
current_state_ids = yield self.store.get_current_state_ids(room_id)
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
continue
# Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering
)
except errors.StoreError:
# we have purged the stream_ordering index since the stream
# ordering: treat it the same as a new room
event_ids = []
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
if not current_member_id:
continue
# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
for state_dict in itervalues(prev_state_ids):
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
break
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in itervalues(prev_state_ids):
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
if state_key != user_id:
possibly_changed.add(state_key)
break
if possibly_changed or possibly_left:
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
possibly_joined = possibly_changed & users_who_share_room
possibly_left = (possibly_changed | possibly_left) - users_who_share_room
else:
possibly_joined = []
possibly_left = []
defer.returnValue({
"changed": list(possibly_joined),
"left": list(possibly_left),
})
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
@@ -473,15 +485,26 @@ class DeviceListEduUpdater(object):
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
logger.warning("Got device list update edu for %r from %r", user_id, origin)
logger.warning(
"Got device list update edu for %r/%r from %r",
user_id, device_id, origin,
)
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id, device_id,
)
return
logger.debug(
"Received device list update for %r/%r", user_id, device_id,
)
self._pending_updates.setdefault(user_id, []).append(
(device_id, stream_id, prev_ids, edu_content)
)
@@ -499,10 +522,18 @@ class DeviceListEduUpdater(object):
# This can happen since we batch updates
return
for device_id, stream_id, prev_ids, content in pending_updates:
logger.debug(
"Handling update %r/%r, ID: %r, prev: %r ",
user_id, device_id, stream_id, prev_ids,
)
# Given a list of updates we check if we need to resync. This
# happens if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates)
logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
@@ -555,11 +586,21 @@ class DeviceListEduUpdater(object):
)
devices = []
for device in devices:
logger.debug(
"Handling resync update %r/%r, ID: %r",
user_id, device["device_id"], stream_id,
)
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
# We clobber the seen updates since we've re-synced from a given
# point.
self._seen_updates[user_id] = set([stream_id])
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
@@ -572,9 +613,9 @@ class DeviceListEduUpdater(object):
user_id, [device_id for device_id, _, _, _ in pending_updates]
)
self._seen_updates.setdefault(user_id, set()).update(
stream_id for _, stream_id, _, _ in pending_updates
)
self._seen_updates.setdefault(user_id, set()).update(
stream_id for _, stream_id, _, _ in pending_updates
)
@defer.inlineCallbacks
def _need_to_do_resync(self, user_id, updates):
@@ -587,6 +628,11 @@ class DeviceListEduUpdater(object):
user_id
)
logger.debug(
"Current extremity for %r: %r",
user_id, extremity,
)
stream_id_in_updates = set() # stream_ids in updates list
for _, stream_id, prev_ids, _ in updates:
if not prev_ids:

View File

@@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.config = hs.config
self.enable_room_list_search = hs.config.enable_room_list_search
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
if visibility == "public" and not self.enable_room_list_search:
# The room list has been disabled.
raise AuthError(
403,
"This user is not permitted to publish rooms to the room list"
)
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")

View File

@@ -19,7 +19,7 @@ import random
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID
@@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
if room_id:
blocked = yield self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(auth_user_id)

View File

@@ -45,6 +45,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -858,6 +859,52 @@ class FederationHandler(BaseHandler):
logger.debug("Not backfilling as no extremeties found.")
return
# We only want to paginate if we can actually see the events we'll get,
# as otherwise we'll just spend a lot of resources to get redacted
# events.
#
# We do this by filtering all the backwards extremities and seeing if
# any remain. Given we don't have the extremity events themselves, we
# need to actually check the events that reference them.
#
# *Note*: the spec wants us to keep backfilling until we reach the start
# of the room in case we are allowed to see some of the history. However
# in practice that causes more issues than its worth, as a) its
# relatively rare for there to be any visible history and b) even when
# there is its often sufficiently long ago that clients would stop
# attempting to paginate before backfill reached the visible history.
#
# TODO: If we do do a backfill then we should filter the backwards
# extremities to only include those that point to visible portions of
# history.
#
# TODO: Correctly handle the case where we are allowed to see the
# forward event but not the backward extremity, e.g. in the case of
# initial join of the server where we are allowed to see the join
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.
forward_events = yield self.store.get_successor_events(
list(extremities),
)
extremities_events = yield self.store.get_events(
forward_events,
check_redacted=False,
get_prev_content=False,
)
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = yield filter_events_for_server(
self.store, self.server_name, list(extremities_events.values()),
redact=False, check_history_visibility_only=True,
)
if not filtered_extremities:
defer.returnValue(False)
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(
extremities.items(),
@@ -1582,6 +1629,7 @@ class FederationHandler(BaseHandler):
origin, event,
state=state,
auth_events=auth_events,
backfilled=backfilled,
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
@@ -1626,6 +1674,7 @@ class FederationHandler(BaseHandler):
event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
backfilled=backfilled,
)
defer.returnValue(res)
@@ -1748,7 +1797,7 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
def _prep_event(self, origin, event, state, auth_events, backfilled):
"""
Args:
@@ -1756,6 +1805,7 @@ class FederationHandler(BaseHandler):
event:
state:
auth_events:
backfilled (bool)
Returns:
Deferred, which resolves to synapse.events.snapshot.EventContext
@@ -1797,11 +1847,99 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
if not context.rejected:
yield self._check_for_soft_fail(event, state, backfilled)
if event.type == EventTypes.GuestAccess and not context.rejected:
yield self.maybe_kick_guest_users(event)
defer.returnValue(context)
@defer.inlineCallbacks
def _check_for_soft_fail(self, event, state, backfilled):
"""Checks if we should soft fail the event, if so marks the event as
such.
Args:
event (FrozenEvent)
state (dict|None): The state at the event if we don't have all the
event's prev events
backfilled (bool): Whether the event is from backfill
Returns:
Deferred
"""
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
if do_soft_fail_check:
extrem_ids = yield self.store.get_latest_event_ids_in_room(
event.room_id,
)
extrem_ids = set(extrem_ids)
prev_event_ids = set(event.prev_event_ids())
if extrem_ids == prev_event_ids:
# If they're the same then the current state is the same as the
# state at the event, so no point rechecking auth for soft fail.
do_soft_fail_check = False
if do_soft_fail_check:
room_version = yield self.store.get_room_version(event.room_id)
# Calculate the "current state".
if state is not None:
# If we're explicitly given the state then we won't have all the
# prev events, and so we have a gap in the graph. In this case
# we want to be a little careful as we might have been down for
# a while and have an incorrect view of the current state,
# however we still want to do checks as gaps are easy to
# maliciously manufacture.
#
# So we use a "current state" that is actually a state
# resolution across the current forward extremities and the
# given state at the event. This should correctly handle cases
# like bans, especially with state res v2.
state_sets = yield self.store.get_state_groups(
event.room_id, extrem_ids,
)
state_sets = list(state_sets.values())
state_sets.append(state)
current_state_ids = yield self.state_handler.resolve_events(
room_version, state_sets, event,
)
current_state_ids = {
k: e.event_id for k, e in iteritems(current_state_ids)
}
else:
current_state_ids = yield self.state_handler.get_current_state_ids(
event.room_id, latest_event_ids=extrem_ids,
)
# Now check if event pass auth against said current state
auth_types = auth_types_for_event(event)
current_state_ids = [
e for k, e in iteritems(current_state_ids)
if k in auth_types
]
current_auth_events = yield self.store.get_events(current_state_ids)
current_auth_events = {
(e.type, e.state_key): e for e in current_auth_events.values()
}
try:
self.auth.check(room_version, event, auth_events=current_auth_events)
except AuthError as e:
logger.warn(
"Failed current state auth resolution for %r because %s",
event, e,
)
event.internal_metadata.soft_failed = True
@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
missing):

View File

@@ -18,7 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
@@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
A JSON serialisable dict with the snapshot of the room.
"""
blocked = yield self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
user_id = requester.user.to_string()
membership, member_event_id = yield self._check_in_room_or_world_readable(

View File

@@ -243,12 +243,19 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
if self.config.block_events_without_consent_error is not None:
self._block_events_without_consent_error = (
self.config.block_events_without_consent_error
)
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_events_and_hashes=None):
prev_events_and_hashes=None, require_consent=True):
"""
Given a dict from a client, create a new event.
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
require_consent (bool): Whether to check if the requester has
consented to privacy policy.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
@@ -310,7 +320,7 @@ class EventCreationHandler(object):
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
if require_consent and not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
if token_id is not None:
@@ -378,7 +388,7 @@ class EventCreationHandler(object):
Raises:
ConsentNotGivenError: if the user has not given consent yet
"""
if self.config.block_events_without_consent_error is None:
if self._block_events_without_consent_error is None:
return
# exempt AS users from needing consent
@@ -405,7 +415,7 @@ class EventCreationHandler(object):
consent_uri = self._consent_uri_builder.build_user_consent_uri(
requester.user.localpart,
)
msg = self.config.block_events_without_consent_error % {
msg = self._block_events_without_consent_error % {
'consent_uri': consent_uri,
}
raise ConsentNotGivenError(

View File

@@ -147,8 +147,14 @@ class BaseProfileHandler(BaseHandler):
@defer.inlineCallbacks
def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
"""target_user is the user whose displayname is to be changed;
auth_user is the user attempting to make this change."""
"""Set the displayname of a user
Args:
target_user (UserID): the user whose displayname is to be changed.
requester (Requester): The user attempting to make this change.
new_displayname (str): The displayname to give this user.
by_admin (bool): Whether this change was made by an administrator.
"""
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this Home Server")

View File

@@ -16,9 +16,8 @@ import logging
from twisted.internet import defer
from synapse.types import get_domain_from_id
from ._base import BaseHandler
from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
@@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = [
{
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": user_values["event_ids"],
"data": user_values.get("data", {}),
}
ReadReceipt(
room_id=room_id,
receipt_type=receipt_type,
user_id=user_id,
event_ids=user_values["event_ids"],
data=user_values.get("data", {}),
)
for room_id, room_values in content.items()
for receipt_type, users in room_values.items()
for user_id, user_values in users.items()
@@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
max_batch_id = None
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
res = yield self.store.insert_receipt(
room_id, receipt_type, user_id, event_ids, data
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.data,
)
if not res:
@@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
affected_room_ids = list(set([r["room_id"] for r in receipts]))
affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
@@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
receipt = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": [event_id],
"data": {
receipt = ReadReceipt(
room_id=room_id,
receipt_type=receipt_type,
user_id=user_id,
event_ids=[event_id],
data={
"ts": int(self.clock.time_msec()),
}
}
},
)
is_new = yield self._handle_new_receipts([receipt])
if not is_new:
return
# Work out which remote servers should be poked and poke them.
# TODO: optimise this to move some of the work to the workers.
data = receipt["data"]
# XXX why does this not use state.get_current_hosts_in_room() ?
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": [event_id],
"data": data,
}
}
},
},
key=(room_id, receipt_type, user_id),
)
yield self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):

View File

@@ -23,7 +23,9 @@ from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
InvalidCaptchaError,
LimitExceededError,
RegistrationError,
SynapseError,
)
@@ -60,6 +62,7 @@ class RegistrationHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self.identity_handler = self.hs.get_handlers().identity_handler
self.ratelimiter = hs.get_registration_ratelimiter()
self._next_generated_user_id = None
@@ -149,6 +152,7 @@ class RegistrationHandler(BaseHandler):
threepid=None,
user_type=None,
default_display_name=None,
address=None,
):
"""Registers a new client on the server.
@@ -167,6 +171,7 @@ class RegistrationHandler(BaseHandler):
api.constants.UserTypes, or None for a normal user.
default_display_name (unicode|None): if set, the new user's displayname
will be set to this. Defaults to 'localpart'.
address (str|None): the IP address used to perform the registration.
Returns:
A tuple of (user_id, access_token).
Raises:
@@ -206,7 +211,7 @@ class RegistrationHandler(BaseHandler):
token = None
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
yield self._register_with_store(
yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
@@ -215,6 +220,7 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=default_display_name,
admin=admin,
user_type=user_type,
address=address,
)
if self.hs.config.user_directory_search_all_users:
@@ -238,12 +244,13 @@ class RegistrationHandler(BaseHandler):
if default_display_name is None:
default_display_name = localpart
try:
yield self._register_with_store(
yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
make_guest=make_guest,
create_profile_with_displayname=default_display_name,
address=address,
)
except SynapseError:
# if user id is taken, just generate another
@@ -305,6 +312,10 @@ class RegistrationHandler(BaseHandler):
)
else:
yield self._join_user_to_room(fake_requester, r)
except ConsentNotGivenError as e:
# Technically not necessary to pull out this error though
# moving away from bare excepts is a good thing to do.
logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
@@ -337,7 +348,7 @@ class RegistrationHandler(BaseHandler):
user_id, allowed_appservice=service
)
yield self._register_with_store(
yield self.register_with_store(
user_id=user_id,
password_hash="",
appservice_id=service_id,
@@ -513,7 +524,7 @@ class RegistrationHandler(BaseHandler):
token = self.macaroon_gen.generate_access_token(user_id)
if need_register:
yield self._register_with_store(
yield self.register_with_store(
user_id=user_id,
token=token,
password_hash=password_hash,
@@ -590,10 +601,10 @@ class RegistrationHandler(BaseHandler):
ratelimit=False,
)
def _register_with_store(self, user_id, token=None, password_hash=None,
was_guest=False, make_guest=False, appservice_id=None,
create_profile_with_displayname=None, admin=False,
user_type=None):
def register_with_store(self, user_id, token=None, password_hash=None,
was_guest=False, make_guest=False, appservice_id=None,
create_profile_with_displayname=None, admin=False,
user_type=None, address=None):
"""Register user in the datastore.
Args:
@@ -612,10 +623,26 @@ class RegistrationHandler(BaseHandler):
admin (boolean): is an admin user?
user_type (str|None): type of user. One of the values from
api.constants.UserTypes, or None for a normal user.
address (str|None): the IP address used to perform the registration.
Returns:
Deferred
"""
# Don't rate limit for app services
if appservice_id is None and address is not None:
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.can_do_action(
address, time_now_s=time_now,
rate_hz=self.hs.config.rc_registration.per_second,
burst_count=self.hs.config.rc_registration.burst_count,
)
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now)),
)
if self.hs.config.worker_app:
return self._register_client(
user_id=user_id,
@@ -627,6 +654,7 @@ class RegistrationHandler(BaseHandler):
create_profile_with_displayname=create_profile_with_displayname,
admin=admin,
user_type=user_type,
address=address,
)
else:
return self.store.register(
@@ -693,9 +721,9 @@ class RegistrationHandler(BaseHandler):
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
bind_email (bool): Whether to bind the email with the identity
server
server.
bind_msisdn (bool): Whether to bind the msisdn with the identity
server
server.
"""
if self.hs.config.worker_app:
yield self._post_registration_client(
@@ -737,7 +765,7 @@ class RegistrationHandler(BaseHandler):
"""A user consented to the terms on registration
Args:
user_id (str): The user ID that consented
user_id (str): The user ID that consented.
consent_version (str): version of the policy the user has
consented to.
"""

View File

@@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
self.response_cache = ResponseCache(hs, "room_list")
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
@@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
if not self.enable_room_list_search:
return defer.succeed({
"chunk": [],
"total_room_count_estimate": 0,
})
logger.info(
"Getting public room list: limit=%r, since=%r, search=%r, network=%r",
limit, since_token, bool(search_filter), network_tuple,
)
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler):
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
if not self.enable_room_list_search:
defer.returnValue({
"chunk": [],
"total_room_count_estimate": 0,
})
if search_filter:
# We currently don't support searching across federation, so we have
# to do it manually without pagination

View File

@@ -160,6 +160,7 @@ class RoomMemberHandler(object):
txn_id=None,
ratelimit=True,
content=None,
require_consent=True,
):
user_id = target.to_string()
@@ -185,6 +186,7 @@ class RoomMemberHandler(object):
token_id=requester.access_token_id,
txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes,
require_consent=require_consent,
)
# Check if this event matches the previous membership event for the user.
@@ -232,6 +234,10 @@ class RoomMemberHandler(object):
self.copy_room_tags_and_direct_to_room(
predecessor["room_id"], room_id, user_id,
)
# Move over old push rules
self.store.move_push_rules_from_room_to_room_for_user(
predecessor["room_id"], room_id, user_id,
)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -301,6 +307,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
require_consent=True,
):
key = (room_id,)
@@ -315,6 +322,7 @@ class RoomMemberHandler(object):
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
require_consent=require_consent,
)
defer.returnValue(result)
@@ -331,6 +339,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
require_consent=True,
):
content_specified = bool(content)
if content is None:
@@ -512,6 +521,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
require_consent=require_consent,
)
defer.returnValue(res)

View File

@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
logger = logging.getLogger(__name__)
class StateDeltasHandler(object):
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
"""Given two events check if the `key_name` field in content changed
from not matching `public_value` to doing so.
For example, check if `history_visibility` (`key_name`) changed from
`shared` to `world_readable` (`public_value`).
Returns:
None if the field in the events either both match `public_value`
or if neither do, i.e. there has been no change.
True if it didnt match `public_value` but now does
False if it did match `public_value` but now doesn't
"""
prev_event = None
event = None
if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if event_id:
event = yield self.store.get_event(event_id, allow_none=True)
if not event and not prev_event:
logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
defer.returnValue(None)
prev_value = None
value = None
if prev_event:
prev_value = prev_event.content.get(key_name)
if event:
value = event.content.get(key_name)
logger.debug("prev_value: %r -> value: %r", prev_value, value)
if value == public_value and prev_value != public_value:
defer.returnValue(True)
elif value != public_value and prev_value == public_value:
defer.returnValue(False)
else:
defer.returnValue(None)

View File

@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_groups(sync_result_builder)
# debug for https://github.com/matrix-org/synapse/issues/4422
for joined_room in sync_result_builder.joined:
room_id = joined_room.room_id
if room_id in newly_joined_rooms:
issue4422_logger.debug(
"Sync result for newly joined room %s: %r",
room_id, joined_room,
)
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@@ -1425,6 +1437,17 @@ class SyncHandler(object):
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
# debug for #4422
if has_join:
prev_membership = None
if old_mem_ev:
prev_membership = old_mem_ev.membership
issue4422_logger.debug(
"Previous membership for room %s with join: %s (event %s)",
room_id, prev_membership, old_mem_ev_id,
)
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
@@ -1519,30 +1542,39 @@ class SyncHandler(object):
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
room_entries.append(RoomSyncResultBuilder(
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
newly_joined=room_id in newly_joined_rooms,
newly_joined=newly_joined,
full_state=False,
since_token=None if room_id in newly_joined_rooms else since_token,
since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
))
)
else:
room_entries.append(RoomSyncResultBuilder(
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
newly_joined=room_id in newly_joined_rooms,
newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
))
)
if newly_joined:
# debugging for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"RoomSyncResultBuilder events for newly joined room %s: %r",
room_id, entry.events,
)
room_entries.append(entry)
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@@ -1663,6 +1695,13 @@ class SyncHandler(object):
newly_joined_room=newly_joined,
)
if newly_joined:
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
room_id, batch,
)
# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
@@ -1894,15 +1933,34 @@ def _calculate_state(
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
"""Used to help build up a new SyncResult for a user
Attributes:
sync_config (SyncConfig)
full_state (bool)
since_token (StreamToken)
now_token (StreamToken)
joined_room_ids (list[str])
# The following mirror the fields in a sync response
presence (list)
account_data (list)
joined (list[JoinedSyncResult])
invited (list[InvitedSyncResult])
archived (list[ArchivedSyncResult])
device (list)
groups (GroupsSyncResult|None)
to_device (list)
"""
def __init__(self, sync_config, full_state, since_token, now_token,
joined_room_ids):
"""
Args:
sync_config(SyncConfig)
full_state(bool): The full_state flag as specified by user
since_token(StreamToken): The token supplied by user, or None.
now_token(StreamToken): The token to sync up to.
sync_config (SyncConfig)
full_state (bool): The full_state flag as specified by user
since_token (StreamToken): The token supplied by user, or None.
now_token (StreamToken): The token to sync up to.
joined_room_ids (list[str]): List of rooms the user is joined to
"""
self.sync_config = sync_config
self.full_state = full_state
@@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
Args:
room_id(str)
rtype(str): One of `"joined"` or `"archived"`
events(list): List of events to include in the room, (more events
may be added when generating result).
events(list[FrozenEvent]): List of events to include in the room
(more events may be added when generating result).
newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None

View File

@@ -15,12 +15,13 @@
import logging
from six import iteritems
from six import iteritems, iterkeys
from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
@@ -29,7 +30,7 @@ from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
class UserDirectoryHandler(object):
class UserDirectoryHandler(StateDeltasHandler):
"""Handles querying of and keeping updated the user_directory.
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
@@ -38,19 +39,11 @@ class UserDirectoryHandler(object):
world_readable or publically joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
For each user in the directory we also store a room_id which is public and that the
user is joined to. This allows us to ignore history_visibility and join_rules changes
for that user in all other public rooms, as we know they'll still be in at least
one public room.
"""
INITIAL_ROOM_SLEEP_MS = 50
INITIAL_ROOM_SLEEP_COUNT = 100
INITIAL_ROOM_BATCH_SIZE = 100
INITIAL_USER_SLEEP_MS = 10
def __init__(self, hs):
super(UserDirectoryHandler, self).__init__(hs)
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
@@ -59,15 +52,6 @@ class UserDirectoryHandler(object):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
# When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
self.initially_handled_users = set()
self.initially_handled_users_in_public = set()
self.initially_handled_users_share = set()
self.initially_handled_users_share_private_room = set()
# The current position in the current_state_delta stream
self.pos = None
@@ -130,7 +114,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None
user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
@@ -140,7 +124,6 @@ class UserDirectoryHandler(object):
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id)
@defer.inlineCallbacks
def _unsafe_process(self):
@@ -148,10 +131,9 @@ class UserDirectoryHandler(object):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
# If still None then we need to do the initial fill of directory
# If still None then the initial background update hasn't happened yet
if self.pos is None:
yield self._do_initial_spam()
self.pos = yield self.store.get_user_directory_stream_pos()
defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
@@ -172,149 +154,6 @@ class UserDirectoryHandler(object):
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
def _do_initial_spam(self):
"""Populates the user_directory from the current state of the DB, used
when synapse first starts with user_directory support
"""
new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
# Delete any existing entries just in case there are any
yield self.store.delete_all_from_user_dir()
# We process by going through each existing room at a time.
room_ids = yield self.store.get_all_rooms()
logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
num_processed_rooms = 0
for room_id in room_ids:
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
logger.info("Processed all rooms.")
if self.search_all_users:
num_processed_users = 0
user_ids = yield self.store.get_all_local_users()
logger.info(
"Doing initial update of user directory. %d users", len(user_ids)
)
for user_id in user_ids:
# We add profiles for all users even if they don't match the
# include pattern, just in case we want to change it in future
logger.info(
"Handling user %d/%d", num_processed_users + 1, len(user_ids)
)
yield self._handle_local_user(user_id)
num_processed_users += 1
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
logger.info("Processed all users")
self.initially_handled_users = None
self.initially_handled_users_in_public = None
self.initially_handled_users_share = None
self.initially_handled_users_share_private_room = None
yield self.store.update_user_directory_stream_pos(new_pos)
@defer.inlineCallbacks
def _handle_initial_room(self, room_id):
"""Called when we initially fill out user_directory one room at a time
"""
is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
if not is_in_room:
return
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
user_ids = set(users_with_profile)
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
room_id,
{user_id: users_with_profile[user_id] for user_id in unhandled_users},
)
self.initially_handled_users |= unhandled_users
if is_public:
yield self.store.add_users_to_public_room(
room_id, user_ids=user_ids - self.initially_handled_users_in_public
)
self.initially_handled_users_in_public |= user_ids
# We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources.
# We also batch up inserts/updates, but try to avoid too many at once.
to_insert = set()
to_update = set()
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
if not self.is_mine_id(user_id):
count += 1
continue
if self.store.get_if_app_services_interested_in_user(user_id):
count += 1
continue
for other_user_id in user_ids:
if user_id == other_user_id:
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
count += 1
user_set = (user_id, other_user_id)
if user_set in self.initially_handled_users_share_private_room:
continue
if user_set in self.initially_handled_users_share:
if is_public:
continue
to_update.add(user_set)
else:
to_insert.add(user_set)
if is_public:
self.initially_handled_users_share.add(user_set)
else:
self.initially_handled_users_share_private_room.add(user_set)
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert
)
to_insert.clear()
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update
)
to_update.clear()
if to_insert:
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
to_insert.clear()
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update
)
to_update.clear()
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
@@ -356,6 +195,7 @@ class UserDirectoryHandler(object):
user_ids = yield self.store.get_users_in_dir_due_to_room(
room_id
)
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
return
@@ -436,14 +276,20 @@ class UserDirectoryHandler(object):
# ignore the change
return
if change:
users_with_profile = yield self.state.get_current_user_in_room(room_id)
for user_id, profile in iteritems(users_with_profile):
yield self._handle_new_user(room_id, user_id, profile)
else:
users = yield self.store.get_users_in_public_due_to_room(room_id)
for user_id in users:
yield self._handle_remove_user(room_id, user_id)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
# Remove every user from the sharing tables for that room.
for user_id in iterkeys(users_with_profile):
yield self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
# NOTE: this is not the most efficient method, as handle_new_user sets
# up local_user -> other_user and other_user_whos_local -> local_user,
# which when ran over an entire room, will result in the same values
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
for user_id, profile in iteritems(users_with_profile):
yield self._handle_new_user(room_id, user_id, profile)
@defer.inlineCallbacks
def _handle_local_user(self, user_id):
@@ -457,7 +303,9 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@@ -469,177 +317,68 @@ class UserDirectoryHandler(object):
"""
logger.debug("Adding new user to dir, %r", user_id)
row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
if is_public:
row = yield self.store.get_user_in_public_room(user_id)
if not row:
yield self.store.add_users_to_public_room(room_id, [user_id])
else:
logger.debug("Not adding new user to public dir, %r", user_id)
# Now we update users who share rooms with users. We do this by getting
# all the current users in the room and seeing which aren't already
# marked in the database as sharing with `user_id`
# Now we update users who share rooms with users.
users_with_profile = yield self.state.get_current_user_in_room(room_id)
to_insert = set()
to_update = set()
if is_public:
yield self.store.add_users_in_public_rooms(room_id, (user_id,))
else:
to_insert = set()
is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id):
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id) and not is_appservice:
# Returns a map of other_user_id -> shared_private. We only need
# to update mappings if for users that either don't share a room
# already (aren't in the map) or, if the room is private, those that
# only share a public room.
user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
user_id
)
is_appservice = self.store.get_if_app_services_interested_in_user(
user_id
)
# We don't care about appservice users.
if not is_appservice:
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue
to_insert.add((user_id, other_user_id))
# Next we need to update for every local user in the room
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue
shared_is_private = user_ids_shared.get(other_user_id)
if shared_is_private is True:
# We've already marked in the database they share a private room
continue
elif shared_is_private is False:
# They already share a public room, so only update if this is
# a private room
if not is_public:
to_update.add((user_id, other_user_id))
elif shared_is_private is None:
# This is the first time they both share a room
to_insert.add((user_id, other_user_id))
# Next we need to update for every local user in the room
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue
is_appservice = self.store.get_if_app_services_interested_in_user(
other_user_id
)
if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
other_user_id, user_id
is_appservice = self.store.get_if_app_services_interested_in_user(
other_user_id
)
if shared_is_private is True:
# We've already marked in the database they share a private room
continue
elif shared_is_private is False:
# They already share a public room, so only update if this is
# a private room
if not is_public:
to_update.add((other_user_id, user_id))
elif shared_is_private is None:
# This is the first time they both share a room
if self.is_mine_id(other_user_id) and not is_appservice:
to_insert.add((other_user_id, user_id))
if to_insert:
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update
)
if to_insert:
yield self.store.add_users_who_share_private_room(room_id, to_insert)
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
"""Called when we might need to remove user to directory
"""Called when we might need to remove user from directory
Args:
room_id (str): room_id that user left or stopped being public that
user_id (str)
"""
logger.debug("Maybe removing user %r", user_id)
logger.debug("Removing user %r", user_id)
row = yield self.store.get_user_in_directory(user_id)
update_user_dir = row and row["room_id"] == room_id
# Remove user from sharing tables
yield self.store.remove_user_who_share_room(user_id, room_id)
row = yield self.store.get_user_in_public_room(user_id)
update_user_in_public = row and row["room_id"] == room_id
# Are they still in any rooms? If not, remove them entirely.
rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
if update_user_in_public or update_user_dir:
# XXX: Make this faster?
rooms = yield self.store.get_rooms_for_user(user_id)
for j_room_id in rooms:
if not update_user_in_public and not update_user_dir:
break
is_in_room = yield self.store.is_host_joined(
j_room_id, self.server_name
)
if not is_in_room:
continue
if update_user_dir:
update_user_dir = False
yield self.store.update_user_in_user_dir(user_id, j_room_id)
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
j_room_id
)
if update_user_in_public and is_public:
yield self.store.update_user_in_public_user_list(user_id, j_room_id)
update_user_in_public = False
if update_user_dir:
if len(rooms_user_is_in) == 0:
yield self.store.remove_from_user_dir(user_id)
elif update_user_in_public:
yield self.store.remove_from_user_in_public_room(user_id)
# Now handle users_who_share_rooms.
# Get a list of user tuples that were in the DB due to this room and
# users (this includes tuples where the other user matches `user_id`)
user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
user_id, room_id
)
for user_id, other_user_id in user_tuples:
# For each user tuple get a list of rooms that they still share,
# trying to find a private room, and update the entry in the DB
rooms = yield self.store.get_rooms_in_common_for_users(
user_id, other_user_id
)
# If they dont share a room anymore, remove the mapping
if not rooms:
yield self.store.remove_user_who_share_room(user_id, other_user_id)
continue
found_public_share = None
for j_room_id in rooms:
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
j_room_id
)
if is_public:
found_public_share = j_room_id
else:
found_public_share = None
yield self.store.update_users_who_share_room(
room_id, not is_public, [(user_id, other_user_id)]
)
break
if found_public_share:
yield self.store.update_users_who_share_room(
room_id, not is_public, [(user_id, other_user_id)]
)
@defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
@@ -665,50 +404,4 @@ class UserDirectoryHandler(object):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(
user_id, new_name, new_avatar, room_id
)
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
"""Given two events check if the `key_name` field in content changed
from not matching `public_value` to doing so.
For example, check if `history_visibility` (`key_name`) changed from
`shared` to `world_readable` (`public_value`).
Returns:
None if the field in the events either both match `public_value`
or if neither do, i.e. there has been no change.
True if it didnt match `public_value` but now does
False if it did match `public_value` but now doesn't
"""
prev_event = None
event = None
if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if event_id:
event = yield self.store.get_event(event_id, allow_none=True)
if not event and not prev_event:
logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
defer.returnValue(None)
prev_value = None
value = None
if prev_event:
prev_value = prev_event.content.get(key_name)
if event:
value = event.content.get(key_name)
logger.debug("prev_value: %r -> value: %r", prev_value, value)
if value == public_value and prev_value != public_value:
defer.returnValue(True)
elif value != public_value and prev_value == public_value:
defer.returnValue(False)
else:
defer.returnValue(None)
yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)

View File

@@ -188,6 +188,58 @@ class MatrixFederationHttpClient(object):
self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks
def _send_request_with_optional_trailing_slash(
self,
request,
try_trailing_slash_on_400=False,
**send_request_args
):
"""Wrapper for _send_request which can optionally retry the request
upon receiving a combination of a 400 HTTP response code and a
'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
due to #3622.
Args:
request (MatrixFederationRequest): details of request to be sent
try_trailing_slash_on_400 (bool): Whether on receiving a 400
'M_UNRECOGNIZED' from the server to retry the request with a
trailing slash appended to the request path.
send_request_args (Dict): A dictionary of arguments to pass to
`_send_request()`.
Raises:
HttpResponseException: If we get an HTTP response code >= 300
(except 429).
Returns:
Deferred[Dict]: Parsed JSON response body.
"""
try:
response = yield self._send_request(
request, **send_request_args
)
except HttpResponseException as e:
# Received an HTTP error > 300. Check if it meets the requirements
# to retry with a trailing slash
if not try_trailing_slash_on_400:
raise
if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
raise
# Retry with a trailing slash if we received a 400 with
# 'M_UNRECOGNIZED' which some endpoints can return when omitting a
# trailing slash on Synapse <= v0.99.3.
logger.info("Retrying request with trailing slash")
request.path += "/"
response = yield self._send_request(
request, **send_request_args
)
defer.returnValue(response)
@defer.inlineCallbacks
def _send_request(
self,
@@ -196,7 +248,7 @@ class MatrixFederationHttpClient(object):
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False
backoff_on_404=False,
):
"""
Sends a request to the given server.
@@ -473,7 +525,8 @@ class MatrixFederationHttpClient(object):
json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
backoff_on_404=False,
try_trailing_slash_on_400=False):
""" Sends the specifed json data using PUT
Args:
@@ -493,7 +546,12 @@ class MatrixFederationHttpClient(object):
and try the request anyway.
backoff_on_404 (bool): True if we should count a 404 response as
a failure of the server (and should therefore back off future
requests)
requests).
try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end
of the request. Workaround for #3622 in Synapse <= v0.99.3. This
will be attempted before backing off if backing off has been
enabled.
Returns:
Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
@@ -509,7 +567,6 @@ class MatrixFederationHttpClient(object):
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
method="PUT",
destination=destination,
@@ -519,17 +576,19 @@ class MatrixFederationHttpClient(object):
json=data,
)
response = yield self._send_request(
response = yield self._send_request_with_optional_trailing_slash(
request,
try_trailing_slash_on_400,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@@ -592,7 +651,8 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
timeout=None, ignore_backoff=False):
timeout=None, ignore_backoff=False,
try_trailing_slash_on_400=False):
""" GETs some json from the given host homeserver and path
Args:
@@ -606,6 +666,9 @@ class MatrixFederationHttpClient(object):
be retried.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
response we should try appending a trailing slash to the end of
the request. Workaround for #3622 in Synapse <= v0.99.3.
Returns:
Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
@@ -631,16 +694,19 @@ class MatrixFederationHttpClient(object):
query=args,
)
response = yield self._send_request(
response = yield self._send_request_with_optional_trailing_slash(
request,
try_trailing_slash_on_400,
backoff_on_404=False,
ignore_backoff=ignore_backoff,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks

View File

@@ -73,14 +73,26 @@ class ModuleApi(object):
"""
return self._auth_handler.check_user_exists(user_id)
def register(self, localpart):
"""Registers a new user with given localpart
@defer.inlineCallbacks
def register(self, localpart, displayname=None):
"""Registers a new user with given localpart and optional
displayname.
Args:
localpart (str): The localpart of the new user.
displayname (str|None): The displayname of the new user. If None,
the user's displayname will default to `localpart`.
Returns:
Deferred: a 2-tuple of (user_id, access_token)
"""
# Register the user
reg = self.hs.get_registration_handler()
return reg.register(localpart=localpart)
user_id, access_token = yield reg.register(
localpart=localpart, default_display_name=displayname,
)
defer.returnValue((user_id, access_token))
@defer.inlineCallbacks
def invalidate_access_token(self, access_token):

View File

@@ -178,8 +178,6 @@ class Notifier(object):
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
)
self.replication_deferred = ObservableDeferred(defer.Deferred())
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
@@ -205,7 +203,9 @@ class Notifier(object):
def add_replication_callback(self, cb):
"""Add a callback that will be called when some new data is available.
Callback is not given any arguments.
Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and
wrapped with run_as_background_process.
"""
self.replication_callbacks.append(cb)
@@ -517,60 +517,5 @@ class Notifier(object):
def notify_replication(self):
"""Notify the any replication listeners that there's a new event"""
with PreserveLoggingContext():
deferred = self.replication_deferred
self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None)
# the callbacks may well outlast the current request, so we run
# them in the sentinel logcontext.
#
# (ideally it would be up to the callbacks to know if they were
# starting off background processes and drop the logcontext
# accordingly, but that requires more changes)
for cb in self.replication_callbacks:
cb()
@defer.inlineCallbacks
def wait_for_replication(self, callback, timeout):
"""Wait for an event to happen.
Args:
callback: Gets called whenever an event happens. If this returns a
truthy value then ``wait_for_replication`` returns, otherwise
it waits for another event.
timeout: How many milliseconds to wait for callback return a truthy
value.
Returns:
A deferred that resolves with the value returned by the callback.
"""
listener = _NotificationListener(None)
end_time = self.clock.time_msec() + timeout
while True:
listener.deferred = self.replication_deferred.observe()
result = yield callback()
if result:
break
now = self.clock.time_msec()
if end_time <= now:
break
listener.deferred = timeout_deferred(
listener.deferred,
timeout=(end_time - now) / 1000.,
reactor=self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
yield listener.deferred
except defer.TimeoutError:
break
except defer.CancelledError:
break
defer.returnValue(result)
for cb in self.replication_callbacks:
cb()

View File

@@ -33,11 +33,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
def __init__(self, hs):
super(ReplicationRegisterServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.registration_handler = hs.get_registration_handler()
@staticmethod
def _serialize_payload(
user_id, token, password_hash, was_guest, make_guest, appservice_id,
create_profile_with_displayname, admin, user_type,
create_profile_with_displayname, admin, user_type, address,
):
"""
Args:
@@ -56,6 +57,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
admin (boolean): is an admin user?
user_type (str|None): type of user. One of the values from
api.constants.UserTypes, or None for a normal user.
address (str|None): the IP address used to perform the regitration.
"""
return {
"token": token,
@@ -66,13 +68,14 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
"create_profile_with_displayname": create_profile_with_displayname,
"admin": admin,
"user_type": user_type,
"address": address,
}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
yield self.store.register(
yield self.registration_handler.register_with_store(
user_id=user_id,
token=content["token"],
password_hash=content["password_hash"],
@@ -82,6 +85,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
create_profile_with_displayname=content["create_profile_with_displayname"],
admin=content["admin"],
user_type=content["user_type"],
address=content["address"]
)
defer.returnValue((200, {}))

View File

@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)

Some files were not shown because too many files have changed in this diff Show More