Compare commits

152 Commits

Author SHA1 Message Date
Erik Johnston
b044860e56 Merge branch 'erikj/smaller_events' into erikj/test_send 2021-05-05 16:46:41 +01:00
Erik Johnston
2e1a8878d5 Fix default 2021-05-05 16:46:33 +01:00
Erik Johnston
a7b7770bef Merge branch 'erikj/smaller_events' into erikj/test_send 2021-05-05 16:41:14 +01:00
Erik Johnston
5d9bbca631 Make origin optional 2021-05-05 16:41:10 +01:00
Erik Johnston
015fdfe5bb Merge branch 'erikj/smaller_events' into erikj/test_send 2021-05-05 16:37:37 +01:00
Erik Johnston
faa7d48930 More ensmalling 2021-05-05 16:35:16 +01:00
Erik Johnston
f4bb01d41a Compress 2021-05-05 15:06:47 +01:00
Erik Johnston
ec1c2c69a2 Encode json dict 2021-05-05 15:06:46 +01:00
Erik Johnston
7e7b99bca9 Slots 2021-05-05 15:06:28 +01:00
Erik Johnston
3d937d23fd Don't use DictProperty 2021-05-05 15:06:27 +01:00
Erik Johnston
c856e29ccd Remove dictionary-based access from events 2021-05-05 13:52:29 +01:00
Erik Johnston
941a0a76d3 Fix log contexts 2021-05-05 11:48:09 +01:00
Erik Johnston
b0d014819f Fix log contexts 2021-05-05 11:36:24 +01:00
Erik Johnston
eeafa29399 Merge branch 'erikj/better_backfill' into erikj/test_send 2021-05-05 11:29:29 +01:00
Erik Johnston
016d55b94b Use less memory when backfilling 2021-05-05 11:25:27 +01:00
Erik Johnston
adef51ab98 Fix cache metrics 2021-05-05 10:33:23 +01:00
Erik Johnston
cdeb6050ea Log contexts 2021-05-05 10:25:54 +01:00
Erik Johnston
88bd909a4a Merge branch 'erikj/jemalloc_stats' into erikj/test_send 2021-05-05 10:20:36 +01:00
Erik Johnston
a94edad23b Merge branch 'erikj/cache_mem_size' into erikj/test_send 2021-05-05 10:14:07 +01:00
Erik Johnston
fc17e4e62e fix logging contexts 2021-05-04 18:09:03 +01:00
Erik Johnston
0d9d84dac0 fix logging contexts 2021-05-04 18:06:23 +01:00
Erik Johnston
04db7b9581 Merge branch 'erikj/limit_how_often_gc' into erikj/test_send 2021-05-04 17:59:35 +01:00
Erik Johnston
24965fc073 Merge branch 'erikj/efficient_presence_join' into erikj/test_send 2021-05-04 17:59:31 +01:00
Erik Johnston
5b031e2da3 Merge branch 'erikj/fix_presence_joined' into erikj/test_send 2021-05-04 17:59:16 +01:00
Erik Johnston
14b70bbd9f Merge branch 'erikj/refactor_keyring' into erikj/test_send 2021-05-04 17:57:57 +01:00
Erik Johnston
d4175abe52 Allow fetching events 2021-05-04 17:57:46 +01:00
Erik Johnston
b76fe71627 Fix remote resource 2021-05-04 17:57:46 +01:00
Erik Johnston
7f237d5639 Remove key_ready 2021-05-04 17:57:46 +01:00
Erik Johnston
f37c5843d3 Merge branch 'erikj/refactor_keyring' into erikj/test_send 2021-05-04 17:37:22 +01:00
Erik Johnston
d6ae1aef46 Merge remote-tracking branch 'origin/develop' into erikj/test_send 2021-05-04 17:37:15 +01:00
Erik Johnston
3bfd3c55f9 Refactor keyring 2021-05-04 17:36:02 +01:00
Erik Johnston
cad5a47621 Bugfix newsfile 2021-05-04 15:54:39 +01:00
Erik Johnston
e3bc4617fc Time external cache response time (#9904) 2021-05-04 15:14:22 +01:00
Erik Johnston
7e3d333b28 Move newsfile 2021-05-04 14:51:28 +01:00
Erik Johnston
aabc46f0f6 Merge remote-tracking branch 'origin/develop' into erikj/cache_mem_size 2021-05-04 14:49:16 +01:00
Erik Johnston
78e3502ada Always report memory usage metrics when TRACK_MEMORY_USAGE is True 2021-05-04 14:32:42 +01:00
Erik Johnston
8206069c63 Comment 2021-05-04 14:29:30 +01:00
Erik Johnston
a99524f383 Apply suggestions from code review
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2021-05-04 14:29:26 +01:00
Erik Johnston
b5169b68e9 Document default. Add type annotations. Correctly convert to seconds 2021-05-04 14:23:02 +01:00
Erik Johnston
4c9446c4cb isort 2021-05-04 14:14:08 +01:00
Erik Johnston
4a8a483060 Fix store.get_users_in_room_with_profiles 2021-05-04 14:09:44 +01:00
Erik Johnston
d145ba6ccc Move jemalloc metrics to a separate file, and load from app to get proper logs 2021-05-04 14:00:12 +01:00
Erik Johnston
dcb79da38a More descriptive log when failing to set up jemalloc collector 2021-05-04 13:40:38 +01:00
Erik Johnston
35c13c730c Apply suggestions from code review
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2021-05-04 13:39:44 +01:00
Andrew Morgan
b85821aca2 Add port parameter to the sample config for psycopg2 args (#9911)
Adds the `port` option with the default value to the sample config file.
2021-05-04 13:28:59 +01:00
Erik Johnston
8624333cd9 Correctly invalidate get_users_in_room_with_profiles cache 2021-05-04 13:27:28 +01:00
Erik Johnston
4caa84b279 Use lists instead of sets where appropriate 2021-05-04 13:16:15 +01:00
Erik Johnston
48cf260c7a Process state deltas in presence by room 2021-05-04 13:16:02 +01:00
Erik Johnston
7e5f78a698 Convert other uses of get_current_users_in_room and add warning 2021-05-04 13:02:18 +01:00
Erik Johnston
43c9acda4c Config 2021-05-04 11:49:13 +01:00
Erik Johnston
bd04fb6308 Code review 2021-05-04 10:47:32 +01:00
Erik Johnston
d3a6e38c96 Apply suggestions from code review
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2021-05-04 10:40:18 +01:00
Erik Johnston
aa1a026509 Stuff 2021-04-30 15:19:31 +01:00
Erik Johnston
260c760d69 Don't log response 2021-04-30 15:18:17 +01:00
Erik Johnston
49da5e9ec4 Chunk _check_sigs_and_hash_and_fetch 2021-04-30 15:17:50 +01:00
Erik Johnston
3b2991e3fb Log memory usage 2021-04-30 15:00:22 +01:00
Erik Johnston
aec80899ab Merge branch 'erikj/stream_deserealize' into erikj/test_send 2021-04-30 14:21:58 +01:00
Erik Johnston
68f1d258d9 Use ijson 2021-04-30 14:19:27 +01:00
Erik Johnston
8481bacc93 Merge branch 'erikj/efficient_presence_join' into erikj/test_send 2021-04-30 13:37:08 +01:00
Erik Johnston
68b6106ce5 Newsfile 2021-04-30 13:36:50 +01:00
Erik Johnston
0ed608cf56 Increase perf of handling presence when joining large rooms.
We were doing a *lot* of duplicate work, e.g. performing n^2
worth of `is_mine_id(..)` checks across all joined users.
2021-04-30 13:34:15 +01:00
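A toy sketch of the optimisation described in the commit message above (illustrative names only, not Synapse's code): classify each joined user as local or remote once up front, so the later per-user work no longer repeats an `is_mine_id(..)`-style check inside a pairwise loop.

```python
# Illustration only: partition joined users into local/remote a single time,
# instead of re-checking "is this user mine?" O(n^2) times.
def split_local_remote(joined_users, my_domain):
    local, remote = [], []
    for user_id in joined_users:
        # Matrix user IDs look like "@alice:example.com"
        if user_id.endswith(":" + my_domain):
            local.append(user_id)
        else:
            remote.append(user_id)
    return local, remote

local_users, remote_users = split_local_remote(
    ["@alice:example.com", "@bob:other.org"], "example.com"
)
```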
Erik Johnston
ac0143c4ac Record size of incoming bytes 2021-04-30 10:24:19 +01:00
Erik Johnston
f5a25c7b53 Merge branch 'erikj/limit_how_often_gc' into erikj/test_send 2021-04-30 10:14:23 +01:00
Erik Johnston
5813719696 Merge branch 'erikj/fix_presence_joined' into erikj/test_send 2021-04-30 10:14:18 +01:00
Erik Johnston
6640fb467f Use correct name 2021-04-29 17:44:54 +01:00
Erik Johnston
0c8cd62149 Newsfile 2021-04-29 17:36:34 +01:00
Erik Johnston
996c0ce3d5 Use get_current_users_in_room from store and not StateHandler 2021-04-29 17:35:47 +01:00
Erik Johnston
938efeb595 Add some logging 2021-04-29 16:41:16 +01:00
Erik Johnston
4a3a9597f5 Merge remote-tracking branch 'origin/develop' into erikj/test_send 2021-04-29 16:41:04 +01:00
Andrew Morgan
4d624f467a Merge tag 'v1.33.0rc2' into develop
Synapse 1.33.0rc2 (2021-04-29)
==============================

Bugfixes
--------

- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))
2021-04-29 14:35:14 +01:00
Andrew Morgan
d11f2dfee5 typo in changelog 2021-04-29 14:31:14 +01:00
Patrick Cloke
bb4b11846f Add missing type hints to handlers and fix a Spam Checker type hint. (#9896)
The user_may_create_room_alias method on spam checkers
declared the room_alias parameter as a str when in reality it is
passed a RoomAlias object.
2021-04-29 07:17:28 -04:00
Andrew Morgan
e9444cc74d 1.33.0rc2 2021-04-29 11:45:37 +01:00
ThibF
0085dc5abc Delete room endpoint (#9889)
Support the delete of a room through DELETE request and mark
previous request as deprecated through documentation.

Signed-off-by: Thibault Ferrante <thibault.ferrante@pm.me>
2021-04-29 10:31:45 +01:00
Erik Johnston
351f886bc8 Newsfile 2021-04-28 14:55:46 +01:00
Erik Johnston
79627b3a3c Limit how often GC happens by time.
Synapse can be quite memory intensive, and unless care is taken to tune
the GC thresholds it can end up thrashing, causing noticeable performance
problems for large servers. We fix this by limiting how often we GC a
given generation, regardless of current counts/thresholds.

This does not help with the reverse problem where the thresholds are set
too high, but that should only happen in situations where they've been
manually configured.

Adds a `gc_min_seconds_between` config option to override the defaults.

Fixes #9890.
2021-04-28 14:51:31 +01:00
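A minimal sketch of the idea behind this change (not Synapse's implementation; names and values are illustrative): skip collecting a generation if it was collected too recently, regardless of what the allocation-count thresholds say. The new option described above would control the per-generation time floor.

```python
import gc
import time

MIN_SECONDS_BETWEEN = [1.0, 10.0, 30.0]  # example per-generation floor
_last_collect = [0.0, 0.0, 0.0]

def maybe_collect(generation: int) -> None:
    now = time.monotonic()
    if now - _last_collect[generation] < MIN_SECONDS_BETWEEN[generation]:
        return  # this generation was collected too recently; skip
    _last_collect[generation] = now
    gc.collect(generation)
```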
Erik Johnston
802560211a Merge remote-tracking branch 'origin/release-v1.33.0' into develop 2021-04-28 14:43:10 +01:00
Erik Johnston
e4ab8676b4 Fix tight loop handling presence replication. (#9900)
Only affects workers. Introduced in #9819.

Fixes #9899.
2021-04-28 14:42:50 +01:00
Patrick Cloke
10a08ab88a Use the parent's logging context name for runWithConnection. (#9895)
This fixes a regression where the logging context for runWithConnection
was reported as runWithConnection instead of the connection name,
e.g. "POST-XYZ".
2021-04-28 07:44:52 -04:00
Andrew Morgan
fa6679e794 Merge tag 'v1.33.0rc1' into develop
Synapse 1.33.0rc1 (2021-04-28)
==============================

Features
--------

- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when a user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))

Bugfixes
--------

- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))

Improved Documentation
----------------------

- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))

Internal Changes
----------------

- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
2021-04-28 12:12:29 +01:00
Andrew Morgan
8ba086980d Reword account validity template change to sound less like a bugfix 2021-04-28 12:07:49 +01:00
Erik Johnston
391bfe9a7b Reduce memory footprint of caches (#9886) 2021-04-28 11:59:28 +01:00
Andrew Morgan
787de3190f 1.33.0rc1 2021-04-28 11:43:33 +01:00
Andrew Morgan
4e0fd35bc9 Revert "Experimental Federation Speedup (#9702)"
This reverts commit 05e8c70c05.
2021-04-28 11:38:33 +01:00
Erik Johnston
dd2d32dcdb Add type hints to presence handler (#9885) 2021-04-28 11:07:47 +01:00
Andrew Morgan
fe604a022a Remove various bits of compatibility code for Python <3.6 (#9879)
I went through and removed a bunch of cruft that was lying around for compatibility with old Python versions. This PR also will now prevent Synapse from starting unless you're running Python 3.6+.
2021-04-27 13:13:07 +01:00
Patrick Cloke
1350b053da Pass errors back to the client when trying multiple federation destinations. (#9868)
This ensures that something like an auth error (403) will be
returned to the requester instead of attempting to try more
servers, which will likely result in the same error, and then
passing back a generic 400 error.
2021-04-27 07:30:34 -04:00
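A rough sketch of the behaviour this PR describes (hypothetical names, not the real Synapse code path): when trying several destination servers in turn, a definitive failure such as a 403 is surfaced to the requester immediately rather than trying the remaining servers and ultimately returning a generic 400.

```python
class DefinitiveFederationError(Exception):
    """An error (e.g. a 403) that retrying on another server will not fix."""

async def try_destinations(destinations, send_to):
    last_exc = None
    for destination in destinations:
        try:
            return await send_to(destination)
        except DefinitiveFederationError:
            raise                 # pass the real error back to the client
        except Exception as exc:  # transient failure: try the next server
            last_exc = exc
    raise last_exc if last_exc else RuntimeError("no destinations to try")
```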
Erik Johnston
0ffa5fb935 Use current state table for presence.get_interested_remotes (#9887)
This should be a lot quicker than asking the state handler.
2021-04-27 10:09:41 +01:00
Erik Johnston
6237096e80 Newsfile 2021-04-26 14:23:15 +01:00
Erik Johnston
1b4ec8ef0e Export jemalloc stats to prometheus when used 2021-04-26 14:18:06 +01:00
Erik Johnston
5add13e05d Newsfile 2021-04-26 11:13:08 +01:00
Erik Johnston
2bf93f9b34 Fix 2021-04-26 10:58:04 +01:00
Erik Johnston
bcf8858b67 Don't explode if memory has been twiddled 2021-04-26 10:56:42 +01:00
Erik Johnston
99fb72e63e Move TRACK_MEMORY_USAGE to root 2021-04-26 10:50:15 +01:00
Erik Johnston
567fe5e387 Make TRACK_MEMORY_USAGE configurable 2021-04-26 10:39:54 +01:00
Erik Johnston
0c9bab290f Ignore singletons 2021-04-26 10:29:26 +01:00
Richard van der Hoff
3ff2251754 Improved validation for received requests (#9817)
* Simplify `start_listening` callpath

* Correctly check the size of uploaded files
2021-04-23 19:20:44 +01:00
Richard van der Hoff
84936e2264 Kill off _PushHTTPChannel. (#9878)
First of all, a fixup to `FakeChannel` which is needed to make it work with the default HTTP channel implementation.

Secondly, it looks like we no longer need `_PushHTTPChannel`, because as of #8013, the producer that gets attached to the `HTTPChannel` is now an `IPushProducer`. This is good, because it means we can remove a whole load of test-specific boilerplate which causes variation between tests and production.
2021-04-23 18:40:57 +01:00
Andrew Morgan
695b73c861 Allow OIDC cookies to work on non-root public baseurls (#9726)
Applied a (slightly modified) patch from https://github.com/matrix-org/synapse/issues/9574.

As far as I understand this would allow the cookie set during the OIDC flow to work on deployments using public baseurls that do not sit at the URL path root.
2021-04-23 18:22:47 +01:00
Erik Johnston
5003bd29d2 Don't have a global Asizer 2021-04-23 17:16:49 +01:00
Richard van der Hoff
59d24c5bef pass a reactor into SynapseSite (#9874) 2021-04-23 17:06:47 +01:00
Patrick Cloke
e83627926f Add type hints to auth and auth_blocking. (#9876) 2021-04-23 12:02:16 -04:00
Erik Johnston
e9f5812eff Track memory usage of caches 2021-04-23 16:26:10 +01:00
Erik Johnston
a15c003e5b Make DomainSpecificString an attrs class (#9875) 2021-04-23 15:46:29 +01:00
Andrew Morgan
ceaa76970f Remove room and user invite ratelimits in default unit test config (#9871) 2021-04-23 13:37:48 +01:00
Erik Johnston
9d25a0ae65 Split presence out of master (#9820) 2021-04-23 12:21:55 +01:00
Patrick Cloke
d924827da1 Check for space membership during a remote join of a restricted room (#9814)
When receiving a /send_join request for a room with join rules set to 'restricted',
check if the user is a member of the spaces defined in the 'allow' key of the join rules.

This only applies to an experimental room version, as defined in MSC3083.
2021-04-23 07:05:51 -04:00
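An illustrative sketch of the check described above, based on the MSC3083 join-rule shape (an `allow` list naming rooms/spaces whose members may join). Helper names are hypothetical; this is not Synapse's implementation.

```python
async def may_join_restricted_room(user_id, join_rule_content, is_member_of):
    # join_rule_content is the content of the m.room.join_rules event, e.g.
    # {"join_rule": "restricted",
    #  "allow": [{"type": "m.room_membership", "room_id": "!space:example.com"}]}
    for entry in join_rule_content.get("allow", []):
        if entry.get("type") != "m.room_membership":
            continue
        space_id = entry.get("room_id")
        if space_id and await is_member_of(user_id, space_id):
            return True
    return False
```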
Erik Johnston
3853a7edfc Only store data in caches, not "smart" objects (#9845) 2021-04-23 11:47:07 +01:00
Richard van der Hoff
51a20914a8 Limit the size of HTTP responses read over federation. (#9833) 2021-04-23 11:08:41 +01:00
manuroe
c1ddbbde4f Handle all new rate limits in demo scripts (#9858) 2021-04-22 17:49:42 +01:00
Erik Johnston
177dae2704 Limit length of accepted email addresses (#9855) 2021-04-22 17:49:11 +01:00
Richard van der Hoff
69018acbd2 Clear the resync bit after resyncing device lists (#9867)
Fixes #9866.
2021-04-22 16:53:24 +01:00
Richard van der Hoff
294c675033 Remove synapse.types.Collection (#9856)
This is no longer required, since we have dropped support for Python 3.5.
2021-04-22 16:43:50 +01:00
Andrew Morgan
3186324260 Merge branch 'master' into develop 2021-04-22 11:23:56 +01:00
Andrew Morgan
0f2629ebc6 Merge tag 'v1.32.2'
Synapse 1.32.2 (2021-04-22)
===========================

This release includes a fix for a regression introduced in 1.32.0.

Bugfixes
--------

- Fix a regression in Synapse 1.32.0 and 1.32.1 which caused `LoggingContext` errors in plugins. ([\#9857](https://github.com/matrix-org/synapse/issues/9857))
2021-04-22 11:23:34 +01:00
Andrew Morgan
dac4445934 A regression can't be introduced twice 2021-04-22 11:09:31 +01:00
Andrew Morgan
79e6d9e4b1 Note regression was in 1.32.0 and 1.32.1 2021-04-22 11:04:51 +01:00
Andrew Morgan
ca380881b1 Update dates in changelogs 2021-04-21 18:47:31 +01:00
Andrew Morgan
55159c48e3 1.32.2 2021-04-21 18:45:39 +01:00
Andrew Morgan
ca6ecb8d67 Merge branch 'release-v1.32.1' of github.com:matrix-org/synapse into release-v1.32.2 2021-04-21 18:39:45 +01:00
Andrew Morgan
8798f2291c Merge branch 'master' of github.com:matrix-org/synapse into develop 2021-04-21 18:21:56 +01:00
Andrew Morgan
046175daba Merge branch 'release-v1.32.1' of github.com:matrix-org/synapse 2021-04-21 18:21:14 +01:00
Andrew Morgan
0c23aa393c Note LoggingContext signature change incompatibility in 1.32.0 (#9859)
1.32.0 also introduced an incompatibility with Synapse modules that make use of `synapse.logging.context.LoggingContext`, such as [synapse-s3-storage-provider](https://github.com/matrix-org/synapse-s3-storage-provider).

This PR adds a note to the 1.32.0 changelog and upgrade notes about it.
2021-04-21 18:16:58 +01:00
Richard van der Hoff
d9bd62f9d1 Make LoggingContext's name optional (#9857)
Fixes https://github.com/matrix-org/synapse-s3-storage-provider/issues/55
2021-04-21 16:39:34 +01:00
Andrew Morgan
4b2217ace2 Merge branch 'master' into develop 2021-04-21 14:55:06 +01:00
Andrew Morgan
a0972085ed Merge tag 'v1.32.1'
Synapse 1.32.1 (2021-04-21)
===========================

This release fixes [a regression](https://github.com/matrix-org/synapse/issues/9853) in Synapse 1.32.0 that caused connected Prometheus instances to become unstable. If you ran Synapse 1.32.0 with Prometheus metrics, first upgrade to Synapse 1.32.1 and follow [these instructions](https://github.com/matrix-org/synapse/pull/9854#issuecomment-823472183) to clean up any excess writeahead logs.

Bugfixes
--------

- Fix a regression in Synapse 1.32.0 which caused Synapse to report large numbers of Prometheus time series, potentially overwhelming Prometheus instances. ([\#9854](https://github.com/matrix-org/synapse/issues/9854))
2021-04-21 14:54:03 +01:00
Andrew Morgan
bdb4c20dc1 Clarify 1.32.0/1 changelog and upgrade notes 2021-04-21 14:44:04 +01:00
Andrew Morgan
acb8c81041 Add regression notes to CHANGES.md; fix link in 1.32.0 changelog 2021-04-21 14:24:16 +01:00
Andrew Morgan
98a1b84631 Add link to fixing prometheus to 1.32.0 upgrade notes; 1.32.1 has a fix 2021-04-21 14:19:11 +01:00
Andrew Morgan
026a66f2b3 Fix typo in link to regression in 1.32.0 upgrade notes 2021-04-21 14:04:44 +01:00
Andrew Morgan
a745531c10 1.32.1 2021-04-21 14:01:12 +01:00
Andrew Morgan
30c94862b4 Mention Prometheus metrics regression in v1.32.0 2021-04-21 14:00:31 +01:00
Richard van der Hoff
5d281c10dd Stop BackgroundProcessLoggingContext making new prometheus timeseries (#9854)
This undoes part of b076bc276e.
2021-04-21 10:03:31 +01:00
Patrick Cloke
683d6f75af Rename handler and config modules which end in handler/config. (#9816) 2021-04-20 14:55:20 -04:00
Andrew Morgan
eccacd72cb Merge branch 'master' into develop 2021-04-20 17:14:15 +01:00
Andrew Morgan
b8c5f6fddb Mention Prometheus metrics regression in v1.32.0 2021-04-20 17:11:36 +01:00
Andrew Morgan
272402c4d7 Merge branch 'master' into develop 2021-04-20 16:07:53 +01:00
Andrew Morgan
05fa06834d Further tweaking on gpg signing key notice 2021-04-20 15:52:06 +01:00
Andrew Morgan
913f790bb2 Add note about expired Debian gpg signing keys to CHANGES.md 2021-04-20 15:33:56 +01:00
Andrew Morgan
6982db9651 Merge branch 'master' into develop 2021-04-20 14:55:16 +01:00
Andrew Morgan
438a8594cb Update v1.32.0 changelog. It's m.login.application_service, not plural 2021-04-20 14:47:17 +01:00
Andrew Morgan
e031c7e0cc 1.32.0 2021-04-20 14:31:27 +01:00
Andrew Morgan
0a88ec0a87 Add Application Service registration type requirement + py35, pg95 deprecation notices to v1.32.0 upgrade notes (#9849)
Fixes https://github.com/matrix-org/synapse/issues/9846.

Adds important removal information from the top of https://github.com/matrix-org/synapse/releases/tag/v1.32.0rc1 into UPGRADE.rst.
2021-04-20 14:19:35 +01:00
Patrick Cloke
b076bc276e Always use the name as the log ID. (#9829)
As far as I can tell our logging contexts are meant to log the request ID, or sometimes the request ID followed by a suffix (this is generally stored in the name field of LoggingContext). There's also code to log the name@memory location, but I'm not sure this is ever used.

This simplifies the code paths to require every logging context to have a name and use that in logging. For sub-contexts (created via nested_logging_contexts, defer_to_threadpool, Measure) we use the current context's str (which becomes their name or the string "sentinel") and then potentially modify that (e.g. add a suffix).
2021-04-20 14:19:00 +01:00
Erik Johnston
de0d088adc Add presence federation stream (#9819) 2021-04-20 14:11:24 +01:00
Erik Johnston
db70435de7 Fix bug where we sent remote presence states to remote servers (#9850) 2021-04-20 13:37:54 +01:00
Jonathan de Jong
495b214f4f Fix (final) Bugbear violations (#9838) 2021-04-20 11:50:49 +01:00
Andrew Morgan
71f0623de9 Port "Allow users to click account renewal links multiple times without hitting an 'Invalid Token' page #74" from synapse-dinsic (#9832)
This attempts to be a direct port of https://github.com/matrix-org/synapse-dinsic/pull/74 to mainline. There was some fiddling required to deal with the changes that have been made to mainline since (mainly dealing with the split of `RegistrationWorkerStore` from `RegistrationStore`, and the changes made to `self.make_request` in test code).
2021-04-19 19:16:34 +01:00
Denis Kasak
e694a598f8 Sanity check identity server passed to bind/unbind. (#9802)
Signed-off-by: Denis Kasak <dkasak@termina.org.uk>
2021-04-19 17:21:46 +01:00
Erik Johnston
2b7dd21655 Don't send normal presence updates over federation replication stream (#9828) 2021-04-19 10:50:49 +01:00
Andrew Morgan
c571736c6c User directory: use calculated room membership state instead (#9821)
Fixes: #9797.

Should help reduce CPU usage on the user directory, especially when memberships change in rooms with lots of state history.
2021-04-16 18:17:18 +01:00
Erik Johnston
601b893352 Small speed up joining large remote rooms (#9825)
There are a couple of points in `persist_events` where we are doing a
query per event in series, which we can replace.
2021-04-16 14:44:55 +01:00
188 changed files with 4263 additions and 2587 deletions

View File

@@ -1,11 +1,128 @@
Synapse 1.32.0rc1 (2021-04-13)
Synapse 1.33.0rc2 (2021-04-29)
==============================
Bugfixes
--------
- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))
Synapse 1.33.0rc1 (2021-04-28)
==============================
Features
--------
- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when a user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))
Bugfixes
--------
- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))
Improved Documentation
----------------------
- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))
Internal Changes
----------------
- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
Synapse 1.32.2 (2021-04-22)
===========================
This release includes a fix for a regression introduced in 1.32.0.
Bugfixes
--------
- Fix a regression in Synapse 1.32.0 and 1.32.1 which caused `LoggingContext` errors in plugins. ([\#9857](https://github.com/matrix-org/synapse/issues/9857))
Synapse 1.32.1 (2021-04-21)
===========================
This release fixes [a regression](https://github.com/matrix-org/synapse/issues/9853)
in Synapse 1.32.0 that caused connected Prometheus instances to become unstable.
However, as this release is still subject to the `LoggingContext` change in 1.32.0,
it is recommended to remain on or downgrade to 1.31.0.
Bugfixes
--------
- Fix a regression in Synapse 1.32.0 which caused Synapse to report large numbers of Prometheus time series, potentially overwhelming Prometheus instances. ([\#9854](https://github.com/matrix-org/synapse/issues/9854))
Synapse 1.32.0 (2021-04-20)
===========================
**Note:** This release introduces [a regression](https://github.com/matrix-org/synapse/issues/9853)
that can overwhelm connected Prometheus instances. This issue was not present in
1.32.0rc1. If affected, it is recommended to downgrade to 1.31.0 in the meantime, and
follow [these instructions](https://github.com/matrix-org/synapse/pull/9854#issuecomment-823472183)
to clean up any excess writeahead logs.
**Note:** This release also mistakenly included a change that may affect Synapse
modules that import `synapse.logging.context.LoggingContext`, such as
[synapse-s3-storage-provider](https://github.com/matrix-org/synapse-s3-storage-provider).
This will be fixed in a later Synapse version.
**Note:** This release requires Python 3.6+ and Postgres 9.6+ or SQLite 3.22+.
This release removes the deprecated `GET /_synapse/admin/v1/users/<user_id>` admin API. Please use the [v2 API](https://github.com/matrix-org/synapse/blob/develop/docs/admin_api/user_admin_api.rst#query-user-account) instead, which has improved capabilities.
This release requires Application Services to use type `m.login.application_services` when registering users via the `/_matrix/client/r0/register` endpoint to comply with the spec. Please ensure your Application Services are up to date.
This release requires Application Services to use type `m.login.application_service` when registering users via the `/_matrix/client/r0/register` endpoint to comply with the spec. Please ensure your Application Services are up to date.
If you are using the `packages.matrix.org` Debian repository for Synapse packages,
note that we have recently updated the expiry date on the gpg signing key. If you see an
error similar to `The following signatures were invalid: EXPKEYSIG F473DD4473365DE1`, you
will need to get a fresh copy of the keys. You can do so with:
```sh
sudo wget -O /usr/share/keyrings/matrix-org-archive-keyring.gpg https://packages.matrix.org/debian/matrix-org-archive-keyring.gpg
```
Bugfixes
--------
- Fix the log lines of nested logging contexts. Broke in 1.32.0rc1. ([\#9829](https://github.com/matrix-org/synapse/issues/9829))
Synapse 1.32.0rc1 (2021-04-13)
==============================
Features
--------

View File

@@ -85,9 +85,52 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.33.0
====================
Account Validity HTML templates can now display a user's expiration date
------------------------------------------------------------------------
This may affect you if you have enabled the account validity feature, and have made use of a
custom HTML template specified by the ``account_validity.template_dir`` or ``account_validity.account_renewed_html_path``
Synapse config options.
The template can now accept an ``expiration_ts`` variable, which represents the Unix timestamp in milliseconds of the
future date until which their account has been renewed. See the
`default template <https://github.com/matrix-org/synapse/blob/release-v1.33.0/synapse/res/templates/account_renewed.html>`_
for an example of usage.
Also note that a new HTML template, ``account_previously_renewed.html``, has been added. This is shown to users
when they attempt to renew their account with a valid renewal token that has already been used before. The default
template contents can be found
`here <https://github.com/matrix-org/synapse/blob/release-v1.33.0/synapse/res/templates/account_previously_renewed.html>`_,
and can also accept an ``expiration_ts`` variable. This template replaces the error message users would previously see
upon attempting to use a valid renewal token more than once.
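As a small illustration of what ``expiration_ts`` holds (a Unix timestamp in milliseconds, per the note above), a custom template would typically turn it into a human-readable date along these lines; the value below is an example only, not anything shipped with Synapse.

```python
from datetime import datetime, timezone

# `expiration_ts` is a Unix timestamp in milliseconds (example value only);
# a custom template would normally render it as a date.
expiration_ts = 1735689600000
expires = datetime.fromtimestamp(expiration_ts / 1000, tz=timezone.utc)
print(f"Your account has been renewed and is valid until {expires:%d %B %Y}.")
```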
Upgrading to v1.32.0
====================
Regression causing connected Prometheus instances to become overwhelmed
-----------------------------------------------------------------------
This release introduces `a regression <https://github.com/matrix-org/synapse/issues/9853>`_
that can overwhelm connected Prometheus instances. This issue is not present in
Synapse v1.32.0rc1.
If you have been affected, please downgrade to 1.31.0. You then may need to
remove excess writeahead logs in order for Prometheus to recover. Instructions
for doing so are provided
`here <https://github.com/matrix-org/synapse/pull/9854#issuecomment-823472183>`_.
Dropping support for old Python, Postgres and SQLite versions
-------------------------------------------------------------
In line with our `deprecation policy <https://github.com/matrix-org/synapse/blob/release-v1.32.0/docs/deprecation_policy.md>`_,
we've dropped support for Python 3.5 and PostgreSQL 9.5, as they are no longer supported upstream.
This release of Synapse requires Python 3.6+ and PostgreSQL 9.6+ or SQLite 3.22+.
Removal of old List Accounts Admin API
--------------------------------------
@@ -98,6 +141,16 @@ has been available since Synapse 1.7.0 (2019-12-13), and is accessible under ``G
The deprecation of the old endpoint was announced with Synapse 1.28.0 (released on 2021-02-25).
Application Services must use type ``m.login.application_service`` when registering users
-----------------------------------------------------------------------------------------
In compliance with the
`Application Service spec <https://matrix.org/docs/spec/application_service/r0.1.2#server-admin-style-permissions>`_,
Application Services are now required to use the ``m.login.application_service`` type when registering users via the
``/_matrix/client/r0/register`` endpoint. This behaviour was deprecated in Synapse v1.30.0.
Please ensure your Application Services are up to date.
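For illustration, a registration request of the required shape might look like the following. The homeserver URL, token and username are placeholders; this is a sketch of the spec'd call, not code from this repository.

```python
import requests

resp = requests.post(
    "https://homeserver.example.com/_matrix/client/r0/register",
    params={"access_token": "<as_token>"},   # the Application Service token
    json={
        "type": "m.login.application_service",
        "username": "_example_bot",
    },
)
resp.raise_for_status()
print(resp.json()["user_id"])
```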
Upgrading to v1.29.0
====================

View File

@@ -1 +0,0 @@
Add a dockerfile for running Synapse in worker-mode under Complement.

View File

@@ -1 +0,0 @@
Speed up federation transmission by using fewer database calls. Contributed by @ShadowJonathan.

View File

@@ -1 +0,0 @@
Apply `pyupgrade` across the codebase.

View File

@@ -1 +0,0 @@
Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg.

View File

@@ -1 +0,0 @@
Move some replication processing out of `generic_worker`.

View File

@@ -1 +0,0 @@
Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.

View File

@@ -1 +0,0 @@
Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms.

View File

@@ -1 +0,0 @@
Replace `HomeServer.get_config()` with inline references.

changelog.d/9881.feature (new file, 1 line)

@@ -0,0 +1 @@
Add experimental option to track memory usage of the caches.

changelog.d/9882.misc (new file, 1 line)

@@ -0,0 +1 @@
Export jemalloc stats to Prometheus if it is being used.

changelog.d/9885.misc (new file, 1 line)

@@ -0,0 +1 @@
Add type hints to presence handler.

changelog.d/9886.misc (new file, 1 line)

@@ -0,0 +1 @@
Reduce memory usage of the LRU caches.

changelog.d/9889.feature (new file, 1 line)

@@ -0,0 +1 @@
Add support for `DELETE /_synapse/admin/v1/rooms/<room_id>`.

changelog.d/9889.removal (new file, 1 line)

@@ -0,0 +1 @@
Mark as deprecated `POST /_synapse/admin/v1/rooms/<room_id>/delete`.

changelog.d/9895.bugfix (new file, 1 line)

@@ -0,0 +1 @@
Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements.

changelog.d/9896.bugfix (new file, 1 line)

@@ -0,0 +1 @@
Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`.

changelog.d/9896.misc (new file, 1 line)

@@ -0,0 +1 @@
Add type hints to the `synapse.handlers` module.

changelog.d/9902.feature (new file, 1 line)

@@ -0,0 +1 @@
Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set.

changelog.d/9904.misc (new file, 1 line)

@@ -0,0 +1 @@
Time response time for external cache requests.

changelog.d/9910.bugfix (new file, 1 line)

@@ -0,0 +1 @@
Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession.

changelog.d/9910.feature (new file, 1 line)

@@ -0,0 +1 @@
Improve performance after joining a large room when presence is enabled.

changelog.d/9911.doc (new file, 1 line)

@@ -0,0 +1 @@
Add `port` argument to the Postgres database sample config section.

changelog.d/9916.misc (new file, 1 line)

@@ -0,0 +1 @@
Improve performance of handling presence when joining large rooms.

View File

@@ -224,16 +224,14 @@ class HomeServer(ReplicationHandler):
destinations = yield self.get_servers_for_context(room_name)
try:
yield self.replication_layer.send_pdus(
[
Pdu.create_new(
context=room_name,
pdu_type="sy.room.message",
content={"sender": sender, "body": body},
origin=self.server_name,
destinations=destinations,
)
]
yield self.replication_layer.send_pdu(
Pdu.create_new(
context=room_name,
pdu_type="sy.room.message",
content={"sender": sender, "body": body},
origin=self.server_name,
destinations=destinations,
)
)
except Exception as e:
logger.exception(e)
@@ -255,7 +253,7 @@ class HomeServer(ReplicationHandler):
origin=self.server_name,
destinations=destinations,
)
yield self.replication_layer.send_pdus([pdu])
yield self.replication_layer.send_pdu(pdu)
except Exception as e:
logger.exception(e)
@@ -267,18 +265,16 @@ class HomeServer(ReplicationHandler):
destinations = yield self.get_servers_for_context(room_name)
try:
yield self.replication_layer.send_pdus(
[
Pdu.create_new(
context=room_name,
is_state=True,
pdu_type="sy.room.member",
state_key=invitee,
content={"membership": "invite"},
origin=self.server_name,
destinations=destinations,
)
]
yield self.replication_layer.send_pdu(
Pdu.create_new(
context=room_name,
is_state=True,
pdu_type="sy.room.member",
state_key=invitee,
content={"membership": "invite"},
origin=self.server_name,
destinations=destinations,
)
)
except Exception as e:
logger.exception(e)

debian/changelog (20 lines changed)

@@ -1,8 +1,24 @@
matrix-synapse-py3 (1.31.0+nmu1) UNRELEASED; urgency=medium
matrix-synapse-py3 (1.32.2) stable; urgency=medium
* New synapse release 1.32.2.
-- Synapse Packaging team <packages@matrix.org> Wed, 22 Apr 2021 12:43:52 +0100
matrix-synapse-py3 (1.32.1) stable; urgency=medium
* New synapse release 1.32.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 21 Apr 2021 14:00:55 +0100
matrix-synapse-py3 (1.32.0) stable; urgency=medium
[ Dan Callahan ]
* Skip tests when DEB_BUILD_OPTIONS contains "nocheck".
-- Dan Callahan <danc@element.io> Mon, 12 Apr 2021 13:07:36 +0000
[ Synapse Packaging team ]
* New synapse release 1.32.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 20 Apr 2021 14:28:39 +0100
matrix-synapse-py3 (1.31.0) stable; urgency=medium

View File

@@ -96,18 +96,48 @@ for port in 8080 8081 8082; do
# Check script parameters
if [ $# -eq 1 ]; then
if [ $1 = "--no-rate-limit" ]; then
# messages rate limit
echo 'rc_messages_per_second: 1000' >> $DIR/etc/$port.config
echo 'rc_message_burst_count: 1000' >> $DIR/etc/$port.config
# registration rate limit
printf 'rc_registration:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
# login rate limit
echo 'rc_login:' >> $DIR/etc/$port.config
printf ' address:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
printf ' account:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
printf ' failed_attempts:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
# Disable any rate limiting
ratelimiting=$(cat <<-RC
rc_message:
per_second: 1000
burst_count: 1000
rc_registration:
per_second: 1000
burst_count: 1000
rc_login:
address:
per_second: 1000
burst_count: 1000
account:
per_second: 1000
burst_count: 1000
failed_attempts:
per_second: 1000
burst_count: 1000
rc_admin_redaction:
per_second: 1000
burst_count: 1000
rc_joins:
local:
per_second: 1000
burst_count: 1000
remote:
per_second: 1000
burst_count: 1000
rc_3pid_validation:
per_second: 1000
burst_count: 1000
rc_invites:
per_room:
per_second: 1000
burst_count: 1000
per_user:
per_second: 1000
burst_count: 1000
RC
)
echo "${ratelimiting}" >> $DIR/etc/$port.config
fi
fi

View File

@@ -427,7 +427,7 @@ the new room. Users on other servers will be unaffected.
The API is:
```
POST /_synapse/admin/v1/rooms/<room_id>/delete
DELETE /_synapse/admin/v1/rooms/<room_id>
```
with a body of:
@@ -528,6 +528,15 @@ You will have to manually handle, if you so choose, the following:
* Users that would have been booted from the room (and will have been force-joined to the Content Violation room).
* Removal of the Content Violation room if desired.
## Deprecated endpoint
The previously deprecated API will be removed in a future release; it was:
```
POST /_synapse/admin/v1/rooms/<room_id>/delete
```
It behaves the same way as the current endpoint, except for the path and the method.
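A hedged example call to the new endpoint (placeholders for the server, room ID and admin token; the request body fields are the ones documented earlier in this file and elided from the diff above):

```python
import requests

# "%21abcd1234%3Aexample.com" is the URL-encoded form of "!abcd1234:example.com".
resp = requests.delete(
    "https://homeserver.example.com/_synapse/admin/v1/rooms/%21abcd1234%3Aexample.com",
    headers={"Authorization": "Bearer <admin_access_token>"},
    json={},  # see the body options documented earlier in this file
)
print(resp.status_code, resp.json())
```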
# Make Room Admin API

View File

@@ -152,6 +152,16 @@ presence:
#
#gc_thresholds: [700, 10, 10]
# The minimum time in seconds between each GC for a generation, regardless of
# the GC thresholds. This ensures that we don't do GC too frequently.
#
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
# generation 0 GCs, etc.
#
# Defaults to `[1s, 10s, 30s]`.
#
#gc_min_interval: [0.5s, 30s, 1m]
# Set the limit on the returned events in the timeline in the get
# and sync operations. The default value is 100. -1 means no upper limit.
#
@@ -810,6 +820,7 @@ caches:
# password: secretpassword
# database: synapse
# host: localhost
# port: 5432
# cp_min: 5
# cp_max: 10
#
@@ -1175,69 +1186,6 @@ url_preview_accept_language:
#
#enable_registration: false
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#
# Note that this is not currently compatible with guest logins.
@@ -1432,6 +1380,91 @@ account_threepid_delegates:
#auto_join_rooms_for_guests: false
## Account Validity ##
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
# The currently available templates are:
#
# * account_renewed.html: Displayed to the user after they have successfully
# renewed their account.
#
# * account_previously_renewed.html: Displayed to the user if they attempt to
# renew their account with a token that is valid, but that has already
# been used. In this case the account is not renewed again.
#
# * invalid_token.html: Displayed to the user when they try to renew an account
# with an unknown or invalid renewal token.
#
# See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for
# default template contents.
#
# The file name of some of these templates can be configured below for legacy
# reasons.
#
#template_dir: "res/templates"
# A custom file name for the 'account_renewed.html' template.
#
# If not set, the file is assumed to be named "account_renewed.html".
#
#account_renewed_html_path: "account_renewed.html"
# A custom file name for the 'invalid_token.html' template.
#
# If not set, the file is assumed to be named "invalid_token.html".
#
#invalid_token_html_path: "invalid_token.html"
## Metrics ###
# Enable collection and rendering of performance metrics
@@ -1878,7 +1911,7 @@ saml2_config:
# sub-properties:
#
# module: The class name of a custom mapping module. Default is
# 'synapse.handlers.oidc_handler.JinjaOidcMappingProvider'.
# 'synapse.handlers.oidc.JinjaOidcMappingProvider'.
# See https://github.com/matrix-org/synapse/blob/master/docs/sso_mapping_providers.md#openid-mapping-providers
# for information on implementing a custom mapping provider.
#

View File

@@ -106,7 +106,7 @@ A custom mapping provider must specify the following methods:
Synapse has a built-in OpenID mapping provider if a custom provider isn't
specified in the config. It is located at
[`synapse.handlers.oidc_handler.JinjaOidcMappingProvider`](../synapse/handlers/oidc_handler.py).
[`synapse.handlers.oidc.JinjaOidcMappingProvider`](../synapse/handlers/oidc.py).
## SAML Mapping Providers
@@ -190,4 +190,4 @@ A custom mapping provider must specify the following methods:
Synapse has a built-in SAML mapping provider if a custom provider isn't
specified in the config. It is located at
[`synapse.handlers.saml_handler.DefaultSamlMappingProvider`](../synapse/handlers/saml_handler.py).
[`synapse.handlers.saml.DefaultSamlMappingProvider`](../synapse/handlers/saml.py).

View File

@@ -41,7 +41,6 @@ files =
synapse/push,
synapse/replication,
synapse/rest,
synapse/secrets.py,
synapse/server.py,
synapse/server_notices,
synapse/spam_checker_api,
@@ -172,3 +171,6 @@ ignore_missing_imports = True
[mypy-txacme.*]
ignore_missing_imports = True
[mypy-pympler.*]
ignore_missing_imports = True

View File

@@ -140,7 +140,7 @@ if __name__ == "__main__":
definitions = {}
for directory in args.directories:
for root, dirs, files in os.walk(directory):
for root, _, files in os.walk(directory):
for filename in files:
if filename.endswith(".py"):
filepath = os.path.join(root, filename)

View File

@@ -48,7 +48,7 @@ args = parser.parse_args()
for directory in args.directories:
for root, dirs, files in os.walk(directory):
for root, _, files in os.walk(directory):
for filename in files:
if filename.endswith(".py"):
filepath = os.path.join(root, filename)

View File

@@ -634,8 +634,11 @@ class Porter(object):
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
await self._setup_sequence(
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
"account_data_sequence",
("room_account_data", "room_tags_revisions", "account_data"),
)
await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
await self._setup_auth_chain_sequence()
# Step 3. Get tables.

View File

@@ -18,8 +18,7 @@ ignore =
# E203: whitespace before ':' (which is contrary to pep8?)
# E731: do not assign a lambda expression, use a def
# E501: Line too long (black enforces this for us)
# B007: Subsection of the bugbear suite (TODO: add in remaining fixes)
ignore=W503,W504,E203,E731,E501,B007
ignore=W503,W504,E203,E731,E501
[isort]
line_length = 88

View File

@@ -21,8 +21,8 @@ import os
import sys
# Check that we're not running on an unsupported Python version.
if sys.version_info < (3, 5):
print("Synapse requires Python 3.5 or above.")
if sys.version_info < (3, 6):
print("Synapse requires Python 3.6 or above.")
sys.exit(1)
# Twisted and canonicaljson will fail to import when this file is executed to
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.32.0rc1"
__version__ = "1.33.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -12,14 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import pymacaroons
from netaddr import IPAddress
from twisted.web.server import Request
import synapse.types
from synapse import event_auth
from synapse.api.auth_blocking import AuthBlocking
from synapse.api.constants import EventTypes, HistoryVisibility, Membership
@@ -36,11 +35,14 @@ from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing as opentracing
from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import StateMap, UserID
from synapse.types import Requester, StateMap, UserID, create_requester
from synapse.util.caches.lrucache import LruCache
from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -65,9 +67,10 @@ class Auth:
"""
FIXME: This class contains a mix of functions for authenticating users
of our client-server API and authenticating events added to room graphs.
The latter should be moved to synapse.handlers.event_auth.EventAuthHandler.
"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -79,19 +82,21 @@ class Auth:
self._auth_blocking = AuthBlocking(self.hs)
self._account_validity = hs.config.account_validity
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
self._track_appservice_user_ips = hs.config.track_appservice_user_ips
self._macaroon_secret_key = hs.config.macaroon_secret_key
async def check_from_context(
self, room_version: str, event, context, do_sig_check=True
):
) -> None:
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
auth_events_by_id = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
event_auth.check(
@@ -148,17 +153,11 @@ class Auth:
raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
async def check_host_in_room(self, room_id, host):
async def check_host_in_room(self, room_id: str, host: str) -> bool:
with Measure(self.clock, "check_host_in_room"):
latest_event_ids = await self.store.is_host_joined(room_id, host)
return latest_event_ids
return await self.store.is_host_joined(room_id, host)
def can_federate(self, event, auth_events):
creation_event = auth_events.get((EventTypes.Create, ""))
return creation_event.content.get("m.federate", True) is True
def get_public_keys(self, invite_event):
def get_public_keys(self, invite_event: EventBase) -> List[Dict[str, Any]]:
return event_auth.get_public_keys(invite_event)
async def get_user_by_req(
@@ -167,7 +166,7 @@ class Auth:
allow_guest: bool = False,
rights: str = "access",
allow_expired: bool = False,
) -> synapse.types.Requester:
) -> Requester:
"""Get a registered user's ID.
Args:
@@ -193,7 +192,7 @@ class Auth:
access_token = self.get_access_token_from_request(request)
user_id, app_service = await self._get_appservice_user_id(request)
if user_id:
if user_id and app_service:
if ip_addr and self._track_appservice_user_ips:
await self.store.insert_client_ip(
user_id=user_id,
@@ -203,9 +202,7 @@ class Auth:
device_id="dummy-device", # stubbed
)
requester = synapse.types.create_requester(
user_id, app_service=app_service
)
requester = create_requester(user_id, app_service=app_service)
request.requester = user_id
opentracing.set_tag("authenticated_entity", user_id)
@@ -222,7 +219,7 @@ class Auth:
shadow_banned = user_info.shadow_banned
# Deny the request if the user account has expired.
if self._account_validity.enabled and not allow_expired:
if self._account_validity_enabled and not allow_expired:
if await self.store.is_account_expired(
user_info.user_id, self.clock.time_msec()
):
@@ -248,7 +245,7 @@ class Auth:
errcode=Codes.GUEST_ACCESS_FORBIDDEN,
)
requester = synapse.types.create_requester(
requester = create_requester(
user_info.user_id,
token_id,
is_guest,
@@ -268,7 +265,9 @@ class Auth:
except KeyError:
raise MissingClientTokenError()
async def _get_appservice_user_id(self, request):
async def _get_appservice_user_id(
self, request: Request
) -> Tuple[Optional[str], Optional[ApplicationService]]:
app_service = self.store.get_app_service_by_token(
self.get_access_token_from_request(request)
)
@@ -280,6 +279,9 @@ class Auth:
if ip_address not in app_service.ip_range_whitelist:
return None, None
# This will always be set by the time Twisted calls us.
assert request.args is not None
if b"user_id" not in request.args:
return app_service.sender, app_service
@@ -384,7 +386,9 @@ class Auth:
logger.warning("Invalid macaroon in auth: %s %s", type(e), e)
raise InvalidClientTokenError("Invalid macaroon passed.")
def _parse_and_validate_macaroon(self, token, rights="access"):
def _parse_and_validate_macaroon(
self, token: str, rights: str = "access"
) -> Tuple[str, bool]:
"""Takes a macaroon and tries to parse and validate it. This is cached
if and only if rights == access and there isn't an expiry.
@@ -429,15 +433,16 @@ class Auth:
return user_id, guest
def validate_macaroon(self, macaroon, type_string, user_id):
def validate_macaroon(
self, macaroon: pymacaroons.Macaroon, type_string: str, user_id: str
) -> None:
"""
validate that a Macaroon is understood by and was signed by this server.
Args:
macaroon(pymacaroons.Macaroon): The macaroon to validate
type_string(str): The kind of token required (e.g. "access",
"delete_pusher")
user_id (str): The user_id required
macaroon: The macaroon to validate
type_string: The kind of token required (e.g. "access", "delete_pusher")
user_id: The user_id required
"""
v = pymacaroons.Verifier()
@@ -462,9 +467,7 @@ class Auth:
if not service:
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
request.requester = synapse.types.create_requester(
service.sender, app_service=service
)
request.requester = create_requester(service.sender, app_service=service)
return service
async def is_server_admin(self, user: UserID) -> bool:
@@ -516,7 +519,7 @@ class Auth:
return auth_ids
async def check_can_change_room_list(self, room_id: str, user: UserID):
async def check_can_change_room_list(self, room_id: str, user: UserID) -> bool:
"""Determine whether the user is allowed to edit the room's entry in the
published room list.
@@ -551,11 +554,11 @@ class Auth:
return user_level >= send_level
@staticmethod
def has_access_token(request: Request):
def has_access_token(request: Request) -> bool:
"""Checks if the request has an access_token.
Returns:
bool: False if no access_token was given, True otherwise.
False if no access_token was given, True otherwise.
"""
# This will always be set by the time Twisted calls us.
assert request.args is not None
@@ -565,13 +568,13 @@ class Auth:
return bool(query_params) or bool(auth_headers)
@staticmethod
def get_access_token_from_request(request: Request):
def get_access_token_from_request(request: Request) -> str:
"""Extracts the access_token from the request.
Args:
request: The http request.
Returns:
unicode: The access_token
The access_token
Raises:
MissingClientTokenError: If there isn't a single access_token in the
request
@@ -646,5 +649,5 @@ class Auth:
% (user_id, room_id),
)
def check_auth_blocking(self, *args, **kwargs):
return self._auth_blocking.check_auth_blocking(*args, **kwargs)
async def check_auth_blocking(self, *args, **kwargs) -> None:
await self._auth_blocking.check_auth_blocking(*args, **kwargs)

View File

@@ -13,18 +13,21 @@
# limitations under the License.
import logging
from typing import Optional
from typing import TYPE_CHECKING, Optional
from synapse.api.constants import LimitBlockingTypes, UserTypes
from synapse.api.errors import Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
from synapse.types import Requester
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class AuthBlocking:
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self._server_notices_mxid = hs.config.server_notices_mxid
@@ -43,7 +46,7 @@ class AuthBlocking:
threepid: Optional[dict] = None,
user_type: Optional[str] = None,
requester: Optional[Requester] = None,
):
) -> None:
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag

View File

@@ -17,6 +17,9 @@
"""Contains constants from the specification."""
# the max size of a (canonical-json-encoded) event
MAX_PDU_SIZE = 65536
# the "depth" field on events is limited to 2**63 - 1
MAX_DEPTH = 2 ** 63 - 1

View File

@@ -23,6 +23,7 @@ from jsonschema import FormatChecker
from synapse.api.constants import EventContentFields
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.types import RoomID, UserID
FILTER_SCHEMA = {
@@ -290,6 +291,13 @@ class Filter:
ev_type = "m.presence"
contains_url = False
labels = [] # type: List[str]
elif isinstance(event, EventBase):
sender = event.sender
room_id = event.room_id
ev_type = event.type
content = event.content
contains_url = isinstance(content.get("url"), str)
labels = content.get(EventContentFields.LABELS, [])
else:
sender = event.get("sender", None)
if not sender:

View File

@@ -30,12 +30,14 @@ from twisted.internet import defer, error, reactor
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
from synapse.api.constants import MAX_PDU_SIZE
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.server import ListenerConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.async_helpers import Linearizer
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
@@ -114,6 +116,7 @@ def start_reactor(
def run():
logger.info("Running")
setup_jemalloc_stats()
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
@@ -288,7 +291,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")
async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
async def start(hs: "synapse.server.HomeServer"):
"""
Start a Synapse server or worker.
@@ -300,7 +303,6 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
Args:
hs: homeserver instance
listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
@@ -336,7 +338,7 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa
# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.start_listening()
hs.get_datastore().db_pool.start_profiling()
hs.get_pusherpool().start()
@@ -530,3 +532,25 @@ def sdnotify(state):
# this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
# unless systemd is expecting us to notify it.
logger.warning("Unable to send notification to systemd: %s", e)
def max_request_body_size(config: HomeServerConfig) -> int:
"""Get a suitable maximum size for incoming HTTP requests"""
# Other than media uploads, the biggest request we expect to see is a fully-loaded
# /federation/v1/send request.
#
# The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
# limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
# json encoding); there is no specced limit to EDUs (see
# https://github.com/matrix-org/matrix-doc/issues/3121).
#
# in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
#
max_request_size = 200 * MAX_PDU_SIZE
# if we have a media repo enabled, we may need to allow larger uploads than that
if config.media.can_load_media_repo:
max_request_size = max(max_request_size, config.media.max_upload_size)
return max_request_size
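For reference, the arithmetic behind the "about 12.5M" figure in the comment above works out as follows (a quick sketch, not part of the change itself):

    # 200 PDUs at the 64 KiB canonical-json cap is roughly 12.5 MiB.
    MAX_PDU_SIZE = 65536               # bytes, as defined in synapse.api.constants
    max_request_size = 200 * MAX_PDU_SIZE
    print(max_request_size)            # 13107200 bytes
    print(max_request_size / 2**20)    # 12.5 (MiB)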

View File

@@ -70,12 +70,6 @@ class AdminCmdSlavedStore(
class AdminCmdServer(HomeServer):
DATASTORE_CLASS = AdminCmdSlavedStore
def _listen_http(self, listener_config):
pass
def start_listening(self, listeners):
pass
async def export_data_command(hs, args):
"""Export data for a user.
@@ -232,7 +226,7 @@ def start(config_options):
async def run():
with LoggingContext("command"):
_base.start(ss, [])
_base.start(ss)
await args.func(ss, args)
_base.start_worker_reactor(

View File

@@ -15,7 +15,7 @@
# limitations under the License.
import logging
import sys
from typing import Dict, Iterable, Optional
from typing import Dict, Optional
from twisted.internet import address
from twisted.web.resource import IResource
@@ -32,7 +32,7 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
from synapse.app._base import register_start
from synapse.app._base import max_request_body_size, register_start
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -55,7 +55,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -64,7 +63,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events, login, room
from synapse.rest.client.v1 import events, login, presence, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
@@ -110,6 +109,7 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
@@ -121,26 +121,6 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.generic_worker")
class PresenceStatusStubServlet(RestServlet):
"""If presence is disabled this servlet can be used to stub out setting
presence status.
"""
PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
async def on_GET(self, request, user_id):
await self.auth.get_user_by_req(request)
return 200, {"presence": "offline"}
async def on_PUT(self, request, user_id):
await self.auth.get_user_by_req(request)
return 200, {}
class KeyUploadServlet(RestServlet):
"""An implementation of the `KeyUploadServlet` that responds to read only
requests, but otherwise proxies through to the master instance.
@@ -241,6 +221,7 @@ class GenericWorkerSlavedStore(
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
@@ -259,7 +240,6 @@ class GenericWorkerSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedPresenceStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
@@ -327,10 +307,7 @@ class GenericWorkerServer(HomeServer):
user_directory.register_servlets(self, resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
presence.register_servlets(self, resource)
groups.register_servlets(self, resource)
@@ -390,14 +367,16 @@ class GenericWorkerServer(HomeServer):
listener_config,
root_resource,
self.version_string,
max_request_body_size=max_request_body_size(self.config),
reactor=self.get_reactor(),
),
reactor=self.get_reactor(),
)
logger.info("Synapse worker now listening on port %d", port)
def start_listening(self, listeners: Iterable[ListenerConfig]):
for listener in listeners:
def start_listening(self):
for listener in self.config.worker_listeners:
if listener.type == "http":
self._listen_http(listener)
elif listener.type == "manhole":
@@ -475,6 +454,10 @@ def start(config_options):
config.server.update_user_directory = False
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
if config.server.gc_seconds:
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
hs = GenericWorkerServer(
config.server_name,
@@ -490,7 +473,7 @@ def start(config_options):
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
register_start(_base.start, hs, config.worker_listeners)
register_start(_base.start, hs)
_base.start_worker_reactor("synapse-generic-worker", config)

View File

@@ -17,7 +17,7 @@
import logging
import os
import sys
from typing import Iterable, Iterator
from typing import Iterator
from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
@@ -36,7 +36,13 @@ from synapse.api.urls import (
WEB_CLIENT_PREFIX,
)
from synapse.app import _base
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
from synapse.app._base import (
listen_ssl,
listen_tcp,
max_request_body_size,
quit_with_error,
register_start,
)
from synapse.config._base import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
@@ -126,19 +132,21 @@ class SynapseHomeServer(HomeServer):
else:
root_resource = OptionsResource()
root_resource = create_resource_tree(resources, root_resource)
site = SynapseSite(
"synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
site_tag,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
max_request_body_size=max_request_body_size(self.config),
reactor=self.get_reactor(),
)
if tls:
ports = listen_ssl(
bind_addresses,
port,
SynapseSite(
"synapse.access.https.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
site,
self.tls_server_context_factory,
reactor=self.get_reactor(),
)
@@ -148,13 +156,7 @@ class SynapseHomeServer(HomeServer):
ports = listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
site,
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d", port)
@@ -273,14 +275,14 @@ class SynapseHomeServer(HomeServer):
return resources
def start_listening(self, listeners: Iterable[ListenerConfig]):
def start_listening(self):
if self.config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)
for listener in listeners:
for listener in self.config.server.listeners:
if listener.type == "http":
self._listening_services.extend(
self._listener_http(self.config, listener)
@@ -339,6 +341,10 @@ def setup(config_options):
sys.exit(0)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
if config.server.gc_seconds:
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
hs = SynapseHomeServer(
config.server_name,
@@ -412,7 +418,7 @@ def setup(config_options):
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
await _base.start(hs, config.listeners)
await _base.start(hs)
hs.get_datastore().db_pool.updates.start_doing_background_updates()

View File

@@ -1,21 +1,22 @@
from typing import Any, Iterable, List, Optional
from synapse.config import (
account_validity,
api,
appservice,
auth,
captcha,
cas,
consent_config,
consent,
database,
emailconfig,
experimental,
groups,
jwt_config,
jwt,
key,
logger,
metrics,
oidc_config,
oidc,
password_auth_providers,
push,
ratelimiting,
@@ -23,9 +24,9 @@ from synapse.config import (
registration,
repository,
room_directory,
saml2_config,
saml2,
server,
server_notices_config,
server_notices,
spam_checker,
sso,
stats,
@@ -59,15 +60,16 @@ class RootConfig:
captcha: captcha.CaptchaConfig
voip: voip.VoipConfig
registration: registration.RegistrationConfig
account_validity: account_validity.AccountValidityConfig
metrics: metrics.MetricsConfig
api: api.ApiConfig
appservice: appservice.AppServiceConfig
key: key.KeyConfig
saml2: saml2_config.SAML2Config
saml2: saml2.SAML2Config
cas: cas.CasConfig
sso: sso.SSOConfig
oidc: oidc_config.OIDCConfig
jwt: jwt_config.JWTConfig
oidc: oidc.OIDCConfig
jwt: jwt.JWTConfig
auth: auth.AuthConfig
email: emailconfig.EmailConfig
worker: workers.WorkerConfig
@@ -76,9 +78,9 @@ class RootConfig:
spamchecker: spam_checker.SpamCheckerConfig
groups: groups.GroupsConfig
userdirectory: user_directory.UserDirectoryConfig
consent: consent_config.ConsentConfig
consent: consent.ConsentConfig
stats: stats.StatsConfig
servernotices: server_notices_config.ServerNoticesConfig
servernotices: server_notices.ServerNoticesConfig
roomdirectory: room_directory.RoomDirectoryConfig
thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig
tracer: tracer.TracerConfig

View File

@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.config._base import Config, ConfigError
class AccountValidityConfig(Config):
section = "account_validity"
def read_config(self, config, **kwargs):
account_validity_config = config.get("account_validity") or {}
self.account_validity_enabled = account_validity_config.get("enabled", False)
self.account_validity_renew_by_email_enabled = (
"renew_at" in account_validity_config
)
if self.account_validity_enabled:
if "period" in account_validity_config:
self.account_validity_period = self.parse_duration(
account_validity_config["period"]
)
else:
raise ConfigError("'period' is required when using account validity")
if "renew_at" in account_validity_config:
self.account_validity_renew_at = self.parse_duration(
account_validity_config["renew_at"]
)
if "renew_email_subject" in account_validity_config:
self.account_validity_renew_email_subject = account_validity_config[
"renew_email_subject"
]
else:
self.account_validity_renew_email_subject = "Renew your %(app)s account"
self.account_validity_startup_job_max_delta = (
self.account_validity_period * 10.0 / 100.0
)
if self.account_validity_renew_by_email_enabled:
if not self.public_baseurl:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
# Load account validity templates.
account_validity_template_dir = account_validity_config.get("template_dir")
account_renewed_template_filename = account_validity_config.get(
"account_renewed_html_path", "account_renewed.html"
)
invalid_token_template_filename = account_validity_config.get(
"invalid_token_html_path", "invalid_token.html"
)
# Read and store template content
(
self.account_validity_account_renewed_template,
self.account_validity_account_previously_renewed_template,
self.account_validity_invalid_token_template,
) = self.read_templates(
[
account_renewed_template_filename,
"account_previously_renewed.html",
invalid_token_template_filename,
],
account_validity_template_dir,
)
def generate_config_section(self, **kwargs):
return """\
## Account Validity ##
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
# The currently available templates are:
#
# * account_renewed.html: Displayed to the user after they have successfully
# renewed their account.
#
# * account_previously_renewed.html: Displayed to the user if they attempt to
# renew their account with a token that is valid, but that has already
# been used. In this case the account is not renewed again.
#
# * invalid_token.html: Displayed to the user when they try to renew an account
# with an unknown or invalid renewal token.
#
# See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for
# default template contents.
#
# The file name of some of these templates can be configured below for legacy
# reasons.
#
#template_dir: "res/templates"
# A custom file name for the 'account_renewed.html' template.
#
# If not set, the file is assumed to be named "account_renewed.html".
#
#account_renewed_html_path: "account_renewed.html"
# A custom file name for the 'invalid_token.html' template.
#
# If not set, the file is assumed to be named "invalid_token.html".
#
#invalid_token_html_path: "invalid_token.html"
"""

View File

@@ -17,6 +17,8 @@ import re
import threading
from typing import Callable, Dict
from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
# The prefix for all cache factor-related environment variables
@@ -189,6 +191,15 @@ class CacheConfig(Config):
)
self.cache_factors[cache] = factor
self.track_memory_usage = cache_config.get("track_memory_usage", False)
if self.track_memory_usage:
try:
check_requirements("cache_memory")
except DependencyException as e:
raise ConfigError(
e.message # noqa: B306, DependencyException.message is a property
)
# Resize all caches (if necessary) with the new factors we've loaded
self.resize_all_caches()

View File

@@ -58,6 +58,7 @@ DEFAULT_CONFIG = """\
# password: secretpassword
# database: synapse
# host: localhost
# port: 5432
# cp_min: 5
# cp_max: 10
#

View File

@@ -299,7 +299,7 @@ class EmailConfig(Config):
"client_base_url", email_config.get("riot_base_url", None)
)
if self.account_validity.renew_by_email_enabled:
if self.account_validity_renew_by_email_enabled:
expiry_template_html = email_config.get(
"expiry_template_html", "notice_expiry.html"
)

View File

@@ -12,25 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import RootConfig
from .account_validity import AccountValidityConfig
from .api import ApiConfig
from .appservice import AppServiceConfig
from .auth import AuthConfig
from .cache import CacheConfig
from .captcha import CaptchaConfig
from .cas import CasConfig
from .consent_config import ConsentConfig
from .consent import ConsentConfig
from .database import DatabaseConfig
from .emailconfig import EmailConfig
from .experimental import ExperimentalConfig
from .federation import FederationConfig
from .groups import GroupsConfig
from .jwt_config import JWTConfig
from .jwt import JWTConfig
from .key import KeyConfig
from .logger import LoggingConfig
from .metrics import MetricsConfig
from .oidc_config import OIDCConfig
from .oidc import OIDCConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
@@ -39,9 +39,9 @@ from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room import RoomConfig
from .room_directory import RoomDirectoryConfig
from .saml2_config import SAML2Config
from .saml2 import SAML2Config
from .server import ServerConfig
from .server_notices_config import ServerNoticesConfig
from .server_notices import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
from .sso import SSOConfig
from .stats import StatsConfig
@@ -68,6 +68,7 @@ class HomeServerConfig(RootConfig):
CaptchaConfig,
VoipConfig,
RegistrationConfig,
AccountValidityConfig,
MetricsConfig,
ApiConfig,
AppServiceConfig,

View File

@@ -31,7 +31,6 @@ from twisted.logger import (
)
import synapse
from synapse.app import _base as appbase
from synapse.logging._structured import setup_structured_logging
from synapse.logging.context import LoggingContextFilter
from synapse.logging.filter import MetadataFilter
@@ -318,6 +317,8 @@ def setup_logging(
# Perform one-time logging configuration.
_setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
# Add a SIGHUP handler to reload the logging configuration, if one is available.
from synapse.app import _base as appbase
appbase.register_sighup(_reload_logging_config, log_config_path)
# Log immediately so we can grep backwards.

View File

@@ -14,20 +14,23 @@
# limitations under the License.
from collections import Counter
from typing import Iterable, List, Mapping, Optional, Tuple, Type
from typing import Collection, Iterable, List, Mapping, Optional, Tuple, Type
import attr
from synapse.config._util import validate_config
from synapse.config.sso import SsoAttributeRequirement
from synapse.python_dependencies import DependencyException, check_requirements
from synapse.types import Collection, JsonDict
from synapse.types import JsonDict
from synapse.util.module_loader import load_module
from synapse.util.stringutils import parse_and_validate_mxc_uri
from ._base import Config, ConfigError, read_file
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider"
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc.JinjaOidcMappingProvider"
# The module that JinjaOidcMappingProvider is in was renamed, we want to
# transparently handle both the same.
LEGACY_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider"
class OIDCConfig(Config):
@@ -403,6 +406,8 @@ def _parse_oidc_config_dict(
"""
ump_config = oidc_config.get("user_mapping_provider", {})
ump_config.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
if ump_config.get("module") == LEGACY_USER_MAPPING_PROVIDER:
ump_config["module"] = DEFAULT_USER_MAPPING_PROVIDER
ump_config.setdefault("config", {})
(

View File

@@ -12,74 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pkg_resources
from synapse.api.constants import RoomCreationPreset
from synapse.config._base import Config, ConfigError
from synapse.types import RoomAlias, UserID
from synapse.util.stringutils import random_string_with_symbols, strtobool
class AccountValidityConfig(Config):
section = "accountvalidity"
def __init__(self, config, synapse_config):
if config is None:
return
super().__init__()
self.enabled = config.get("enabled", False)
self.renew_by_email_enabled = "renew_at" in config
if self.enabled:
if "period" in config:
self.period = self.parse_duration(config["period"])
else:
raise ConfigError("'period' is required when using account validity")
if "renew_at" in config:
self.renew_at = self.parse_duration(config["renew_at"])
if "renew_email_subject" in config:
self.renew_email_subject = config["renew_email_subject"]
else:
self.renew_email_subject = "Renew your %(app)s account"
self.startup_job_max_delta = self.period * 10.0 / 100.0
if self.renew_by_email_enabled:
if "public_baseurl" not in synapse_config:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
template_dir = config.get("template_dir")
if not template_dir:
template_dir = pkg_resources.resource_filename("synapse", "res/templates")
if "account_renewed_html_path" in config:
file_path = os.path.join(template_dir, config["account_renewed_html_path"])
self.account_renewed_html_content = self.read_file(
file_path, "account_validity.account_renewed_html_path"
)
else:
self.account_renewed_html_content = (
"<html><body>Your account has been successfully renewed.</body><html>"
)
if "invalid_token_html_path" in config:
file_path = os.path.join(template_dir, config["invalid_token_html_path"])
self.invalid_token_html_content = self.read_file(
file_path, "account_validity.invalid_token_html_path"
)
else:
self.invalid_token_html_content = (
"<html><body>Invalid renewal token.</body><html>"
)
class RegistrationConfig(Config):
section = "registration"
@@ -92,10 +30,6 @@ class RegistrationConfig(Config):
str(config["disable_registration"])
)
self.account_validity = AccountValidityConfig(
config.get("account_validity") or {}, config
)
self.registrations_require_3pid = config.get("registrations_require_3pid", [])
self.allowed_local_3pids = config.get("allowed_local_3pids", [])
self.enable_3pid_lookup = config.get("enable_3pid_lookup", True)
@@ -207,69 +141,6 @@ class RegistrationConfig(Config):
#
#enable_registration: false
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10%% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %%(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#
# Note that this is not currently compatible with guest logins.

View File

@@ -25,7 +25,10 @@ from ._util import validate_config
logger = logging.getLogger(__name__)
DEFAULT_USER_MAPPING_PROVIDER = (
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.saml.DefaultSamlMappingProvider"
# The module that DefaultSamlMappingProvider is in was renamed, we want to
# transparently handle both the same.
LEGACY_USER_MAPPING_PROVIDER = (
"synapse.handlers.saml_handler.DefaultSamlMappingProvider"
)
@@ -97,6 +100,8 @@ class SAML2Config(Config):
# Use the default user mapping provider if not set
ump_dict.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
if ump_dict.get("module") == LEGACY_USER_MAPPING_PROVIDER:
ump_dict["module"] = DEFAULT_USER_MAPPING_PROVIDER
# Ensure a config is present
ump_dict["config"] = ump_dict.get("config") or {}

View File

@@ -19,7 +19,7 @@ import logging
import os.path
import re
from textwrap import indent
from typing import Any, Dict, Iterable, List, Optional, Set
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
import attr
import yaml
@@ -235,7 +235,11 @@ class ServerConfig(Config):
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl")
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
# Whether to enable user presence.
presence_config = config.get("presence") or {}
@@ -407,10 +411,6 @@ class ServerConfig(Config):
config_path=("federation_ip_range_blacklist",),
)
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
# sending out any replication updates.
@@ -572,6 +572,7 @@ class ServerConfig(Config):
_warn_if_webclient_configured(self.listeners)
self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None))
@attr.s
class LimitRemoteRoomsConfig:
@@ -917,6 +918,16 @@ class ServerConfig(Config):
#
#gc_thresholds: [700, 10, 10]
# The minimum time in seconds between each GC for a generation, regardless of
# the GC thresholds. This ensures that we don't do GC too frequently.
#
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
# generation 0 GCs, etc.
#
# Defaults to `[1s, 10s, 30s]`.
#
#gc_min_interval: [0.5s, 30s, 1m]
# Set the limit on the returned events in the timeline in the get
# and sync operations. The default value is 100. -1 means no upper limit.
#
@@ -1305,6 +1316,24 @@ class ServerConfig(Config):
help="Turn on the twisted telnet manhole service on the given port.",
)
def read_gc_intervals(self, durations) -> Optional[Tuple[float, float, float]]:
"""Reads the three durations for the GC min interval option, returning seconds."""
if durations is None:
return None
try:
if len(durations) != 3:
raise ValueError()
return (
self.parse_duration(durations[0]) / 1000,
self.parse_duration(durations[1]) / 1000,
self.parse_duration(durations[2]) / 1000,
)
except Exception:
raise ConfigError(
"Value of `gc_min_interval` must be a list of three durations if set"
)
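As a quick sanity check of the conversion above (a standalone sketch; parse_duration is assumed to return milliseconds, which is why the helper divides by 1000):

    def _to_seconds(durations):
        # Simplified stand-in for read_gc_intervals: duration strings are parsed
        # to milliseconds and then converted to per-generation seconds.
        ms = {"1s": 1000, "10s": 10_000, "30s": 30_000, "1m": 60_000, "0.5s": 500}
        return tuple(ms[d] / 1000 for d in durations)

    print(_to_seconds(["1s", "10s", "30s"]))   # (1.0, 10.0, 30.0), the documented default
    print(_to_seconds(["0.5s", "30s", "1m"]))  # (0.5, 30.0, 60.0), the commented example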
def is_threepid_reserved(reserved_threepids, threepid):
"""Check the threepid against the reserved threepid config

View File

@@ -64,6 +64,14 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
can only be a single instance.
receipts: The instances that write to the receipts stream. Currently
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
"""
events = attr.ib(
@@ -85,6 +93,11 @@ class WriterLocations:
type=List[str],
converter=_instance_to_list_converter,
)
presence = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
class WorkerConfig(Config):
@@ -188,7 +201,14 @@ class WorkerConfig(Config):
# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
for stream in (
"events",
"typing",
"to_device",
"account_data",
"receipts",
"presence",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
@@ -215,6 +235,11 @@ class WorkerConfig(Config):
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")
if len(self.writers.presence) != 1:
raise ConfigError(
"Must only specify one instance to handle `presence` messages."
)
self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
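A minimal sketch of the validation added in the hunk above, using hypothetical worker names (not taken from the change): every non-master writer must appear in instance_map, and exactly one instance may write to the presence stream.

    writers = {
        "events": ["event_writer1", "event_writer2"],
        "presence": ["presence_writer"],
    }
    instance_map = {"event_writer1", "event_writer2", "presence_writer"}

    for stream, instances in writers.items():
        for instance in instances:
            if instance != "master" and instance not in instance_map:
                raise ValueError(
                    f"Instance {instance!r} is configured to write {stream} "
                    "but does not appear in `instance_map`"
                )

    if len(writers.get("presence", ["master"])) != 1:
        raise ValueError("Must only specify one instance to handle `presence` messages.")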

View File

@@ -48,7 +48,7 @@ def check_event_content_hash(
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
hashes = getattr(event, "hashes", None)
# nb it might be a frozendict or a dict
if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(
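A toy illustration of why getattr is used above (hypothetical stand-in object, not Synapse's EventBase): events expose hashes as an attribute rather than via dict-style access, and a malformed event may lack it entirely, in which case it is treated the same as an unhashed event.

    import collections.abc

    class _MalformedEvent:
        pass  # no "hashes" attribute at all

    hashes = getattr(_MalformedEvent(), "hashes", None)
    print(isinstance(hashes, collections.abc.Mapping))  # False -> rejected as unhashed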

View File

@@ -16,8 +16,7 @@
import abc
import logging
import urllib
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
import attr
from signedjson.key import (
@@ -42,17 +41,18 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.config.key import TrustedKeyServer
from synapse.events import EventBase
from synapse.events.utils import prune_event_dict
from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
preserve_fn,
run_in_background,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.metrics import Measure
from synapse.util.async_helpers import Linearizer, yieldable_gather_results
from synapse.util.retryutils import NotRetryingDestination
if TYPE_CHECKING:
@@ -74,8 +74,6 @@ class VerifyJsonRequest:
minimum_valid_until_ts: time at which we require the signing key to
be valid. (0 implies we don't care)
request_name: The name of the request.
key_ids: The set of key_ids that could be used to verify the JSON object
key_ready (Deferred[str, str, nacl.signing.VerifyKey]):
@@ -88,20 +86,93 @@ class VerifyJsonRequest:
"""
server_name = attr.ib(type=str)
json_object = attr.ib(type=JsonDict)
json_object_callback = attr.ib(type=Callable[[], JsonDict])
minimum_valid_until_ts = attr.ib(type=int)
request_name = attr.ib(type=str)
key_ids = attr.ib(init=False, type=List[str])
key_ready = attr.ib(default=attr.Factory(defer.Deferred), type=defer.Deferred)
key_ids = attr.ib(type=List[str])
def __attrs_post_init__(self):
self.key_ids = signature_ids(self.json_object, self.server_name)
@staticmethod
def from_json_object(
server_name: str, minimum_valid_until_ms: int, json_object: JsonDict
):
key_ids = signature_ids(json_object, server_name)
return VerifyJsonRequest(
server_name, lambda: json_object, minimum_valid_until_ms, key_ids
)
@staticmethod
def from_event(
server_name: str,
minimum_valid_until_ms: int,
event: EventBase,
):
key_ids = list(event.signatures.get(server_name, []))
return VerifyJsonRequest(
server_name,
lambda: prune_event_dict(event.room_version, event.get_pdu_json()),
minimum_valid_until_ms,
key_ids,
)
class KeyLookupError(ValueError):
pass
@attr.s(slots=True)
class _QueueValue:
server_name = attr.ib(type=str)
minimum_valid_until_ts = attr.ib(type=int)
key_ids = attr.ib(type=List[str])
class _Queue:
def __init__(self, name, clock, process_items):
self._name = name
self._clock = clock
self._is_processing = False
self._next_values = []
self.process_items = process_items
async def add_to_queue(self, value: _QueueValue) -> Dict[str, FetchKeyResult]:
d = defer.Deferred()
self._next_values.append((value, d))
if not self._is_processing:
run_as_background_process(self._name, self._unsafe_process)
return await make_deferred_yieldable(d)
async def _unsafe_process(self):
try:
if self._is_processing:
return
self._is_processing = True
while self._next_values:
# We purposefully defer to the next loop.
await self._clock.sleep(0)
next_values = self._next_values
self._next_values = []
try:
values = [value for value, _ in next_values]
results = await self.process_items(values)
for value, deferred in next_values:
with PreserveLoggingContext():
deferred.callback(results.get(value.server_name, {}))
except Exception as e:
for _, deferred in next_values:
deferred.errback(e)
finally:
self._is_processing = False
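The _Queue above batches concurrent key requests so that every caller waiting at the same moment is served by a single fetch. Below is a simplified, self-contained asyncio analogue of that pattern (the real class uses Twisted Deferreds and run_as_background_process); the names here are illustrative, not Synapse APIs.

    import asyncio

    class BatchingQueue:
        """Callers await add_to_queue(); one background task drains everything
        queued so far with a single call to process_items and shares the result."""

        def __init__(self, process_items):
            self._process_items = process_items
            self._pending = []          # list of (value, Future)
            self._is_processing = False

        async def add_to_queue(self, value):
            fut = asyncio.get_running_loop().create_future()
            self._pending.append((value, fut))
            if not self._is_processing:
                asyncio.create_task(self._process())
            return await fut

        async def _process(self):
            if self._is_processing:
                return
            self._is_processing = True
            try:
                while self._pending:
                    await asyncio.sleep(0)  # yield so concurrent callers can batch up
                    batch, self._pending = self._pending, []
                    try:
                        results = await self._process_items([v for v, _ in batch])
                        for value, fut in batch:
                            fut.set_result(results.get(value))
                    except Exception as e:
                        for _, fut in batch:
                            fut.set_exception(e)
            finally:
                self._is_processing = False

    async def _demo():
        async def fetch_all(values):
            return {v: v.upper() for v in values}  # pretend one round-trip fetches all

        queue = BatchingQueue(fetch_all)
        print(await asyncio.gather(*(queue.add_to_queue(v) for v in "abc")))  # ['A', 'B', 'C']

    asyncio.run(_demo())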
class Keyring:
def __init__(
self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
@@ -116,12 +187,7 @@ class Keyring:
)
self._key_fetchers = key_fetchers
# map from server name to Deferred. Has an entry for each server with
# an ongoing key download; the Deferred completes once the download
# completes.
#
# These are regular, logcontext-agnostic Deferreds.
self.key_downloads = {} # type: Dict[str, defer.Deferred]
self._server_queue = Linearizer("keyring_server")
def verify_json_for_server(
self,
@@ -130,365 +196,150 @@ class Keyring:
validity_time: int,
request_name: str,
) -> defer.Deferred:
"""Verify that a JSON object has been signed by a given server
Args:
server_name: name of the server which must have signed this object
json_object: object to be checked
validity_time: timestamp at which we require the signing key to
be valid. (0 implies we don't care)
request_name: an identifier for this json object (eg, an event id)
for logging.
Returns:
Deferred[None]: completes if the object was correctly signed, otherwise
errbacks with an error
"""
req = VerifyJsonRequest(server_name, json_object, validity_time, request_name)
requests = (req,)
return make_deferred_yieldable(self._verify_objects(requests)[0])
request = VerifyJsonRequest.from_json_object(
server_name,
validity_time,
json_object,
)
return defer.ensureDeferred(self._verify_object(request))
def verify_json_objects_for_server(
self, server_and_json: Iterable[Tuple[str, dict, int, str]]
) -> List[defer.Deferred]:
"""Bulk verifies signatures of json objects, bulk fetching keys as
necessary.
Args:
server_and_json:
Iterable of (server_name, json_object, validity_time, request_name)
tuples.
validity_time is a timestamp at which the signing key must be
valid.
request_name is an identifier for this json object (eg, an event id)
for logging.
Returns:
List<Deferred[None]>: for each input triplet, a deferred indicating success
or failure to verify each json object's signature for the given
server_name. The deferreds run their callbacks in the sentinel
logcontext.
"""
return self._verify_objects(
VerifyJsonRequest(server_name, json_object, validity_time, request_name)
for server_name, json_object, validity_time, request_name in server_and_json
)
def _verify_objects(
self, verify_requests: Iterable[VerifyJsonRequest]
) -> List[defer.Deferred]:
"""Does the work of verify_json_[objects_]for_server
Args:
verify_requests: Iterable of verification requests.
Returns:
List<Deferred[None]>: for each input item, a deferred indicating success
or failure to verify each json object's signature for the given
server_name. The deferreds run their callbacks in the sentinel
logcontext.
"""
# a list of VerifyJsonRequests which are awaiting a key lookup
key_lookups = []
handle = preserve_fn(_handle_key_deferred)
def process(verify_request: VerifyJsonRequest) -> defer.Deferred:
"""Process an entry in the request list
Adds a key request to key_lookups, and returns a deferred which
will complete or fail (in the sentinel context) when verification completes.
"""
if not verify_request.key_ids:
return defer.fail(
SynapseError(
400,
"Not signed by %s" % (verify_request.server_name,),
Codes.UNAUTHORIZED,
)
return [
defer.ensureDeferred(
run_in_background(
self._verify_object,
VerifyJsonRequest.from_json_object(
server_name,
validity_time,
json_object,
),
)
logger.debug(
"Verifying %s for %s with key_ids %s, min_validity %i",
verify_request.request_name,
verify_request.server_name,
verify_request.key_ids,
verify_request.minimum_valid_until_ts,
)
for server_name, json_object, validity_time, request_name in server_and_json
]
# add the key request to the queue, but don't start it off yet.
key_lookups.append(verify_request)
# now run _handle_key_deferred, which will wait for the key request
# to complete and then do the verification.
#
# We want _handle_key_request to log to the right context, so we
# wrap it with preserve_fn (aka run_in_background)
return handle(verify_request)
results = [process(r) for r in verify_requests]
if key_lookups:
run_in_background(self._start_key_lookups, key_lookups)
return results
async def _start_key_lookups(
self, verify_requests: List[VerifyJsonRequest]
) -> None:
"""Sets off the key fetches for each verify request
Once each fetch completes, verify_request.key_ready will be resolved.
Args:
verify_requests:
"""
try:
# map from server name to a set of outstanding request ids
server_to_request_ids = {} # type: Dict[str, Set[int]]
for verify_request in verify_requests:
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids.setdefault(server_name, set()).add(request_id)
# Wait for any previous lookups to complete before proceeding.
await self.wait_for_previous_lookups(server_to_request_ids.keys())
# take out a lock on each of the servers by sticking a Deferred in
# key_downloads
for server_name in server_to_request_ids.keys():
self.key_downloads[server_name] = defer.Deferred()
logger.debug("Got key lookup lock on %s", server_name)
# When we've finished fetching all the keys for a given server_name,
# drop the lock by resolving the deferred in key_downloads.
def drop_server_lock(server_name):
d = self.key_downloads.pop(server_name)
d.callback(None)
def lookup_done(res, verify_request):
server_name = verify_request.server_name
server_requests = server_to_request_ids[server_name]
server_requests.remove(id(verify_request))
# if there are no more requests for this server, we can drop the lock.
if not server_requests:
logger.debug("Releasing key lookup lock on %s", server_name)
drop_server_lock(server_name)
return res
for verify_request in verify_requests:
verify_request.key_ready.addBoth(lookup_done, verify_request)
# Actually start fetching keys.
self._get_server_verify_keys(verify_requests)
except Exception:
logger.exception("Error starting key lookups")
async def wait_for_previous_lookups(self, server_names: Iterable[str]) -> None:
"""Waits for any previous key lookups for the given servers to finish.
Args:
server_names: list of servers which we want to look up
Returns:
Resolves once all key lookups for the given servers have
completed. Follows the synapse rules of logcontext preservation.
"""
loop_count = 1
while True:
wait_on = [
(server_name, self.key_downloads[server_name])
for server_name in server_names
if server_name in self.key_downloads
]
if not wait_on:
break
logger.info(
"Waiting for existing lookups for %s to complete [loop %i]",
[w[0] for w in wait_on],
loop_count,
def verify_events_for_server(
self, server_and_json: Iterable[Tuple[str, EventBase, int]]
) -> List[defer.Deferred]:
return [
run_in_background(
self._verify_object,
VerifyJsonRequest.from_event(
server_name,
validity_time,
event,
),
)
with PreserveLoggingContext():
await defer.DeferredList((w[1] for w in wait_on))
for server_name, event, validity_time in server_and_json
]
loop_count += 1
async def _verify_object(self, verify_request: VerifyJsonRequest):
# TODO: Use a batching thing.
with (await self._server_queue.queue(verify_request.server_name)):
found_keys: Dict[str, FetchKeyResult] = {}
missing_key_ids = set(verify_request.key_ids)
for fetcher in self._key_fetchers:
if not missing_key_ids:
break
def _get_server_verify_keys(self, verify_requests: List[VerifyJsonRequest]) -> None:
"""Tries to find at least one key for each verify request
For each verify_request, verify_request.key_ready is called back with
params (server_name, key_id, VerifyKey) if a key is found, or errbacked
with a SynapseError if none of the keys are found.
Args:
verify_requests: list of verify requests
"""
remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called}
async def do_iterations():
try:
with Measure(self.clock, "get_server_verify_keys"):
for f in self._key_fetchers:
if not remaining_requests:
return
await self._attempt_key_fetches_with_fetcher(
f, remaining_requests
)
# look for any requests which weren't satisfied
while remaining_requests:
verify_request = remaining_requests.pop()
rq_str = (
"VerifyJsonRequest(server=%s, key_ids=%s, min_valid=%i)"
% (
verify_request.server_name,
verify_request.key_ids,
verify_request.minimum_valid_until_ts,
)
)
# If we run the errback immediately, it may cancel our
# loggingcontext while we are still in it, so instead we
# schedule it for the next time round the reactor.
#
# (this also ensures that we don't get a stack overflow if we
# have a massive queue of lookups waiting for this server).
self.clock.call_later(
0,
verify_request.key_ready.errback,
SynapseError(
401,
"Failed to find any key to satisfy %s" % (rq_str,),
Codes.UNAUTHORIZED,
),
)
except Exception as err:
# we don't really expect to get here, because any errors should already
# have been caught and logged. But if we do, let's log the error and make
# sure that all of the deferreds are resolved.
logger.error("Unexpected error in _get_server_verify_keys: %s", err)
with PreserveLoggingContext():
for verify_request in remaining_requests:
if not verify_request.key_ready.called:
verify_request.key_ready.errback(err)
run_in_background(do_iterations)
async def _attempt_key_fetches_with_fetcher(
self, fetcher: "KeyFetcher", remaining_requests: Set[VerifyJsonRequest]
):
"""Use a key fetcher to attempt to satisfy some key requests
Args:
fetcher: fetcher to use to fetch the keys
remaining_requests: outstanding key requests.
Any successfully-completed requests will be removed from the list.
"""
# The keys to fetch.
# server_name -> key_id -> min_valid_ts
missing_keys = defaultdict(dict) # type: Dict[str, Dict[str, int]]
for verify_request in remaining_requests:
# any completed requests should already have been removed
assert not verify_request.key_ready.called
keys_for_server = missing_keys[verify_request.server_name]
for key_id in verify_request.key_ids:
# If we have several requests for the same key, then we only need to
# request that key once, but we should do so with the greatest
# min_valid_until_ts of the requests, so that we can satisfy all of
# the requests.
keys_for_server[key_id] = max(
keys_for_server.get(key_id, -1),
keys = await fetcher.get_keys(
verify_request.server_name,
list(missing_key_ids),
verify_request.minimum_valid_until_ts,
)
results = await fetcher.get_keys(missing_keys)
for key_id, key in keys.items():
if not key:
continue
completed = []
for verify_request in remaining_requests:
server_name = verify_request.server_name
if key.valid_until_ts < verify_request.minimum_valid_until_ts:
continue
existing_key = found_keys.get(key_id)
if existing_key:
if key.valid_until_ts <= existing_key.valid_until_ts:
continue
found_keys[key_id] = key
missing_key_ids.difference_update(found_keys)
if missing_key_ids:
raise SynapseError(
400,
"Missing keys for %s: %s"
% (verify_request.server_name, missing_key_ids),
Codes.UNAUTHORIZED,
)
# see if any of the keys we got this time are sufficient to
# complete this VerifyJsonRequest.
result_keys = results.get(server_name, {})
for key_id in verify_request.key_ids:
fetch_key_result = result_keys.get(key_id)
if not fetch_key_result:
# we didn't get a result for this key
continue
if (
fetch_key_result.valid_until_ts
< verify_request.minimum_valid_until_ts
):
# key was not valid at this point
continue
# we have a valid key for this request. If we run the callback
# immediately, it may cancel our loggingcontext while we are still in
# it, so instead we schedule it for the next time round the reactor.
#
# (this also ensures that we don't get a stack overflow if we had
# a massive queue of lookups waiting for this server).
logger.debug(
"Found key %s:%s for %s",
server_name,
key_id,
verify_request.request_name,
)
self.clock.call_later(
0,
verify_request.key_ready.callback,
(server_name, key_id, fetch_key_result.verify_key),
)
completed.append(verify_request)
break
remaining_requests.difference_update(completed)
verify_key = found_keys[key_id].verify_key
try:
json_object = verify_request.json_object_callback()
verify_signed_json(
json_object,
verify_request.server_name,
verify_key,
)
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
verify_request.server_name,
verify_key.alg,
verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s: %s"
% (
verify_request.server_name,
verify_key.alg,
verify_key.version,
str(e),
),
Codes.UNAUTHORIZED,
)
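The verification path above ultimately calls signedjson's verify_signed_json, which raises SignatureVerifyException on a bad signature. A minimal round trip with the signedjson library (illustrative only; key management in Synapse is more involved):

from signedjson.key import generate_signing_key, get_verify_key
from signedjson.sign import SignatureVerifyException, sign_json, verify_signed_json

signing_key = generate_signing_key("v1")  # ed25519 key with version "v1"
verify_key = get_verify_key(signing_key)

event = {"type": "m.room.message", "content": {"body": "hi"}}
signed = sign_json(dict(event), "example.org", signing_key)

try:
    verify_signed_json(signed, "example.org", verify_key)
    print("signature ok")
except SignatureVerifyException as e:
    print("bad signature:", e)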
class KeyFetcher(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def get_keys(
self, keys_to_fetch: Dict[str, Dict[str, int]]
) -> Dict[str, Dict[str, FetchKeyResult]]:
"""
Args:
keys_to_fetch:
the keys to be fetched. server_name -> key_id -> min_valid_ts
Returns:
Map from server_name -> key_id -> FetchKeyResult
"""
raise NotImplementedError
def __init__(self, hs: "HomeServer"):
self._queue = _Queue(self.__class__.__name__, hs.get_clock(), self._fetch_keys)
async def get_keys(
self, server_name: str, key_ids: List[str], minimum_valid_until_ts: int
) -> Dict[str, FetchKeyResult]:
return await self._queue.add_to_queue(
_QueueValue(
server_name=server_name,
key_ids=key_ids,
minimum_valid_until_ts=minimum_valid_until_ts,
)
)
@abc.abstractmethod
async def _fetch_keys(
self, keys_to_fetch: List[_QueueValue]
) -> Dict[str, Dict[str, FetchKeyResult]]:
pass
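The refactor above funnels every get_keys call through a shared queue, with an abstract _fetch_keys that handles a whole batch at once. A rough sketch of that batching pattern using asyncio (a hypothetical stand-in for the queue class; the real Synapse implementation also handles clocks and failure propagation):

import asyncio
from typing import Any, Awaitable, Callable, Dict, List

class BatchingQueue:
    """Collects values briefly, then hands the whole batch to one coroutine."""

    def __init__(self, process_batch: Callable[[List[Any]], Awaitable[Any]]):
        self._process_batch = process_batch
        self._pending: List[Any] = []
        self._futures: List[asyncio.Future] = []
        self._scheduled = False

    async def add_to_queue(self, value: Any) -> Any:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._pending.append(value)
        self._futures.append(fut)
        if not self._scheduled:
            # Flush once, after every caller queued on this tick has been added.
            self._scheduled = True
            loop.call_soon(lambda: asyncio.ensure_future(self._flush()))
        return await fut

    async def _flush(self) -> None:
        pending, futures = self._pending, self._futures
        self._pending, self._futures, self._scheduled = [], [], False
        result = await self._process_batch(pending)
        for fut in futures:
            fut.set_result(result)

async def fetch_keys(batch: List[str]) -> Dict[str, str]:
    # Imagine one network round trip covering the whole batch.
    return {server: "key-for-" + server for server in batch}

async def main() -> None:
    queue = BatchingQueue(fetch_keys)
    results = await asyncio.gather(
        queue.add_to_queue("a.example"), queue.add_to_queue("b.example")
    )
    print(results[0]["a.example"], results[1]["b.example"])

asyncio.run(main())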
class StoreKeyFetcher(KeyFetcher):
"""KeyFetcher impl which fetches keys from our data store"""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.store = hs.get_datastore()
async def get_keys(
self, keys_to_fetch: Dict[str, Dict[str, int]]
) -> Dict[str, Dict[str, FetchKeyResult]]:
"""see KeyFetcher.get_keys"""
async def _fetch_keys(self, keys_to_fetch: List[_QueueValue]):
key_ids_to_fetch = (
(server_name, key_id)
for server_name, keys_for_server in keys_to_fetch.items()
for key_id in keys_for_server.keys()
)
key_ids_to_fetch = (
(queue_value.server_name, key_id)
for queue_value in keys_to_fetch
for key_id in queue_value.key_ids
)
res = await self.store.get_server_verify_keys(key_ids_to_fetch)
@@ -500,6 +351,8 @@ class StoreKeyFetcher(KeyFetcher):
class BaseV2KeyFetcher(KeyFetcher):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.store = hs.get_datastore()
self.config = hs.config
@@ -607,10 +460,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
self.client = hs.get_federation_http_client()
self.key_servers = self.config.key_servers
async def get_keys(
self, keys_to_fetch: Dict[str, Dict[str, int]]
async def _fetch_keys(
self, keys_to_fetch: List[_QueueValue]
) -> Dict[str, Dict[str, FetchKeyResult]]:
"""see KeyFetcher.get_keys"""
"""see KeyFetcher._fetch_keys"""
async def get_key(key_server: TrustedKeyServer) -> Dict:
try:
@@ -646,12 +499,12 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
return union_of_keys
async def get_server_verify_key_v2_indirect(
self, keys_to_fetch: Dict[str, Dict[str, int]], key_server: TrustedKeyServer
self, keys_to_fetch: List[_QueueValue], key_server: TrustedKeyServer
) -> Dict[str, Dict[str, FetchKeyResult]]:
"""
Args:
keys_to_fetch:
the keys to be fetched. server_name -> key_id -> min_valid_ts
the keys to be fetched.
key_server: notary server to query for the keys
@@ -665,7 +518,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
perspective_name = key_server.server_name
logger.info(
"Requesting keys %s from notary server %s",
keys_to_fetch.items(),
keys_to_fetch,
perspective_name,
)
@@ -675,11 +528,13 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
path="/_matrix/key/v2/query",
data={
"server_keys": {
server_name: {
key_id: {"minimum_valid_until_ts": min_valid_ts}
for key_id, min_valid_ts in server_keys.items()
queue_value.server_name: {
key_id: {
"minimum_valid_until_ts": queue_value.minimum_valid_until_ts,
}
for key_id in queue_value.key_ids
}
for server_name, server_keys in keys_to_fetch.items()
for queue_value in keys_to_fetch
}
},
)
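The request body built above follows the /_matrix/key/v2/query format: server_keys maps server_name -> key_id -> {"minimum_valid_until_ts": ...}. A small sketch of building that body from queued requests (hypothetical QueueValue tuples, not the real class):

import json
from typing import List, NamedTuple

class QueueValue(NamedTuple):
    server_name: str
    key_ids: List[str]
    minimum_valid_until_ts: int

keys_to_fetch = [QueueValue("example.org", ["ed25519:a1"], 1_620_000_000_000)]

body = {
    "server_keys": {
        qv.server_name: {
            key_id: {"minimum_valid_until_ts": qv.minimum_valid_until_ts}
            for key_id in qv.key_ids
        }
        for qv in keys_to_fetch
    }
}
print(json.dumps(body, indent=2))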
@@ -779,8 +634,8 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
self.clock = hs.get_clock()
self.client = hs.get_federation_http_client()
async def get_keys(
self, keys_to_fetch: Dict[str, Dict[str, int]]
async def _fetch_keys(
self, keys_to_fetch: List[_QueueValue]
) -> Dict[str, Dict[str, FetchKeyResult]]:
"""
Args:
@@ -793,8 +648,10 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
results = {}
async def get_key(key_to_fetch_item: Tuple[str, Dict[str, int]]) -> None:
server_name, key_ids = key_to_fetch_item
async def get_key(key_to_fetch_item: _QueueValue) -> None:
server_name = key_to_fetch_item.server_name
key_ids = key_to_fetch_item.key_ids
try:
keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
results[server_name] = keys
@@ -805,7 +662,7 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
except Exception:
logger.exception("Error getting keys %s from %s", key_ids, server_name)
await yieldable_gather_results(get_key, keys_to_fetch.items())
await yieldable_gather_results(get_key, keys_to_fetch)
return results
async def get_server_verify_key_v2_direct(
@@ -877,37 +734,3 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
keys.update(response_keys)
return keys
async def _handle_key_deferred(verify_request: VerifyJsonRequest) -> None:
"""Waits for the key to become available, and then performs a verification
Args:
verify_request:
Raises:
SynapseError if there was a problem performing the verification
"""
server_name = verify_request.server_name
with PreserveLoggingContext():
_, key_id, verify_key = await verify_request.key_ready
json_object = verify_request.json_object
try:
verify_signed_json(json_object, server_name, verify_key)
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
server_name,
verify_key.alg,
verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s: %s"
% (server_name, verify_key.alg, verify_key.version, str(e)),
Codes.UNAUTHORIZED,
)

View File

@@ -14,14 +14,14 @@
# limitations under the License.
import logging
from typing import List, Optional, Set, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
@@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None:
too_big("type")
if len(event.event_id) > 255:
too_big("event_id")
if len(encode_canonical_json(event.get_pdu_json())) > 65536:
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
too_big("event")
@@ -418,7 +418,9 @@ def get_send_level(
def _can_send_event(event: EventBase, auth_events: StateMap[EventBase]) -> bool:
power_levels_event = _get_power_level_event(auth_events)
send_level = get_send_level(event.type, event.get("state_key"), power_levels_event)
send_level = get_send_level(
event.type, getattr(event, "state_key", None), power_levels_event
)
user_level = get_user_power_level(event.user_id, auth_events)
if user_level < send_level:
@@ -670,7 +672,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
public_key = public_key_object["public_key"]
try:
for server, signature_block in signed["signatures"].items():
for key_name, encoded_signature in signature_block.items():
for key_name in signature_block.keys():
if not key_name.startswith("ed25519:"):
continue
verify_key = decode_verify_key_bytes(
@@ -688,7 +690,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
return False
def get_public_keys(invite_event):
def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
public_keys = []
if "public_key" in invite_event.content:
o = {"public_key": invite_event.content["public_key"]}

View File

@@ -16,12 +16,15 @@
import abc
import os
from typing import Dict, Optional, Tuple, Type
import zlib
from typing import Dict, List, Optional, Tuple, Type, Union
from unpaddedbase64 import encode_base64
import attr
from unpaddedbase64 import decode_base64, encode_base64
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict, RoomStreamToken
from synapse.util import json_decoder, json_encoder
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
from synapse.util.stringutils import strtobool
@@ -37,6 +40,26 @@ from synapse.util.stringutils import strtobool
USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0"))
_PRESET_ZDICT = b"""{"auth_events":[],"prev_events":[],"type":"m.room.member",m.room.message"room_id":,"sender":,"content":{"msgtype":"m.text","body":""room_version":"creator":"depth":"prev_state":"state_key":""origin":"origin_server_ts":"hashes":{"sha256":"signatures":,"unsigned":{"age_ts":"ed25519"""
def _encode_dict(d: JsonDict) -> bytes:
json_bytes = json_encoder.encode(d).encode("utf-8")
c = zlib.compressobj(1, zdict=_PRESET_ZDICT)
result_bytes = c.compress(json_bytes)
result_bytes += c.flush()
return result_bytes
def _decode_dict(b: bytes) -> JsonDict:
d = zlib.decompressobj(zdict=_PRESET_ZDICT)
result_bytes = d.decompress(b)
result_bytes += d.flush()
return json_decoder.decode(result_bytes.decode("utf-8"))
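The helpers above compress each event's JSON with a zlib preset dictionary seeded with strings that recur in almost every event, so even short event bodies compress well. A self-contained round trip using the same zlib facilities (with a simplified preset dictionary, illustrative only):

import json
import zlib

# A preset dictionary of fragments that appear in most events, so the
# compressor can reference them even in very small payloads.
PRESET = b'"auth_events":[],"prev_events":[],"type":"m.room.member","content":{"msgtype":"m.text","body":"'

def encode_dict(d: dict) -> bytes:
    c = zlib.compressobj(1, zdict=PRESET)
    return c.compress(json.dumps(d).encode("utf-8")) + c.flush()

def decode_dict(b: bytes) -> dict:
    d = zlib.decompressobj(zdict=PRESET)
    return json.loads(d.decompress(b) + d.flush())

event = {"type": "m.room.member", "content": {"membership": "join"}}
blob = encode_dict(event)
assert decode_dict(blob) == event
print(len(json.dumps(event)), "->", len(blob), "bytes")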
class DictProperty:
"""An object property which delegates to the `_dict` within its parent object."""
@@ -205,7 +228,81 @@ class _EventInternalMetadata:
return self._dict.get("redacted", False)
@attr.s(slots=True, auto_attribs=True)
class _Signatures:
_signatures_bytes: bytes
@staticmethod
def from_dict(signature_dict: JsonDict) -> "_Signatures":
return _Signatures(_encode_dict(signature_dict))
def get_dict(self) -> JsonDict:
return _decode_dict(self._signatures_bytes)
def get(self, server_name, default=None):
return self.get_dict().get(server_name, default)
def update(self, other: Union[JsonDict, "_Signatures"]):
if isinstance(other, _Signatures):
other_dict = _decode_dict(other._signatures_bytes)
else:
other_dict = other
signatures = self.get_dict()
signatures.update(other_dict)
self._signatures_bytes = _encode_dict(signatures)
class _SmallListV1(str):
__slots__ = []
def get(self):
return self.split(",")
@staticmethod
def create(event_ids):
return _SmallListV1(",".join(event_ids))
class _SmallListV2_V3(bytes):
__slots__ = []
def get(self, url_safe):
i = 0
while i * 32 < len(self):
bit = self[i * 32 : (i + 1) * 32]
i += 1
yield "$" + encode_base64(bit, urlsafe=url_safe)
@staticmethod
def create(event_ids):
return _SmallListV2_V3(
b"".join(decode_base64(event_id[1:]) for event_id in event_ids)
)
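The two helper classes above are compact encodings of event-ID lists: V1 room versions keep the raw IDs joined by commas, while V2/V3 event IDs are just "$" plus the unpadded-base64 reference hash, so only the raw 32-byte hashes are stored and the string form is rebuilt on demand. A standalone round trip for the V2/V3 form, using only the standard library (hypothetical IDs):

import base64
import os

def encode_unpadded(b: bytes) -> str:
    return base64.b64encode(b).decode("ascii").rstrip("=")

def decode_unpadded(s: str) -> bytes:
    return base64.b64decode(s + "=" * (-len(s) % 4))

# Fake V2/V3-style event IDs: "$" + unpadded base64 of a 32-byte hash.
event_ids = ["$" + encode_unpadded(os.urandom(32)) for _ in range(3)]

# Pack: strip the "$", decode, and concatenate the raw 32-byte hashes.
packed = b"".join(decode_unpadded(eid[1:]) for eid in event_ids)
assert len(packed) == 32 * len(event_ids)

# Unpack: slice back into 32-byte chunks and re-encode.
unpacked = ["$" + encode_unpadded(packed[i:i + 32]) for i in range(0, len(packed), 32)]
assert unpacked == event_ids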
class EventBase(metaclass=abc.ABCMeta):
__slots__ = [
"room_version",
"signatures",
"unsigned",
"rejected_reason",
"_encoded_dict",
"_auth_event_ids",
"depth",
"_content",
"_hashes",
"origin",
"origin_server_ts",
"_prev_event_ids",
"redacts",
"room_id",
"sender",
"type",
"state_key",
"internal_metadata",
]
@property
@abc.abstractmethod
def format_version(self) -> int:
@@ -224,32 +321,44 @@ class EventBase(metaclass=abc.ABCMeta):
assert room_version.event_format == self.format_version
self.room_version = room_version
self.signatures = signatures
self.signatures = _Signatures.from_dict(signatures)
self.unsigned = unsigned
self.rejected_reason = rejected_reason
self._dict = event_dict
self._encoded_dict = _encode_dict(event_dict)
self.depth = event_dict["depth"]
self.origin = event_dict.get("origin")
self.origin_server_ts = event_dict["origin_server_ts"]
self.redacts = event_dict.get("redacts")
self.room_id = event_dict["room_id"]
self.sender = event_dict["sender"]
self.type = event_dict["type"]
if "state_key" in event_dict:
self.state_key = event_dict["state_key"]
self.internal_metadata = _EventInternalMetadata(internal_metadata_dict)
auth_events = DictProperty("auth_events")
depth = DictProperty("depth")
content = DictProperty("content")
hashes = DictProperty("hashes")
origin = DictProperty("origin")
origin_server_ts = DictProperty("origin_server_ts")
prev_events = DictProperty("prev_events")
redacts = DefaultDictProperty("redacts", None)
room_id = DictProperty("room_id")
sender = DictProperty("sender")
state_key = DictProperty("state_key")
type = DictProperty("type")
user_id = DictProperty("sender")
@property
def content(self) -> JsonDict:
return self.get_dict()["content"]
@property
def hashes(self) -> JsonDict:
return self.get_dict()["hashes"]
@property
def prev_events(self) -> List[str]:
return list(self._prev_events)
@property
def event_id(self) -> str:
raise NotImplementedError()
@property
def user_id(self) -> str:
return self.sender
@property
def membership(self):
return self.content["membership"]
@@ -258,17 +367,13 @@ class EventBase(metaclass=abc.ABCMeta):
return hasattr(self, "state_key") and self.state_key is not None
def get_dict(self) -> JsonDict:
d = dict(self._dict)
d.update({"signatures": self.signatures, "unsigned": dict(self.unsigned)})
d = _decode_dict(self._encoded_dict)
d.update(
{"signatures": self.signatures.get_dict(), "unsigned": dict(self.unsigned)}
)
return d
def get(self, key, default=None):
return self._dict.get(key, default)
def get_internal_metadata_dict(self):
return self.internal_metadata.get_dict()
def get_pdu_json(self, time_now=None) -> JsonDict:
pdu_json = self.get_dict()
@@ -285,41 +390,11 @@ class EventBase(metaclass=abc.ABCMeta):
def __set__(self, instance, value):
raise AttributeError("Unrecognized attribute %s" % (instance,))
def __getitem__(self, field):
return self._dict[field]
def __contains__(self, field):
return field in self._dict
def items(self):
return list(self._dict.items())
def keys(self):
return self._dict.keys()
def prev_event_ids(self):
"""Returns the list of prev event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's prev_events
"""
return [e for e, _ in self.prev_events]
def auth_event_ids(self):
"""Returns the list of auth event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's auth_events
"""
return [e for e, _ in self.auth_events]
def freeze(self):
"""'Freeze' the event dict, so it cannot be modified by accident"""
# this will be a no-op if the event dict is already frozen.
self._dict = freeze(self._dict)
# self._dict = freeze(self._dict)
class FrozenEvent(EventBase):
@@ -355,6 +430,12 @@ class FrozenEvent(EventBase):
frozen_dict = event_dict
self._event_id = event_dict["event_id"]
self._auth_event_ids = _SmallListV1.create(
e for e, _ in event_dict["auth_events"]
)
self._prev_event_ids = _SmallListV1.create(
e for e, _ in event_dict["prev_events"]
)
super().__init__(
frozen_dict,
@@ -369,18 +450,26 @@ class FrozenEvent(EventBase):
def event_id(self) -> str:
return self._event_id
def auth_event_ids(self):
return list(self._auth_event_ids.get())
def prev_event_ids(self):
return list(self._prev_event_ids.get())
def __str__(self):
return self.__repr__()
def __repr__(self):
return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % (
self.get("event_id", None),
self.get("type", None),
self.get("state_key", None),
self.event_id,
self.type,
getattr(self, "state_key", None),
)
class FrozenEventV2(EventBase):
__slots__ = ["_event_id"]
format_version = EventFormatVersions.V2 # All events of this type are V2
def __init__(
@@ -415,6 +504,8 @@ class FrozenEventV2(EventBase):
frozen_dict = event_dict
self._event_id = None
self._auth_event_ids = _SmallListV2_V3.create(event_dict["auth_events"])
self._prev_event_ids = _SmallListV2_V3.create(event_dict["prev_events"])
super().__init__(
frozen_dict,
@@ -436,24 +527,6 @@ class FrozenEventV2(EventBase):
self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
return self._event_id
def prev_event_ids(self):
"""Returns the list of prev event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's prev_events
"""
return self.prev_events
def auth_event_ids(self):
"""Returns the list of auth event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's auth_events
"""
return self.auth_events
def __str__(self):
return self.__repr__()
@@ -461,14 +534,22 @@ class FrozenEventV2(EventBase):
return "<%s event_id=%r, type=%r, state_key=%r>" % (
self.__class__.__name__,
self.event_id,
self.get("type", None),
self.get("state_key", None),
self.type,
self.state_key if self.is_state() else None,
)
def auth_event_ids(self):
return list(self._auth_event_ids.get(False))
def prev_event_ids(self):
return list(self._prev_event_ids.get(False))
class FrozenEventV3(FrozenEventV2):
"""FrozenEventV3, which differs from FrozenEventV2 only in the event_id format"""
__slots__ = ["_event_id"]
format_version = EventFormatVersions.V3 # All events of this type are V3
@property
@@ -484,6 +565,12 @@ class FrozenEventV3(FrozenEventV2):
)
return self._event_id
def auth_event_ids(self):
return list(self._auth_event_ids.get(True))
def prev_event_ids(self):
return list(self._prev_event_ids.get(True))
def _event_type_from_format_version(format_version: int) -> Type[EventBase]:
"""Returns the python type to use to construct an Event object for the

View File

@@ -15,12 +15,12 @@
import inspect
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, Union
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
from synapse.spam_checker_api import RegistrationBehaviour
from synapse.types import Collection
from synapse.types import RoomAlias
from synapse.util.async_helpers import maybe_awaitable
if TYPE_CHECKING:
@@ -114,7 +114,9 @@ class SpamChecker:
return True
async def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool:
async def user_may_create_room_alias(
self, userid: str, room_alias: RoomAlias
) -> bool:
"""Checks if a given user may create a room alias
If this method returns false, the association request will be rejected.

View File

@@ -38,6 +38,8 @@ class EventValidator:
if event.format_version == EventFormatVersions.V1:
EventID.from_string(event.event_id)
event_dict = event.get_dict()
required = [
"auth_events",
"content",
@@ -49,7 +51,7 @@ class EventValidator:
]
for k in required:
if not hasattr(event, k):
if k not in event_dict:
raise SynapseError(400, "Event does not have key %s" % (k,))
# Check that the following keys have string values

View File

@@ -73,10 +73,10 @@ class FederationBase:
* throws a SynapseError if the signature check failed.
The deferreds run their callbacks in the sentinel
"""
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
ctx = current_context()
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
@defer.inlineCallbacks
def callback(_, pdu: EventBase):
with PreserveLoggingContext(ctx):
@@ -90,9 +90,7 @@ class FederationBase:
# received event was probably a redacted copy (but we then use our
# *actual* redacted copy to be on the safe side.)
redacted_event = prune_event(pdu)
if set(redacted_event.keys()) == set(pdu.keys()) and set(
redacted_event.content.keys()
) == set(pdu.content.keys()):
if set(redacted_event.content.keys()) == set(pdu.content.keys()):
logger.info(
"Event %s seems to have been redacted; using our redacted "
"copy",
@@ -137,11 +135,7 @@ class FederationBase:
return deferreds
class PduToCheckSig(
namedtuple(
"PduToCheckSig", ["pdu", "redacted_pdu_json", "sender_domain", "deferreds"]
)
):
class PduToCheckSig(namedtuple("PduToCheckSig", ["pdu", "sender_domain", "deferreds"])):
pass
@@ -184,7 +178,6 @@ def _check_sigs_on_pdus(
pdus_to_check = [
PduToCheckSig(
pdu=p,
redacted_pdu_json=prune_event(p).get_pdu_json(),
sender_domain=get_domain_from_id(p.sender),
deferreds=[],
)
@@ -195,13 +188,12 @@ def _check_sigs_on_pdus(
# (except if its a 3pid invite, in which case it may be sent by any server)
pdus_to_check_sender = [p for p in pdus_to_check if not _is_invite_via_3pid(p.pdu)]
more_deferreds = keyring.verify_json_objects_for_server(
more_deferreds = keyring.verify_events_for_server(
[
(
p.sender_domain,
p.redacted_pdu_json,
p.pdu,
p.pdu.origin_server_ts if room_version.enforce_key_validity else 0,
p.pdu.event_id,
)
for p in pdus_to_check_sender
]
@@ -230,13 +222,12 @@ def _check_sigs_on_pdus(
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
]
more_deferreds = keyring.verify_json_objects_for_server(
more_deferreds = keyring.verify_events_for_server(
[
(
get_domain_from_id(p.pdu.event_id),
p.redacted_pdu_json,
p.pdu,
p.pdu.origin_server_ts if room_version.enforce_key_validity else 0,
p.pdu.event_id,
)
for p in pdus_to_check_event_id
]

View File

@@ -33,6 +33,7 @@ from typing import (
)
import attr
import ijson
from prometheus_client import Counter
from twisted.internet import defer
@@ -55,11 +56,16 @@ from synapse.api.room_versions import (
)
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.logging.context import (
get_thread_resource_usage,
make_deferred_yieldable,
preserve_fn,
)
from synapse.logging.utils import log_function
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
if TYPE_CHECKING:
@@ -385,7 +391,6 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
deferreds = self._check_sigs_and_hashes(room_version, pdus)
async def handle_check_result(pdu: EventBase, deferred: Deferred):
try:
@@ -420,6 +425,7 @@ class FederationClient(FederationBase):
return res
handle = preserve_fn(handle_check_result)
deferreds = self._check_sigs_and_hashes(room_version, pdus)
deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
valid_pdus = await make_deferred_yieldable(
@@ -451,6 +457,28 @@ class FederationClient(FederationBase):
return signed_auth
def _is_unknown_endpoint(
self, e: HttpResponseException, synapse_error: Optional[SynapseError] = None
) -> bool:
"""
Returns true if the response was due to an endpoint being unimplemented.
Args:
e: The error response received from the remote server.
synapse_error: The above error converted to a SynapseError. This is
automatically generated if not provided.
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
# There is no good way to detect an "unknown" endpoint.
#
# Dendrite returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
return e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
)
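_is_unknown_endpoint centralises the heuristic for "the remote has not implemented this endpoint": Dendrite answers 404 with no body, Synapse answers 400 with M_UNRECOGNISED. A hedged sketch of how a caller can use such a check to fall back from a v2 to a v1 API (hypothetical helper names, not the real client):

import asyncio

class HttpResponseError(Exception):
    def __init__(self, code: int, errcode: str = ""):
        super().__init__("HTTP %d %s" % (code, errcode))
        self.code = code
        self.errcode = errcode

def is_unknown_endpoint(e: HttpResponseError) -> bool:
    # Dendrite: 404 with no body; Synapse: 400 with M_UNRECOGNISED.
    return e.code == 404 or (e.code == 400 and e.errcode == "M_UNRECOGNISED")

async def send(destination: str, path: str, payload: dict) -> dict:
    # Stand-in for an HTTP client: pretend the remote only knows the v1 path.
    if path.startswith("/v2/"):
        raise HttpResponseError(400, "M_UNRECOGNISED")
    return {"ok": True, "path": path}

async def send_with_fallback(destination: str, payload: dict) -> dict:
    try:
        return await send(destination, "/v2/do_thing", payload)
    except HttpResponseError as e:
        if not is_unknown_endpoint(e):
            raise
    return await send(destination, "/v1/do_thing", payload)

print(asyncio.run(send_with_fallback("remote.example", {"hello": "world"})))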
async def _try_destination_list(
self,
description: str,
@@ -468,9 +496,9 @@ class FederationClient(FederationBase):
callback: Function to run for each server. Passed a single
argument: the server_name to try.
If the callback raises a CodeMessageException with a 300/400 code,
attempts to perform the operation stop immediately and the exception is
reraised.
If the callback raises a CodeMessageException with a 300/400 code or
an UnsupportedRoomVersionError, attempts to perform the operation
stop immediately and the exception is reraised.
Otherwise, if the callback raises an Exception the error is logged and the
next server tried. Normally the stacktrace is logged but this is
@@ -492,8 +520,7 @@ class FederationClient(FederationBase):
continue
try:
res = await callback(destination)
return res
return await callback(destination)
except InvalidResponseError as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
except UnsupportedRoomVersionError:
@@ -502,17 +529,15 @@ class FederationClient(FederationBase):
synapse_error = e.to_synapse_error()
failover = False
# Failover on an internal server error, or if the destination
# doesn't implement the endpoint for some reason.
if 500 <= e.code < 600:
failover = True
elif failover_on_unknown_endpoint:
# there is no good way to detect an "unknown" endpoint. Dendrite
# returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
if e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
):
failover = True
elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
e, synapse_error
):
failover = True
if not failover:
raise synapse_error from e
@@ -570,9 +595,8 @@ class FederationClient(FederationBase):
UnsupportedRoomVersionError: if remote responds with
a room version we don't understand.
SynapseError: if the chosen remote server returns a 300/400 code.
RuntimeError: if no servers were reachable.
SynapseError: if the chosen remote server returns a 300/400 code, or
no servers successfully handle the request.
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
@@ -642,27 +666,44 @@ class FederationClient(FederationBase):
``auth_chain``.
Raises:
SynapseError: if the chosen remote server returns a 300/400 code.
RuntimeError: if no servers were reachable.
SynapseError: if the chosen remote server returns a 300/400 code, or
no servers successfully handle the request.
"""
async def send_request(destination) -> Dict[str, Any]:
content = await self._do_send_join(destination, pdu)
logger.debug("Got content: %s", content)
# logger.debug("Got content: %s", content.getvalue())
state = [
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
]
# logger.info("send_join content: %d", len(content))
auth_chain = [
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
]
content.seek(0)
pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
r = get_thread_resource_usage()
logger.info("Memory before state: %s", r.ru_maxrss)
state = []
for i, p in enumerate(ijson.items(content, "state.item")):
state.append(event_from_pdu_json(p, room_version, outlier=True))
if i % 1000 == 999:
await self._clock.sleep(0)
r = get_thread_resource_usage()
logger.info("Memory after state: %s", r.ru_maxrss)
logger.info("Parsed state: %d", len(state))
content.seek(0)
auth_chain = []
for i, p in enumerate(ijson.items(content, "auth_chain.item")):
auth_chain.append(event_from_pdu_json(p, room_version, outlier=True))
if i % 1000 == 999:
await self._clock.sleep(0)
r = get_thread_resource_usage()
logger.info("Memory after: %s", r.ru_maxrss)
logger.info("Parsed auth chain: %d", len(auth_chain))
create_event = None
for e in state:
@@ -673,7 +714,7 @@ class FederationClient(FederationBase):
if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
raise InvalidResponseError("No create event in state")
# the room version should be sane.
create_room_version = create_event.content.get(
@@ -687,12 +728,19 @@ class FederationClient(FederationBase):
% (create_room_version,)
)
valid_pdus = await self._check_sigs_and_hash_and_fetch(
destination,
list(pdus.values()),
outlier=True,
room_version=room_version,
)
valid_pdus = []
for chunk in batch_iter(itertools.chain(state, auth_chain), 1000):
logger.info("Handling next _check_sigs_and_hash_and_fetch chunk")
new_valid_pdus = await self._check_sigs_and_hash_and_fetch(
destination,
chunk,
outlier=True,
room_version=room_version,
)
valid_pdus.extend(new_valid_pdus)
logger.info("_check_sigs_and_hash_and_fetch done")
valid_pdus_map = {p.event_id: p for p in valid_pdus}
@@ -727,6 +775,8 @@ class FederationClient(FederationBase):
% (auth_chain_create_events,)
)
logger.info("Returning from send_join")
return {
"state": signed_state,
"auth_chain": signed_auth,
@@ -746,16 +796,13 @@ class FederationClient(FederationBase):
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If an error is received that is due to an unrecognised endpoint,
# fall back to the v1 endpoint. Otherwise consider it a legitimate error
# and raise.
if not self._is_unknown_endpoint(e):
raise
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
raise NotImplementedError()
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
@@ -802,6 +849,11 @@ class FederationClient(FederationBase):
Returns:
The event as a dict as returned by the remote server
Raises:
SynapseError: if the remote server returns an error or if the server
only supports the v1 endpoint and a room version other than "1"
or "2" is requested.
"""
time_now = self._clock.time_msec()
@@ -817,28 +869,19 @@ class FederationClient(FederationBase):
},
)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, we
# assume that the remote understands the v2 invite API and this
# is a legitimate error.
if err.errcode != Codes.UNKNOWN:
raise err
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API. That's ok provided the room uses old-style event
# IDs.
# If an error is received that is due to an unrecognised endpoint,
# fall back to the v1 endpoint if the room uses old-style event IDs.
# Otherwise consider it a legitimate error and raise.
err = e.to_synapse_error()
if self._is_unknown_endpoint(e, err):
if room_version.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
elif e.code in (403, 429):
raise e.to_synapse_error()
else:
raise
raise err
# Didn't work, try v1 API.
# Note the v1 API returns a tuple of `(200, content)`
@@ -865,9 +908,8 @@ class FederationClient(FederationBase):
pdu: event to be sent
Raises:
SynapseError if the chosen remote server returns a 300/400 code.
RuntimeError if no servers were reachable.
SynapseError: if the chosen remote server returns a 300/400 code, or
no servers successfully handle the request.
"""
async def send_request(destination: str) -> None:
@@ -889,16 +931,11 @@ class FederationClient(FederationBase):
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
# If an error is received that is due to an unrecognised endpoint,
# fall back to the v1 endpoint. Otherwise consider it a legitimate error
# and raise.
if not self._is_unknown_endpoint(e):
raise
logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")

View File

@@ -76,9 +76,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
# Stream position -> list[user_id]
self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
@@ -96,7 +93,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
self.edus = SortedDict() # type: SortedDict[int, Edu]
# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
# stream ID for the next entry into keyed_edu_changed/edus.
self.pos = 1
# map from stream ID to the time that stream entry was generated, so that we
@@ -117,7 +114,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
for queue_name in [
"presence_map",
"presence_changed",
"keyed_edu",
"keyed_edu_changed",
"edus",
@@ -155,23 +151,12 @@ class FederationRemoteSendQueue(AbstractFederationSender):
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
user_ids = {
user_id for uids in self.presence_changed.values() for user_id in uids
}
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]
user_ids.update(
user_id for user_id, _ in self.presence_destinations.values()
)
user_ids = {user_id for user_id, _ in self.presence_destinations.values()}
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
@@ -244,23 +229,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
"""
# nothing to do here: the replication listener will handle it.
def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Args:
states
"""
pos = self._next_pos()
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = [s for s in states if self.is_mine_id(s.user_id)]
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -325,18 +293,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# of the federation stream.
rows = [] # type: List[Tuple[int, BaseFederationRow]]
# Fetch changed presence
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(state=self.presence_map[user_id])))
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
@@ -427,22 +383,6 @@ class BaseFederationRow:
raise NotImplementedError()
class PresenceRow(
BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
):
TypeId = "p"
@staticmethod
def from_data(data):
return PresenceRow(state=UserPresenceState.from_dict(data))
def to_data(self):
return self.state.as_dict()
def add_to_buffer(self, buff):
buff.presence.append(self.state)
class PresenceDestinationsRow(
BaseFederationRow,
namedtuple(
@@ -506,7 +446,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
_rowtypes = (
PresenceRow,
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
@@ -518,7 +457,6 @@ TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
ParsedFederationStreamData = namedtuple(
"ParsedFederationStreamData",
(
"presence", # list(UserPresenceState)
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
@@ -543,7 +481,6 @@ def process_rows_for_federation(
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
@@ -559,18 +496,15 @@ def process_rows_for_federation(
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
if buff.presence:
transaction_queue.send_presence(buff.presence)
for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
)
for destination, edu_map in buff.keyed_edus.items():
for edu_map in buff.keyed_edus.values():
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
for destination, edu_list in buff.edus.items():
for edu_list in buff.edus.values():
for edu in edu_list:
transaction_queue.send_edu(edu, None)

View File

@@ -18,14 +18,15 @@ from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set,
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
from synapse.logging.context import preserve_fn
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -33,8 +34,8 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
@@ -79,15 +80,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
@abc.abstractmethod
def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
raise NotImplementedError()
@abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
@@ -176,11 +168,6 @@ class FederationSender(AbstractFederationSender):
),
)
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {} # type: Dict[str, UserPresenceState]
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
@@ -201,8 +188,6 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
@@ -270,27 +255,15 @@ class FederationSender(AbstractFederationSender):
if not events and next_token >= self._last_poked_id:
break
async def get_destinations_for_event(
event: EventBase,
) -> Collection[str]:
"""Computes the destinations to which this event must be sent.
This returns an empty tuple when there are no destinations to send to,
or if this event is not from this homeserver and we are not sending
it on behalf of another server.
Will also filter out destinations which this sender is not responsible for,
if multiple federation senders exist.
"""
async def handle_event(event: EventBase) -> None:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return ()
return
if not event.internal_metadata.should_proactively_send():
return ()
return
destinations = None # type: Optional[Set[str]]
if not event.prev_event_ids():
@@ -325,7 +298,7 @@ class FederationSender(AbstractFederationSender):
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return ()
return
destinations = {
d
@@ -335,15 +308,17 @@ class FederationSender(AbstractFederationSender):
)
}
destinations.discard(self.server_name)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
if destinations:
await self._send_pdu(event, destinations)
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -351,29 +326,24 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).observe((now - ts) / 1000)
return destinations
return ()
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
for event in events:
await handle_event(event)
async def get_federatable_events_and_destinations(
events: Iterable[EventBase],
) -> List[Tuple[EventBase, Collection[str]]]:
with Measure(self.clock, "get_destinations_for_events"):
# Fetch federation destinations per event,
# skip if get_destinations_for_event returns an empty collection,
# return list of event->destinations pairs.
return [
(event, dests)
for (event, dests) in [
(event, await get_destinations_for_event(event))
for event in events
]
if dests
]
events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
events_and_dests = await get_federatable_events_and_destinations(events)
# Send corresponding events to each destination queue
await self._distribute_events(events_and_dests)
await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
for evs in events_by_room.values()
],
consumeErrors=True,
)
)
await self.store.update_federation_out_pos("events", next_token)
@@ -391,7 +361,7 @@ class FederationSender(AbstractFederationSender):
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels("federation_sender").inc(
len({event.room_id for event in events})
len(events_by_room)
)
event_processing_loop_counter.labels("federation_sender").inc()
@@ -403,53 +373,34 @@ class FederationSender(AbstractFederationSender):
finally:
self._is_processing = False
async def _distribute_events(
self,
events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
) -> None:
"""Distribute events to the respective per_destination queues.
async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
Also persists last-seen per-room stream_ordering to 'destination_rooms'.
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
Args:
events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
Every event is paired with its intended destinations (in federation).
"""
# Tuples of room_id + destination to their max-seen stream_ordering
room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]
if not destinations:
return
# List of events to send to each destination
events_by_dest = {} # type: Dict[str, List[EventBase]]
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
# For each event-destinations pair...
for event, destinations in events_and_dests:
assert pdu.internal_metadata.stream_ordering
# (we got this from the database, it's filled)
assert event.internal_metadata.stream_ordering
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
# ...iterate over those destinations..
for destination in destinations:
# ...update their stream-ordering...
room_with_dest_stream_ordering[(event.room_id, destination)] = max(
event.internal_metadata.stream_ordering,
room_with_dest_stream_ordering.get((event.room_id, destination), 0),
)
# ...and add the event to each destination queue.
events_by_dest.setdefault(destination, []).append(event)
# Bulk-store destination_rooms stream_ids
await self.store.bulk_store_destination_rooms_entries(
room_with_dest_stream_ordering
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
pdu.internal_metadata.stream_ordering,
)
for destination, pdus in events_by_dest.items():
logger.debug("Sending %d pdus to %s", len(pdus), destination)
self._get_per_destination_queue(destination).send_pdus(pdus)
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
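_distribute_events batches things up before touching the per-destination queues: for every (event, destinations) pair it records the highest stream_ordering seen per (room, destination) and collects the events per destination. A tiny standalone sketch of that bookkeeping (plain tuples instead of EventBase):

from typing import Dict, List, Tuple

# (event_id, room_id, stream_ordering, destinations)
events_and_dests = [
    ("$e1", "!r1:example.org", 10, ["a.example", "b.example"]),
    ("$e2", "!r1:example.org", 11, ["a.example"]),
]

# (room_id, destination) -> max stream_ordering, for the destination_rooms table
room_with_dest_stream_ordering: Dict[Tuple[str, str], int] = {}
# destination -> events to hand to that destination's queue
events_by_dest: Dict[str, List[str]] = {}

for event_id, room_id, stream_ordering, destinations in events_and_dests:
    for destination in destinations:
        key = (room_id, destination)
        room_with_dest_stream_ordering[key] = max(
            stream_ordering, room_with_dest_stream_ordering.get(key, 0)
        )
        events_by_dest.setdefault(destination, []).append(event_id)

assert room_with_dest_stream_ordering[("!r1:example.org", "a.example")] == 11
assert events_by_dest["b.example"] == ["$e1"]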
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
@@ -546,48 +497,6 @@ class FederationSender(AbstractFederationSender):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update(
{state.user_id: state for state in states if self.is_mine_id(state.user_id)}
)
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
@@ -599,6 +508,10 @@ class FederationSender(AbstractFederationSender):
# No-op if presence is disabled.
return
# Ensure we only send out presence states for local users.
for state in states:
assert self.is_mine_id(state.user_id)
for destination in destinations:
if destination == self.server_name:
continue
@@ -608,40 +521,6 @@ class FederationSender(AbstractFederationSender):
continue
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
# We pull the presence router here instead of __init__
# to prevent a dependency cycle:
#
# AuthHandler -> Notifier -> FederationSender
# -> PresenceRouter -> ModuleApi -> AuthHandler
if self._presence_router is None:
self._presence_router = self.hs.get_presence_router()
assert self._presence_router is not None
hosts_and_states = await get_interested_remotes(
self.store,
self._presence_router,
states,
self.state,
)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
continue
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(
self,
destination: str,

View File

@@ -154,22 +154,19 @@ class PerDestinationQueue:
+ len(self._pending_edus_keyed)
)
def send_pdus(self, pdus: Iterable[EventBase]) -> None:
"""Add PDUs to the queue, and start the transmission loop if necessary
def send_pdu(self, pdu: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdus: pdus to send
pdu: pdu to send
"""
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None)
self._pending_pdus.extend(pdus)
self._pending_pdus.append(pdu)
else:
self._catchup_last_skipped = max(
pdu.internal_metadata.stream_ordering
for pdu in pdus
if pdu.internal_metadata.stream_ordering is not None
)
assert pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
self.attempt_new_transaction()

View File

@@ -244,7 +244,10 @@ class TransportLayerClient:
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
destination=destination, path=path, data=content
destination=destination,
path=path,
data=content,
return_string_io=True,
)
return response
@@ -254,7 +257,10 @@ class TransportLayerClient:
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
destination=destination, path=path, data=content
destination=destination,
path=path,
data=content,
return_string_io=True,
)
return response

View File

@@ -17,7 +17,7 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, List, Optional, Tuple
from synapse.api.errors import StoreError, SynapseError
from synapse.logging.context import make_deferred_yieldable
@@ -39,28 +39,44 @@ class AccountValidityHandler:
self.sendmail = self.hs.get_sendmail()
self.clock = self.hs.get_clock()
self._account_validity = self.hs.config.account_validity
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
self._account_validity_renew_by_email_enabled = (
hs.config.account_validity.account_validity_renew_by_email_enabled
)
self._account_validity_period = None
if self._account_validity_enabled:
self._account_validity_period = (
hs.config.account_validity.account_validity_period
)
if (
self._account_validity.enabled
and self._account_validity.renew_by_email_enabled
self._account_validity_enabled
and self._account_validity_renew_by_email_enabled
):
# Don't do email-specific configuration if renewal by email is disabled.
self._template_html = self.config.account_validity_template_html
self._template_text = self.config.account_validity_template_text
self._template_html = (
hs.config.account_validity.account_validity_template_html
)
self._template_text = (
hs.config.account_validity.account_validity_template_text
)
account_validity_renew_email_subject = (
hs.config.account_validity.account_validity_renew_email_subject
)
try:
app_name = self.hs.config.email_app_name
app_name = hs.config.email_app_name
self._subject = self._account_validity.renew_email_subject % {
"app": app_name
}
self._subject = account_validity_renew_email_subject % {"app": app_name}
self._from_string = self.hs.config.email_notif_from % {"app": app_name}
self._from_string = hs.config.email_notif_from % {"app": app_name}
except Exception:
# If substitution failed, fall back to the bare strings.
self._subject = self._account_validity.renew_email_subject
self._from_string = self.hs.config.email_notif_from
self._subject = account_validity_renew_email_subject
self._from_string = hs.config.email_notif_from
self._raw_from = email.utils.parseaddr(self._from_string)[1]
@@ -220,50 +236,87 @@ class AccountValidityHandler:
attempts += 1
raise StoreError(500, "Couldn't generate a unique string as refresh string.")
async def renew_account(self, renewal_token: str) -> bool:
async def renew_account(self, renewal_token: str) -> Tuple[bool, bool, int]:
"""Renews the account attached to a given renewal token by pushing back the
expiration date by the current validity period in the server's configuration.
If it turns out that the token is valid but has already been used, then the
token is considered stale. A token is stale if the 'token_used_ts_ms' db column
is non-null.
Args:
renewal_token: Token sent with the renewal request.
Returns:
Whether the provided token is valid.
A tuple containing:
* A bool representing whether the token is valid and unused.
* A bool which is `True` if the token is valid, but stale.
* An int representing the user's expiry timestamp as milliseconds since the
epoch, or 0 if the token was invalid.
"""
try:
user_id = await self.store.get_user_from_renewal_token(renewal_token)
(
user_id,
current_expiration_ts,
token_used_ts,
) = await self.store.get_user_from_renewal_token(renewal_token)
except StoreError:
return False
return False, False, 0
# Check whether this token has already been used.
if token_used_ts:
logger.info(
"User '%s' attempted to use previously used token '%s' to renew account",
user_id,
renewal_token,
)
return False, True, current_expiration_ts
logger.debug("Renewing an account for user %s", user_id)
await self.renew_account_for_user(user_id)
return True
# Renew the account. Pass the renewal_token here so that it is not cleared.
# We want to keep the token around in case the user attempts to renew their
# account with the same token twice (clicking the email link twice).
#
# In that case, the token will be accepted, but the account's expiration ts
# will remain unchanged.
new_expiration_ts = await self.renew_account_for_user(
user_id, renewal_token=renewal_token
)
return True, False, new_expiration_ts
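renew_account now reports three things: whether the token was valid and unused, whether it was valid but already used (stale), and the resulting expiration timestamp. A sketch of how a caller might translate that tuple into a response (hypothetical handler and servlet logic, not the real REST code):

import asyncio

class FakeAccountValidityHandler:
    # Mimics the (valid, stale, expiration_ts) contract described above.
    async def renew_account(self, renewal_token: str):
        if renewal_token == "fresh":
            return True, False, 1_620_000_000_000
        if renewal_token == "used":
            return False, True, 1_610_000_000_000
        return False, False, 0

async def handle_renewal(handler, renewal_token: str) -> dict:
    token_valid, token_stale, expiration_ts = await handler.renew_account(renewal_token)
    if token_valid:
        return {"status": 200, "expiration_ts": expiration_ts}
    if token_stale:
        # Valid token, but already used: the expiry is unchanged.
        return {"status": 200, "expiration_ts": expiration_ts, "stale": True}
    return {"status": 404, "error": "Unknown or invalid renewal token"}

handler = FakeAccountValidityHandler()
print(asyncio.run(handle_renewal(handler, "fresh")))
print(asyncio.run(handle_renewal(handler, "used")))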
async def renew_account_for_user(
self,
user_id: str,
expiration_ts: Optional[int] = None,
email_sent: bool = False,
renewal_token: Optional[str] = None,
) -> int:
"""Renews the account attached to a given user by pushing back the
expiration date by the current validity period in the server's
configuration.
Args:
renewal_token: Token sent with the renewal request.
user_id: The ID of the user to renew.
expiration_ts: New expiration date. Defaults to now + validity period.
email_sen: Whether an email has been sent for this validity period.
Defaults to False.
email_sent: Whether an email has been sent for this validity period.
renewal_token: Token sent with the renewal request. The user's token
will be cleared if this is None.
Returns:
New expiration date for this account, as a timestamp in
milliseconds since epoch.
"""
now = self.clock.time_msec()
if expiration_ts is None:
expiration_ts = self.clock.time_msec() + self._account_validity.period
expiration_ts = now + self._account_validity_period
await self.store.set_account_validity_for_user(
user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
user_id=user_id,
expiration_ts=expiration_ts,
email_sent=email_sent,
renewal_token=renewal_token,
token_used_ts=now,
)
return expiration_ts

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Union
from prometheus_client import Counter
@@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.metrics import Measure
if TYPE_CHECKING:

View File

@@ -1248,7 +1248,7 @@ class AuthHandler(BaseHandler):
# see if any of our auth providers want to know about this
for provider in self.password_providers:
for token, token_id, device_id in tokens_and_devices:
for token, _, device_id in tokens_and_devices:
await provider.on_logged_out(
user_id=user_id, device_id=device_id, access_token=token
)

View File

@@ -49,7 +49,9 @@ class DeactivateAccountHandler(BaseHandler):
if hs.config.run_background_tasks:
hs.get_reactor().callWhenRunning(self._start_user_parting)
self._account_validity_enabled = hs.config.account_validity.enabled
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
async def deactivate_account(
self,

View File

@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api import errors
from synapse.api.constants import EventTypes
@@ -28,7 +28,6 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
Collection,
JsonDict,
StreamToken,
UserID,
@@ -156,8 +155,7 @@ class DeviceWorkerHandler(BaseHandler):
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
for key, event_id in current_state_ids.items():
etype, state_key = key
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
@@ -179,8 +177,7 @@ class DeviceWorkerHandler(BaseHandler):
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in current_state_ids.items():
etype, state_key = key
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
@@ -198,8 +195,7 @@ class DeviceWorkerHandler(BaseHandler):
for state_dict in prev_state_ids.values():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for key, event_id in current_state_ids.items():
etype, state_key = key
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
@@ -714,7 +710,7 @@ class DeviceListUpdater:
# This can happen since we batch updates
return
for device_id, stream_id, prev_ids, content in pending_updates:
for device_id, stream_id, prev_ids, _ in pending_updates:
logger.debug(
"Handling update %r/%r, ID: %r, prev: %r ",
user_id,
@@ -740,7 +736,7 @@ class DeviceListUpdater:
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
for device_id, stream_id, prev_ids, content in pending_updates:
for device_id, stream_id, _, content in pending_updates:
await self.store.update_remote_device_list_cache_entry(
user_id, device_id, content, stream_id
)
@@ -929,6 +925,10 @@ class DeviceListUpdater:
else:
cached_devices = await self.store.get_cached_devices_for_user(user_id)
if cached_devices == {d["device_id"]: d for d in devices}:
logging.info(
"Skipping device list resync for %s, as our cache matches already",
user_id,
)
devices = []
ignore_devices = True
@@ -944,6 +944,9 @@ class DeviceListUpdater:
await self.store.update_remote_device_list_cache(
user_id, devices, stream_id
)
# mark the cache as valid, whether or not we actually processed any device
# list updates.
await self.store.mark_remote_user_device_cache_as_valid(user_id)
device_ids = [device["device_id"] for device in devices]
# Handle cross-signing keys.

View File

@@ -14,7 +14,7 @@
import logging
import string
from typing import Iterable, List, Optional
from typing import TYPE_CHECKING, Iterable, List, Optional
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
@@ -27,15 +27,19 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.appservice import ApplicationService
from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
from ._base import BaseHandler
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class DirectoryHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.state = hs.get_state_handler()
@@ -60,7 +64,7 @@ class DirectoryHandler(BaseHandler):
room_id: str,
servers: Optional[Iterable[str]] = None,
creator: Optional[str] = None,
):
) -> None:
# general association creation for both human users and app services
for wchar in string.whitespace:
@@ -74,7 +78,7 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}
if not servers:
@@ -104,8 +108,9 @@ class DirectoryHandler(BaseHandler):
"""
user_id = requester.user.to_string()
room_alias_str = room_alias.to_string()
if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
if len(room_alias_str) > MAX_ALIAS_LENGTH:
raise SynapseError(
400,
"Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
@@ -114,7 +119,7 @@ class DirectoryHandler(BaseHandler):
service = requester.app_service
if service:
if not service.is_interested_in_alias(room_alias.to_string()):
if not service.is_interested_in_alias(room_alias_str):
raise SynapseError(
400,
"This application service has not reserved this kind of alias.",
@@ -138,7 +143,7 @@ class DirectoryHandler(BaseHandler):
raise AuthError(403, "This user is not permitted to create this alias")
if not self.config.is_alias_creation_allowed(
user_id, room_id, room_alias.to_string()
user_id, room_id, room_alias_str
):
# Lets just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
@@ -211,7 +216,7 @@ class DirectoryHandler(BaseHandler):
async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
):
) -> None:
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
@@ -220,7 +225,7 @@ class DirectoryHandler(BaseHandler):
)
await self._delete_association(room_alias)
async def _delete_association(self, room_alias: RoomAlias):
async def _delete_association(self, room_alias: RoomAlias) -> str:
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
@@ -228,17 +233,19 @@ class DirectoryHandler(BaseHandler):
return room_id
async def get_association(self, room_alias: RoomAlias):
async def get_association(self, room_alias: RoomAlias) -> JsonDict:
room_id = None
if self.hs.is_mine(room_alias):
result = await self.get_association_from_room_alias(room_alias)
result = await self.get_association_from_room_alias(
room_alias
) # type: Optional[RoomAliasMapping]
if result:
room_id = result.room_id
servers = result.servers
else:
try:
result = await self.federation.make_query(
fed_result = await self.federation.make_query(
destination=room_alias.domain,
query_type="directory",
args={"room_alias": room_alias.to_string()},
@@ -248,13 +255,13 @@ class DirectoryHandler(BaseHandler):
except CodeMessageException as e:
logging.warning("Error retrieving alias")
if e.code == 404:
result = None
fed_result = None
else:
raise
if result and "room_id" in result and "servers" in result:
room_id = result["room_id"]
servers = result["servers"]
if fed_result and "room_id" in fed_result and "servers" in fed_result:
room_id = fed_result["room_id"]
servers = fed_result["servers"]
if not room_id:
raise SynapseError(
@@ -263,7 +270,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND,
)
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)
@@ -275,7 +282,7 @@ class DirectoryHandler(BaseHandler):
return {"room_id": room_id, "servers": servers}
async def on_directory_query(self, args):
async def on_directory_query(self, args: JsonDict) -> JsonDict:
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
@@ -293,7 +300,7 @@ class DirectoryHandler(BaseHandler):
async def _update_canonical_alias(
self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias
):
) -> None:
"""
Send an updated canonical alias event if the removed alias was set as
the canonical alias or listed in the alt_aliases field.
@@ -344,7 +351,9 @@ class DirectoryHandler(BaseHandler):
ratelimit=False,
)
async def get_association_from_room_alias(self, room_alias: RoomAlias):
async def get_association_from_room_alias(
self, room_alias: RoomAlias
) -> Optional[RoomAliasMapping]:
result = await self.store.get_association_from_room_alias(room_alias)
if not result:
# Query AS to see if it exists
@@ -372,7 +381,7 @@ class DirectoryHandler(BaseHandler):
# either no interested services, or no service with an exclusive lock
return True
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
"""Determine whether a user can delete an alias.
One of the following must be true:
@@ -394,14 +403,13 @@ class DirectoryHandler(BaseHandler):
if not room_id:
return False
res = await self.auth.check_can_change_room_list(
return await self.auth.check_can_change_room_list(
room_id, UserID.from_string(user_id)
)
return res
async def edit_published_room_list(
self, requester: Requester, room_id: str, visibility: str
):
) -> None:
"""Edit the entry of the room in the published room list.
requester
@@ -469,7 +477,7 @@ class DirectoryHandler(BaseHandler):
async def edit_published_appservice_room_list(
self, appservice_id: str, network_id: str, room_id: str, visibility: str
):
) -> None:
"""Add or remove a room from the appservice/network specific public
room list.
@@ -499,5 +507,4 @@ class DirectoryHandler(BaseHandler):
room_id, requester.user.to_string()
)
aliases = await self.store.get_aliases_for_room(room_id)
return aliases
return await self.store.get_aliases_for_room(room_id)

View File

@@ -0,0 +1,86 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.room_versions import RoomVersion
from synapse.types import StateMap
if TYPE_CHECKING:
from synapse.server import HomeServer
class EventAuthHandler:
"""
This class contains methods for authenticating events added to room graphs.
"""
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
async def can_join_without_invite(
self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str
) -> bool:
"""
Check whether a user can join a room without an invite.
When joining a room with restricted join rules (as defined in MSC3083),
the membership of spaces must be checked during join.
Args:
state_ids: The state of the room as it currently is.
room_version: The room version of the room being joined.
user_id: The user joining the room.
Returns:
True if the user can join the room, false otherwise.
"""
# This only applies to room versions which support the new join rule.
if not room_version.msc3083_join_rules:
return True
# If there's no join rule, then it defaults to invite (so this doesn't apply).
join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
if not join_rules_event_id:
return True
# If the join rule is not restricted, this doesn't apply.
join_rules_event = await self._store.get_event(join_rules_event_id)
if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED:
return True
# If allowed is of the wrong form, then only allow invited users.
allowed_spaces = join_rules_event.content.get("allow", [])
if not isinstance(allowed_spaces, list):
return False
# Get the list of joined rooms and see if there's an overlap.
joined_rooms = await self._store.get_rooms_for_user(user_id)
# Pull out the other room IDs, invalid data gets filtered.
for space in allowed_spaces:
if not isinstance(space, dict):
continue
space_id = space.get("space")
if not isinstance(space_id, str):
continue
# The user was joined to one of the spaces specified, they can join
# this room!
if space_id in joined_rooms:
return True
# The user was not in any of the required spaces.
return False
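A standalone sketch of the MSC3083 check above, reduced to plain dicts so it runs without Synapse; the literal "restricted" string stands in for the JoinRules.MSC3083_RESTRICTED constant, and the room IDs are made up:

from typing import Iterable

def can_join_without_invite(join_rules_content: dict, joined_rooms: Iterable[str]) -> bool:
    # Only the restricted join rule triggers the space-membership check.
    if join_rules_content.get("join_rule") != "restricted":
        return True
    allowed = join_rules_content.get("allow", [])
    if not isinstance(allowed, list):
        return False
    joined = set(joined_rooms)
    for space in allowed:
        if isinstance(space, dict) and isinstance(space.get("space"), str):
            # The user is already in one of the required spaces.
            if space["space"] in joined:
                return True
    return False

content = {
    "join_rule": "restricted",
    "allow": [{"space": "!space:example.org", "via": ["example.org"]}],
}
print(can_join_without_invite(content, ["!space:example.org"]))  # True
print(can_join_without_invite(content, ["!other:example.org"]))  # False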

View File

@@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler):
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
users = await self.store.get_users_in_room(
event.room_id
) # type: Iterable[str]
else:

View File

@@ -146,6 +146,7 @@ class FederationHandler(BaseHandler):
self.is_mine_id = hs.is_mine_id
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._message_handler = hs.get_message_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
@@ -551,7 +552,7 @@ class FederationHandler(BaseHandler):
destination: str,
room_id: str,
event_id: str,
) -> Tuple[List[EventBase], List[EventBase]]:
) -> List[EventBase]:
"""Requests all of the room state at a given event from a remote homeserver.
Args:
@@ -572,11 +573,10 @@ class FederationHandler(BaseHandler):
desired_events = set(state_event_ids + auth_event_ids)
event_map = await self._get_events_from_store_or_dest(
failed_to_fetch = await self._get_events_from_store_or_dest(
destination, room_id, desired_events
)
failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s %s",
@@ -584,55 +584,12 @@ class FederationHandler(BaseHandler):
failed_to_fetch,
)
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
remote_state = [
event_map[e_id] for e_id in state_event_ids if e_id in event_map
]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
auth_chain.sort(key=lambda e: e.depth)
return remote_state, auth_chain
async def _get_events_from_store_or_dest(
self, destination: str, room_id: str, event_ids: Iterable[str]
) -> Dict[str, EventBase]:
"""Fetch events from a remote destination, checking if we already have them.
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
This function *does not* automatically get missing auth events of the
newly fetched events. Callers must include the full auth chain of
the missing events in the `event_ids` argument, to ensure that any
missing auth events are correctly fetched.
Returns:
map from event_id to event
"""
fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
missing_events = set(event_ids) - fetched_events.keys()
if missing_events:
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
room_id,
)
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(await self.store.get_events(missing_events, allow_rejected=True))
)
# check for events which were in the wrong room.
#
# this can happen if a remote server claims that the state or
@@ -640,7 +597,7 @@ class FederationHandler(BaseHandler):
bad_events = [
(event_id, event.room_id)
for event_id, event in fetched_events.items()
for idx, event in enumerate(remote_state)
if event.room_id != room_id
]
@@ -657,9 +614,49 @@ class FederationHandler(BaseHandler):
room_id,
)
del fetched_events[bad_event_id]
if bad_events:
remote_state = [e for e in remote_state if e.room_id == room_id]
return fetched_events
return remote_state
async def _get_events_from_store_or_dest(
self, destination: str, room_id: str, event_ids: Iterable[str]
) -> Set[str]:
"""Fetch events from a remote destination, checking if we already have them.
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
This function *does not* automatically get missing auth events of the
newly fetched events. Callers must include the full auth chain of
the missing events in the `event_ids` argument, to ensure that any
missing auth events are correctly fetched.
Returns:
The set of event IDs that we failed to fetch or persist.
"""
have_events = await self.store.have_seen_events(event_ids)
missing_events = set(event_ids) - have_events
if not missing_events:
return set()
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
room_id,
)
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)
new_events = await self.store.have_seen_events(missing_events)
return missing_events - new_events
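A standalone illustration of the set bookkeeping used by the new _get_events_from_store_or_dest: check which of the requested IDs we already have, persist what the remote can supply, and return only the IDs that still could not be obtained. FakeStore, its persist method, and the event IDs are stand-ins for illustration:

import asyncio

class FakeStore:
    def __init__(self, seen):
        self._seen = set(seen)
    async def have_seen_events(self, event_ids):
        return {e for e in event_ids if e in self._seen}
    async def persist(self, event_ids, available):
        # Pretend to fetch from the remote: only events it actually has get stored.
        self._seen |= set(event_ids) & available

async def fetch_missing(store, event_ids, remotely_available):
    missing = set(event_ids) - await store.have_seen_events(event_ids)
    if not missing:
        return set()
    await store.persist(missing, remotely_available)         # fetch + persist as outliers
    return missing - await store.have_seen_events(missing)   # what we still don't have

store = FakeStore({"$a", "$b"})
print(asyncio.run(fetch_missing(store, {"$a", "$b", "$c", "$d"}, {"$c"})))  # {'$d'}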
async def _get_state_after_missing_prev_event(
self,
@@ -962,27 +959,23 @@ class FederationHandler(BaseHandler):
# For each edge get the current state.
auth_events = {}
state_events = {}
events_to_state = {}
for e_id in edges:
state, auth = await self._get_state_for_room(
state = await self._get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id,
)
auth_events.update({a.event_id: a for a in auth})
auth_events.update({s.event_id: s for s in state})
state_events.update({s.event_id: s for s in state})
events_to_state[e_id] = state
required_auth = {
a_id
for event in events
+ list(state_events.values())
+ list(auth_events.values())
for event in events + list(state_events.values())
for a_id in event.auth_event_ids()
}
auth_events = await self.store.get_events(required_auth, allow_rejected=True)
auth_events.update(
{e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
)
@@ -1451,7 +1444,7 @@ class FederationHandler(BaseHandler):
# room stuff after join currently doesn't work on workers.
assert self.config.worker.worker_app is None
logger.debug("Joining %s to %s", joinee, room_id)
logger.info("Joining %s to %s", joinee, room_id)
origin, event, room_version_obj = await self._make_and_verify_event(
target_hosts,
@@ -1462,6 +1455,8 @@ class FederationHandler(BaseHandler):
params={"ver": KNOWN_ROOM_VERSIONS},
)
logger.info("make_join done from %s", origin)
# This shouldn't happen, because the RoomMemberHandler has a
# linearizer lock which only allows one operation per user per room
# at a time - so this is just paranoia.
@@ -1481,10 +1476,13 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
logger.info("Sending join")
ret = await self.federation_client.send_join(
host_list, event, room_version_obj
)
logger.info("send join done")
origin = ret["origin"]
state = ret["state"]
auth_chain = ret["auth_chain"]
@@ -1509,10 +1507,14 @@ class FederationHandler(BaseHandler):
room_version=room_version_obj,
)
logger.info("Persisting auth true")
max_stream_id = await self._persist_auth_tree(
origin, room_id, auth_chain, state, event, room_version_obj
)
logger.info("Persisted auth true")
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
await self._replication.wait_for_stream_position(
@@ -1673,8 +1675,40 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
# Calculate the event context.
context = await self.state_handler.compute_event_context(event)
context = await self._auth_and_persist_event(origin, event, context)
# Get the state before the new event.
prev_state_ids = await context.get_prev_state_ids()
# Check if the user is already in the room or invited to the room.
user_id = event.state_key
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
newly_joined = True
user_is_invited = False
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
user_is_invited = prev_member_event.membership == Membership.INVITE
# If the member is not already in the room, and not invited, check if
# they should be allowed access via membership in a space.
if (
newly_joined
and not user_is_invited
and not await self._event_auth_handler.can_join_without_invite(
prev_state_ids,
event.room_version,
user_id,
)
):
raise AuthError(
403,
"You do not belong to any of the required spaces to join this room.",
)
# Persist the event.
await self._auth_and_persist_event(origin, event, context)
logger.debug(
"on_send_join_request: After _auth_and_persist_event: %s, sigs: %s",
@@ -1682,8 +1716,6 @@ class FederationHandler(BaseHandler):
event.signatures,
)
prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
auth_chain = await self.store.get_auth_chain(event.room_id, state_ids)
@@ -2006,7 +2038,7 @@ class FederationHandler(BaseHandler):
state: Optional[Iterable[EventBase]] = None,
auth_events: Optional[MutableStateMap[EventBase]] = None,
backfilled: bool = False,
) -> EventContext:
) -> None:
"""
Process an event by performing auth checks and then persisting to the database.
@@ -2028,9 +2060,6 @@ class FederationHandler(BaseHandler):
event is an outlier), may be the auth events claimed by the remote
server.
backfilled: True if the event was backfilled.
Returns:
The event context.
"""
context = await self._check_event_auth(
origin,
@@ -2060,8 +2089,6 @@ class FederationHandler(BaseHandler):
)
raise
return context
async def _auth_and_persist_events(
self,
origin: str,
@@ -2140,6 +2167,8 @@ class FederationHandler(BaseHandler):
ctx = await self.state_handler.compute_event_context(e)
events_to_context[e.event_id] = ctx
logger.info("Computed contexts")
event_map = {
e.event_id: e for e in itertools.chain(auth_events, state, [event])
}
@@ -2181,6 +2210,8 @@ class FederationHandler(BaseHandler):
else:
logger.info("Failed to find auth event %r", e_id)
logger.info("Got missing events")
for e in itertools.chain(auth_events, state, [event]):
auth_for_e = {
(event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
@@ -2205,6 +2236,8 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
logger.info("Authed events")
await self.persist_events_and_notify(
room_id,
[
@@ -2213,10 +2246,14 @@ class FederationHandler(BaseHandler):
],
)
logger.info("Persisted events")
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)
logger.info("Computed context")
return await self.persist_events_and_notify(
room_id, [(event, new_event_context)]
)
@@ -2956,7 +2993,7 @@ class FederationHandler(BaseHandler):
try:
# for each sig on the third_party_invite block of the actual invite
for server, signature_block in signed["signatures"].items():
for key_name, encoded_signature in signature_block.items():
for key_name in signature_block.keys():
if not key_name.startswith("ed25519:"):
continue

View File

@@ -15,10 +15,9 @@
# limitations under the License.
"""Utilities for interacting with Identity Servers"""
import logging
import urllib.parse
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple
from synapse.api.errors import (
CodeMessageException,
@@ -34,17 +33,24 @@ from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, Requester
from synapse.util import json_decoder
from synapse.util.hash import sha256_and_url_safe_base64
from synapse.util.stringutils import assert_valid_client_secret, random_string
from synapse.util.stringutils import (
assert_valid_client_secret,
random_string,
valid_id_server_location,
)
from ._base import BaseHandler
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
id_server_scheme = "https://"
class IdentityHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
# An HTTP client for contacting trusted URLs.
@@ -77,7 +83,7 @@ class IdentityHandler(BaseHandler):
request: SynapseRequest,
medium: str,
address: str,
):
) -> None:
"""Used to ratelimit requests to `/requestToken` by IP and address.
Args:
@@ -172,6 +178,11 @@ class IdentityHandler(BaseHandler):
server with, if necessary. Required if use_v2 is true
use_v2: Whether to use v2 Identity Service API endpoints. Defaults to True
Raises:
SynapseError: On any of the following conditions
- the supplied id_server is not a valid identity server name
- we failed to contact the supplied identity server
Returns:
The response from the identity server
"""
@@ -181,6 +192,12 @@ class IdentityHandler(BaseHandler):
if id_access_token is None:
use_v2 = False
if not valid_id_server_location(id_server):
raise SynapseError(
400,
"id_server must be a valid hostname with optional port and path components",
)
# Decide which API endpoint URLs to use
headers = {}
bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
@@ -269,12 +286,21 @@ class IdentityHandler(BaseHandler):
id_server: Identity server to unbind from
Raises:
SynapseError: If we failed to contact the identity server
SynapseError: On any of the following conditions
- the supplied id_server is not a valid identity server name
- we failed to contact the supplied identity server
Returns:
True on success, otherwise False if the identity
server doesn't support unbinding
"""
if not valid_id_server_location(id_server):
raise SynapseError(
400,
"id_server must be a valid hostname with optional port and path components",
)
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
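The implementation of valid_id_server_location is not part of this diff; the following is only a hypothetical sketch of the kind of check the error message above describes (a hostname with optional port and path components, and nothing that would redirect the request elsewhere). All names here are illustrative, not the real synapse.util.stringutils code:

from urllib.parse import urlsplit

def looks_like_valid_id_server_location(id_server: str) -> bool:
    # Parse as if the value followed "https://"; a hostname, optional port and
    # optional path are allowed, but no userinfo, query or fragment.
    parts = urlsplit("https://" + id_server)
    if not parts.hostname:
        return False
    if parts.query or parts.fragment or "@" in parts.netloc:
        return False
    try:
        parts.port  # raises ValueError for a malformed port
    except ValueError:
        return False
    return True

print(looks_like_valid_id_server_location("matrix.example.org:8448/path"))      # True
print(looks_like_valid_id_server_location("evil.example.org/?next=https://x"))  # False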

View File

@@ -15,7 +15,7 @@
# limitations under the License.
import logging
import random
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
@@ -66,7 +66,7 @@ logger = logging.getLogger(__name__)
class MessageHandler:
"""Contains some read only APIs to get state about a room"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
@@ -91,7 +91,7 @@ class MessageHandler:
room_id: str,
event_type: str,
state_key: str,
) -> dict:
) -> Optional[EventBase]:
"""Get data from a room.
Args:
@@ -115,6 +115,10 @@ class MessageHandler:
data = await self.state.get_current_state(room_id, event_type, state_key)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
# If the membership is not JOIN, then the event ID should exist.
assert (
membership_event_id is not None
), "check_user_in_room_or_world_readable returned invalid data"
room_state = await self.state_store.get_state_for_events(
[membership_event_id], StateFilter.from_types([key])
)
@@ -186,10 +190,12 @@ class MessageHandler:
event = last_events[0]
if visible_events:
room_state = await self.state_store.get_state_for_events(
room_state_events = await self.state_store.get_state_for_events(
[event.event_id], state_filter=state_filter
)
room_state = room_state[event.event_id]
room_state = room_state_events[
event.event_id
] # type: Mapping[Any, EventBase]
else:
raise AuthError(
403,
@@ -210,10 +216,14 @@ class MessageHandler:
)
room_state = await self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = await self.state_store.get_state_for_events(
# If the membership is not JOIN, then the event ID should exist.
assert (
membership_event_id is not None
), "check_user_in_room_or_world_readable returned invalid data"
room_state_events = await self.state_store.get_state_for_events(
[membership_event_id], state_filter=state_filter
)
room_state = room_state[membership_event_id]
room_state = room_state_events[membership_event_id]
now = self.clock.time_msec()
events = await self._event_serializer.serialize_events(
@@ -248,7 +258,7 @@ class MessageHandler:
"Getting joined members after leaving is not implemented"
)
users_with_profile = await self.state.get_current_users_in_room(room_id)
users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
@@ -1098,7 +1108,7 @@ class EventCreationHandler:
# it's not a self-redaction (to avoid having to look up whether the
# user is actually admin or not).
is_admin_redaction = False
if event.type == EventTypes.Redaction:
if event.type == EventTypes.Redaction and event.redacts:
original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1185,7 +1195,7 @@ class EventCreationHandler:
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
if event.type == EventTypes.Redaction:
if event.type == EventTypes.Redaction and event.redacts:
original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1391,7 +1401,7 @@ class EventCreationHandler:
]
for k in immutable_fields:
if getattr(builder, k, None) != original_event.get(k):
if getattr(builder, k, None) != getattr(original_event, k, None):
raise Exception(
"Third party rules module created an invalid event: "
"cannot change field " + k

View File

@@ -15,7 +15,7 @@
import inspect
import logging
from typing import TYPE_CHECKING, Dict, Generic, List, Optional, TypeVar, Union
from urllib.parse import urlencode
from urllib.parse import urlencode, urlparse
import attr
import pymacaroons
@@ -37,10 +37,7 @@ from twisted.web.client import readBody
from twisted.web.http_headers import Headers
from synapse.config import ConfigError
from synapse.config.oidc_config import (
OidcProviderClientSecretJwtKey,
OidcProviderConfig,
)
from synapse.config.oidc import OidcProviderClientSecretJwtKey, OidcProviderConfig
from synapse.handlers.sso import MappingException, UserAttributes
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
@@ -71,8 +68,8 @@ logger = logging.getLogger(__name__)
#
# Here we have the names of the cookies, and the options we use to set them.
_SESSION_COOKIES = [
(b"oidc_session", b"Path=/_synapse/client/oidc; HttpOnly; Secure; SameSite=None"),
(b"oidc_session_no_samesite", b"Path=/_synapse/client/oidc; HttpOnly"),
(b"oidc_session", b"HttpOnly; Secure; SameSite=None"),
(b"oidc_session_no_samesite", b"HttpOnly"),
]
#: A token exchanged from the token endpoint, as per RFC6749 sec 5.1. and
@@ -282,6 +279,13 @@ class OidcProvider:
self._config = provider
self._callback_url = hs.config.oidc_callback_url # type: str
# Calculate the prefix for OIDC callback paths based on the public_baseurl.
# We'll insert this into the Path= parameter of any session cookies we set.
public_baseurl_path = urlparse(hs.config.server.public_baseurl).path
self._callback_path_prefix = (
public_baseurl_path.encode("utf-8") + b"_synapse/client/oidc"
)
self._oidc_attribute_requirements = provider.attribute_requirements
self._scopes = provider.scopes
self._user_profile_method = provider.user_profile_method
@@ -782,8 +786,13 @@ class OidcProvider:
for cookie_name, options in _SESSION_COOKIES:
request.cookies.append(
b"%s=%s; Max-Age=3600; %s"
% (cookie_name, cookie.encode("utf-8"), options)
b"%s=%s; Max-Age=3600; Path=%s; %s"
% (
cookie_name,
cookie.encode("utf-8"),
self._callback_path_prefix,
options,
)
)
metadata = await self.load_metadata()
@@ -960,6 +969,11 @@ class OidcProvider:
# and attempt to match it.
attributes = await oidc_response_to_user_attributes(failures=0)
if attributes.localpart is None:
# If no localpart is returned then we will generate one, so
# there is no need to search for existing users.
return None
user_id = UserID(attributes.localpart, self._server_name).to_string()
users = await self._store.get_users_by_id_case_insensitive(user_id)
if users:
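The Path= computation above can be checked in isolation. A quick standalone sketch (the base URL and cookie value are made up; Synapse normalises public_baseurl to end with a slash, which this assumes):

from urllib.parse import urlparse

public_baseurl = "https://matrix.example.com/prefix/"
callback_path_prefix = urlparse(public_baseurl).path.encode("utf-8") + b"_synapse/client/oidc"

cookie_name, options = b"oidc_session", b"HttpOnly; Secure; SameSite=None"
cookie = b"opaque-session-macaroon"
header = b"%s=%s; Max-Age=3600; Path=%s; %s" % (cookie_name, cookie, callback_path_prefix, options)
print(header.decode("ascii"))
# oidc_session=opaque-session-macaroon; Max-Age=3600; Path=/prefix/_synapse/client/oidc; HttpOnly; Secure; SameSite=None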

File diff suppressed because it is too large

View File

@@ -475,7 +475,7 @@ class RoomCreationHandler(BaseHandler):
):
await self.room_member_handler.update_membership(
requester,
UserID.from_string(old_event["state_key"]),
UserID.from_string(old_event.state_key),
new_room_id,
"ban",
ratelimit=False,
@@ -1327,7 +1327,7 @@ class RoomShutdownHandler:
new_room_id = None
logger.info("Shutting down room %r", room_id)
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:

View File

@@ -19,7 +19,7 @@ from http import HTTPStatus
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse import types
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -28,7 +28,6 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
@@ -64,6 +63,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
self.member_linearizer = Linearizer(name="member")
@@ -178,62 +178,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id)
async def _can_join_without_invite(
self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str
) -> bool:
"""
Check whether a user can join a room without an invite.
When joining a room with restricted joined rules (as defined in MSC3083),
the membership of spaces must be checked during join.
Args:
state_ids: The state of the room as it currently is.
room_version: The room version of the room being joined.
user_id: The user joining the room.
Returns:
True if the user can join the room, false otherwise.
"""
# This only applies to room versions which support the new join rule.
if not room_version.msc3083_join_rules:
return True
# If there's no join rule, then it defaults to public (so this doesn't apply).
join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
if not join_rules_event_id:
return True
# If the join rule is not restricted, this doesn't apply.
join_rules_event = await self.store.get_event(join_rules_event_id)
if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED:
return True
# If allowed is of the wrong form, then only allow invited users.
allowed_spaces = join_rules_event.content.get("allow", [])
if not isinstance(allowed_spaces, list):
return False
# Get the list of joined rooms and see if there's an overlap.
joined_rooms = await self.store.get_rooms_for_user(user_id)
# Pull out the other room IDs, invalid data gets filtered.
for space in allowed_spaces:
if not isinstance(space, dict):
continue
space_id = space.get("space")
if not isinstance(space_id, str):
continue
# The user was joined to one of the spaces specified, they can join
# this room!
if space_id in joined_rooms:
return True
# The user was not in any of the required spaces.
return False
async def _local_membership_update(
self,
requester: Requester,
@@ -302,7 +246,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if (
newly_joined
and not user_is_invited
and not await self._can_join_without_invite(
and not await self.event_auth_handler.can_join_without_invite(
prev_state_ids, event.room_version, user_id
)
):
@@ -1100,7 +1044,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.distributor = hs.get_distributor()

View File

@@ -18,6 +18,7 @@ from typing import (
Any,
Awaitable,
Callable,
Collection,
Dict,
Iterable,
List,
@@ -40,7 +41,7 @@ from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http import get_request_user_agent
from synapse.http.server import respond_with_html, respond_with_redirect
from synapse.http.site import SynapseRequest
from synapse.types import Collection, JsonDict, UserID, contains_invalid_mxid_characters
from synapse.types import JsonDict, UserID, contains_invalid_mxid_characters
from synapse.util.async_helpers import Linearizer
from synapse.util.stringutils import random_string

View File

@@ -14,7 +14,17 @@
# limitations under the License.
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
FrozenSet,
List,
Optional,
Set,
Tuple,
)
import attr
from prometheus_client import Counter
@@ -28,7 +38,6 @@ from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
JsonDict,
MutableStateMap,
Requester,
@@ -1181,7 +1190,7 @@ class SyncHandler:
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.state.get_current_users_in_room(room_id)
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)
# TODO: Check that these users are actually new, i.e. either they
@@ -1197,7 +1206,7 @@ class SyncHandler:
# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.state.get_current_users_in_room(room_id)
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)
# Remove any users that we still share a room with.
@@ -1352,7 +1361,7 @@ class SyncHandler:
extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())

View File

@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import Any
from typing import TYPE_CHECKING, Any
from twisted.web.client import PartialDownloadError
@@ -22,13 +22,16 @@ from synapse.api.errors import Codes, LoginError, SynapseError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.util import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class UserInteractiveAuthChecker:
"""Abstract base class for an interactive auth checker"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
pass
def is_enabled(self) -> bool:
@@ -57,10 +60,10 @@ class UserInteractiveAuthChecker:
class DummyAuthChecker(UserInteractiveAuthChecker):
AUTH_TYPE = LoginType.DUMMY
def is_enabled(self):
def is_enabled(self) -> bool:
return True
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return True
@@ -70,24 +73,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker):
def is_enabled(self):
return True
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return True
class RecaptchaAuthChecker(UserInteractiveAuthChecker):
AUTH_TYPE = LoginType.RECAPTCHA
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._enabled = bool(hs.config.recaptcha_private_key)
self._http_client = hs.get_proxied_http_client()
self._url = hs.config.recaptcha_siteverify_api
self._secret = hs.config.recaptcha_private_key
def is_enabled(self):
def is_enabled(self) -> bool:
return self._enabled
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
try:
user_response = authdict["response"]
except KeyError:
@@ -132,11 +135,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
class _BaseThreepidAuthChecker:
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
async def _check_threepid(self, medium, authdict):
async def _check_threepid(self, medium: str, authdict: dict) -> dict:
if "threepid_creds" not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@@ -206,31 +209,31 @@ class _BaseThreepidAuthChecker:
class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
AUTH_TYPE = LoginType.EMAIL_IDENTITY
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
UserInteractiveAuthChecker.__init__(self, hs)
_BaseThreepidAuthChecker.__init__(self, hs)
def is_enabled(self):
def is_enabled(self) -> bool:
return self.hs.config.threepid_behaviour_email in (
ThreepidBehaviour.REMOTE,
ThreepidBehaviour.LOCAL,
)
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return await self._check_threepid("email", authdict)
class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
AUTH_TYPE = LoginType.MSISDN
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
UserInteractiveAuthChecker.__init__(self, hs)
_BaseThreepidAuthChecker.__init__(self, hs)
def is_enabled(self):
def is_enabled(self) -> bool:
return bool(self.hs.config.account_threepid_delegate_msisdn)
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return await self._check_threepid("msisdn", authdict)

View File

@@ -44,7 +44,6 @@ class UserDirectoryHandler(StateDeltasHandler):
super().__init__(hs)
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@@ -302,10 +301,12 @@ class UserDirectoryHandler(StateDeltasHandler):
# ignore the change
return
users_with_profile = await self.state.get_current_users_in_room(room_id)
other_users_in_room_with_profiles = (
await self.store.get_users_in_room_with_profiles(room_id)
)
# Remove every user from the sharing tables for that room.
for user_id in users_with_profile.keys():
for user_id in other_users_in_room_with_profiles.keys():
await self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
@@ -314,7 +315,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# which when ran over an entire room, will result in the same values
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
for user_id, profile in users_with_profile.items():
for user_id, profile in other_users_in_room_with_profiles.items():
await self._handle_new_user(room_id, user_id, profile)
async def _handle_new_user(
@@ -336,7 +337,7 @@ class UserDirectoryHandler(StateDeltasHandler):
room_id
)
# Now we update users who share rooms with users.
users_with_profile = await self.state.get_current_users_in_room(room_id)
other_users_in_room = await self.store.get_users_in_room(room_id)
if is_public:
await self.store.add_users_in_public_rooms(room_id, (user_id,))
@@ -352,14 +353,14 @@ class UserDirectoryHandler(StateDeltasHandler):
# We don't care about appservice users.
if not is_appservice:
for other_user_id in users_with_profile:
for other_user_id in other_users_in_room:
if user_id == other_user_id:
continue
to_insert.add((user_id, other_user_id))
# Next we need to update for every local user in the room
for other_user_id in users_with_profile:
for other_user_id in other_users_in_room:
if user_id == other_user_id:
continue

View File

@@ -33,6 +33,7 @@ import treq
from canonicaljson import encode_canonical_json
from netaddr import AddrFormatError, IPAddress, IPSet
from prometheus_client import Counter
from typing_extensions import Protocol
from zope.interface import implementer, provider
from OpenSSL import SSL
@@ -754,6 +755,16 @@ def _timeout_to_request_timed_out_error(f: Failure):
return f
class ByteWriteable(Protocol):
"""The type of object which must be passed into read_body_with_max_size.
Typically this is a file object.
"""
def write(self, data: bytes) -> int:
pass
class BodyExceededMaxSize(Exception):
"""The maximum allowed size of the HTTP body was exceeded."""
@@ -790,7 +801,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
transport = None # type: Optional[ITCPTransport]
def __init__(
self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
self, stream: ByteWriteable, deferred: defer.Deferred, max_size: Optional[int]
):
self.stream = stream
self.deferred = deferred
@@ -830,7 +841,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
def read_body_with_max_size(
response: IResponse, stream: BinaryIO, max_size: Optional[int]
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
) -> defer.Deferred:
"""
Read an HTTP response body into a file-like object, optionally enforcing a maximum size.
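Because ByteWriteable is a typing Protocol, anything with a write(bytes) -> int method can receive the body. A small standalone sketch (HashingWriter is invented purely for illustration):

import hashlib
from io import BytesIO
from typing_extensions import Protocol

class ByteWriteable(Protocol):
    def write(self, data: bytes) -> int:
        ...

class HashingWriter:
    """Discards the body but keeps a running SHA-256 of it."""
    def __init__(self):
        self.digest = hashlib.sha256()
    def write(self, data: bytes) -> int:
        self.digest.update(data)
        return len(data)

def stream(dest: ByteWriteable, chunks):
    # Works for any structural match: a file, a BytesIO, or the hasher above.
    for chunk in chunks:
        dest.write(chunk)

buf, hasher = BytesIO(), HashingWriter()
stream(buf, [b"hello ", b"world"])
stream(hasher, [b"hello ", b"world"])
print(buf.getvalue(), hasher.digest.hexdigest()[:16])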

View File

@@ -1,5 +1,4 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
import codecs
import logging
import random
import sys
import typing
import urllib.parse
from io import BytesIO
from io import BytesIO, StringIO
from typing import Callable, Dict, List, Optional, Tuple, Union
import attr
@@ -72,6 +73,9 @@ incoming_responses_counter = Counter(
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
@@ -150,6 +154,7 @@ async def _handle_json_response(
request: MatrixFederationRequest,
response: IResponse,
start_ms: int,
return_string_io=False,
) -> JsonDict:
"""
Reads the JSON body of a response, with a timeout
@@ -167,12 +172,27 @@ async def _handle_json_response(
try:
check_content_type_is_json(response.headers)
# Use the custom JSON decoder (partially re-implements treq.json_content).
d = treq.text_content(response, encoding="utf-8")
d.addCallback(json_decoder.decode)
buf = StringIO()
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
body = await make_deferred_yieldable(d)
await make_deferred_yieldable(d)
if return_string_io:
body = buf
else:
body = json_decoder.decode(buf.getvalue())
except BodyExceededMaxSize as e:
# The response was too big.
logger.warning(
"{%s} [%s] JSON response exceeded max size %i - %s %s",
request.txn_id,
request.destination,
MAX_RESPONSE_SIZE,
request.method,
request.uri.decode("ascii"),
)
raise RequestSendFailed(e, can_retry=False) from e
except ValueError as e:
# The JSON content was invalid.
logger.warning(
@@ -206,18 +226,31 @@ async def _handle_json_response(
time_taken_secs = reactor.seconds() - start_ms / 1000
logger.info(
"{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
"{%s} [%s] Completed request: %d %s in %.2f secs got %dB - %s %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
time_taken_secs,
len(buf.getvalue()),
request.method,
request.uri.decode("ascii"),
)
return body
class BinaryIOWrapper:
"""A wrapper for a TextIO which converts from bytes on the fly."""
def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
self.decoder = codecs.getincrementaldecoder(encoding)(errors)
self.file = file
def write(self, b: Union[bytes, bytearray]) -> int:
self.file.write(self.decoder.decode(b))
return len(b)
class MatrixFederationHttpClient:
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.
@@ -652,6 +685,7 @@ class MatrixFederationHttpClient:
ignore_backoff: bool = False,
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
return_string_io=False,
) -> Union[JsonDict, list]:
"""Sends the specified json data using PUT
@@ -726,7 +760,12 @@ class MatrixFederationHttpClient:
_sec_timeout = self.default_timeout
body = await _handle_json_response(
self.reactor, _sec_timeout, request, response, start_ms
self.reactor,
_sec_timeout,
request,
response,
start_ms,
return_string_io=return_string_io,
)
return body
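A standalone demonstration of why _handle_json_response feeds the body through an incremental decoder rather than decoding each chunk separately: a multi-byte UTF-8 sequence can be split across network chunks, which would make a plain per-chunk bytes.decode() fail. The wrapper mirrors the BinaryIOWrapper above; the payload is invented:

import codecs
from io import StringIO

class BinaryIOWrapper:
    def __init__(self, file, encoding="utf-8", errors="strict"):
        self.decoder = codecs.getincrementaldecoder(encoding)(errors)
        self.file = file
    def write(self, b: bytes) -> int:
        # Partial multi-byte sequences are buffered by the incremental decoder.
        self.file.write(self.decoder.decode(b))
        return len(b)

buf = StringIO()
wrapper = BinaryIOWrapper(buf)
payload = '{"topic": "café"}'.encode("utf-8")
# Feed the body one byte at a time, splitting the two-byte "é" sequence.
for i in range(len(payload)):
    wrapper.write(payload[i:i + 1])
print(buf.getvalue())  # {"topic": "café"}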

View File

@@ -14,13 +14,14 @@
import contextlib
import logging
import time
from typing import Optional, Tuple, Type, Union
from typing import Optional, Tuple, Union
import attr
from zope.interface import implementer
from twisted.internet.interfaces import IAddress
from twisted.internet.interfaces import IAddress, IReactorTime
from twisted.python.failure import Failure
from twisted.web.resource import IResource
from twisted.web.server import Request, Site
from synapse.config.server import ListenerConfig
@@ -49,6 +50,7 @@ class SynapseRequest(Request):
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
* A limit to the size of request which will be accepted
It also provides a method `processing`, which returns a context manager. If this
method is called, the request won't be logged until the context manager is closed;
@@ -59,8 +61,9 @@ class SynapseRequest(Request):
logcontext: the log context for this request
"""
def __init__(self, channel, *args, **kw):
def __init__(self, channel, *args, max_request_body_size=1024, **kw):
Request.__init__(self, channel, *args, **kw)
self._max_request_body_size = max_request_body_size
self.site = channel.site # type: SynapseSite
self._channel = channel # this is used by the tests
self.start_time = 0.0
@@ -97,6 +100,18 @@ class SynapseRequest(Request):
self.site.site_tag,
)
def handleContentChunk(self, data):
# we should have a `content` by now.
assert self.content, "handleContentChunk() called before gotLength()"
if self.content.tell() + len(data) > self._max_request_body_size:
logger.warning(
"Aborting connection from %s because the request exceeds maximum size",
self.client,
)
self.transport.abortConnection()
return
super().handleContentChunk(data)
@property
def requester(self) -> Optional[Union[Requester, str]]:
return self._requester
@@ -485,29 +500,55 @@ class _XForwardedForAddress:
class SynapseSite(Site):
"""
Subclass of a twisted http Site that does access logging with python's
standard logging
Synapse-specific twisted http Site
This does two main things.
First, it replaces the requestFactory in use so that we build SynapseRequests
instead of regular t.w.server.Requests. All of the constructor params are really
just parameters for SynapseRequest.
Second, it inhibits the log() method called by Request.finish, since SynapseRequest
does its own logging.
"""
def __init__(
self,
logger_name,
site_tag,
logger_name: str,
site_tag: str,
config: ListenerConfig,
resource,
resource: IResource,
server_version_string,
*args,
**kwargs,
max_request_body_size: int,
reactor: IReactorTime,
):
Site.__init__(self, resource, *args, **kwargs)
"""
Args:
logger_name: The name of the logger to use for access logs.
site_tag: A tag to use for this site - mostly in access logs.
config: Configuration for the HTTP listener corresponding to this site
resource: The base of the resource tree to be used for serving requests on
this site
server_version_string: A string to present for the Server header
max_request_body_size: Maximum request body length to allow before
dropping the connection
reactor: reactor to be used to manage connection timeouts
"""
Site.__init__(self, resource, reactor=reactor)
self.site_tag = site_tag
assert config.http_options is not None
proxied = config.http_options.x_forwarded
self.requestFactory = (
XForwardedForRequest if proxied else SynapseRequest
) # type: Type[Request]
request_class = XForwardedForRequest if proxied else SynapseRequest
def request_factory(channel, queued) -> Request:
return request_class(
channel, max_request_body_size=max_request_body_size, queued=queued
)
self.requestFactory = request_factory # type: ignore
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii")
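A standalone sketch of the size check handleContentChunk performs above: keep a running total of the buffered body and drop the connection as soon as the next chunk would exceed the limit. FakeRequest and the aborted flag stand in for Twisted's Request and transport.abortConnection():

from io import BytesIO

class FakeRequest:
    def __init__(self, max_request_body_size: int):
        self.content = BytesIO()
        self._max = max_request_body_size
        self.aborted = False
    def handle_content_chunk(self, data: bytes) -> None:
        if self.content.tell() + len(data) > self._max:
            self.aborted = True  # the real code calls transport.abortConnection()
            return
        self.content.write(data)

req = FakeRequest(max_request_body_size=10)
for chunk in (b"12345", b"67890", b"!"):
    req.handle_content_chunk(chunk)
print(req.aborted, req.content.getvalue())  # True b'1234567890'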

View File

@@ -226,11 +226,11 @@ class RemoteHandler(logging.Handler):
old_buffer = self._buffer
self._buffer = deque()
for i in range(buffer_split):
for _ in range(buffer_split):
self._buffer.append(old_buffer.popleft())
end_buffer = []
for i in range(buffer_split):
for _ in range(buffer_split):
end_buffer.append(old_buffer.pop())
self._buffer.extend(reversed(end_buffer))

View File

@@ -258,7 +258,8 @@ class LoggingContext:
child to the parent
Args:
name (str): Name for the context for debugging.
name: Name for the context for logging. If this is omitted, it is
inherited from the parent context.
parent_context (LoggingContext|None): The parent of the new context
"""
@@ -282,7 +283,6 @@ class LoggingContext:
request: Optional[ContextRequest] = None,
) -> None:
self.previous_context = current_context()
self.name = name
# track the resources used by this context so far
self._resource_usage = ContextResourceUsage()
@@ -314,10 +314,17 @@ class LoggingContext:
# the request param overrides the request from the parent context
self.request = request
# if we don't have a `name`, but do have a parent context, use its name.
if self.parent_context and name is None:
name = str(self.parent_context)
if name is None:
raise ValueError(
"LoggingContext must be given either a name or a parent context"
)
self.name = name
def __str__(self) -> str:
if self.request:
return self.request.request_id
return "%s@%x" % (self.name, id(self))
return self.name
@classmethod
def current_context(cls) -> LoggingContextOrSentinel:
@@ -694,17 +701,13 @@ def nested_logging_context(suffix: str) -> LoggingContext:
"Starting nested logging context from sentinel context: metrics will be lost"
)
parent_context = None
prefix = ""
request = None
else:
assert isinstance(curr_context, LoggingContext)
parent_context = curr_context
prefix = str(parent_context.name)
request = parent_context.request
prefix = str(curr_context)
return LoggingContext(
prefix + "-" + suffix,
parent_context=parent_context,
request=request,
)
@@ -895,7 +898,7 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
parent_context = curr_context
def g():
with LoggingContext(parent_context=parent_context):
with LoggingContext(str(curr_context), parent_context=parent_context):
return f(*args, **kwargs)
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
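A toy model (not the real LoggingContext) of the naming rule introduced above: a context without an explicit name takes its parent's string form, nested contexts append "-suffix", and a context carrying a request id prints as that id instead of its name:

class Ctx:
    def __init__(self, name=None, parent=None, request=None):
        if name is None and parent is not None:
            name = str(parent)
        if name is None:
            raise ValueError("Ctx must be given either a name or a parent")
        # An explicit request overrides the one inherited from the parent.
        self.request = request if request is not None else (parent.request if parent else None)
        self.name = name
    def __str__(self):
        return self.request if self.request else self.name

def nested(ctx, suffix):
    return Ctx(str(ctx) + "-" + suffix, parent=ctx)

root = Ctx("persist_events")
print(str(nested(nested(root, "shard1"), "batch2")))  # persist_events-shard1-batch2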

View File

@@ -535,6 +535,13 @@ class ReactorLastSeenMetric:
REGISTRY.register(ReactorLastSeenMetric())
# The minimum time in seconds between GCs for each generation, regardless of the current GC
# thresholds and counts.
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
# The time (in seconds since the epoch) of the last time we did a GC for each generation.
_last_gc = [0.0, 0.0, 0.0]
def runUntilCurrentTimer(reactor, func):
@functools.wraps(func)
@@ -575,11 +582,16 @@ def runUntilCurrentTimer(reactor, func):
return ret
# Check if we need to do a manual GC (since its been disabled), and do
# one if necessary.
# one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
# promote an object into gen 2, and we don't want to handle the same
# object multiple times.
threshold = gc.get_threshold()
counts = gc.get_count()
for i in (2, 1, 0):
if threshold[i] < counts[i]:
# We check if we need to do one based on a straightforward
# comparison between the threshold and count. We also do an extra
# check to make sure that we don't do a GC too often.
if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
if i == 0:
logger.debug("Collecting gc %d", i)
else:
@@ -589,6 +601,8 @@ def runUntilCurrentTimer(reactor, func):
unreachable = gc.collect(i)
end = time.time()
_last_gc[i] = end
gc_time.labels(i).observe(end - start)
gc_unreachable.labels(i).set(unreachable)
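A standalone sketch of the throttle introduced above. It assumes, as the surrounding Synapse code does, that automatic collection has been disabled elsewhere and collections are driven manually: a generation is only collected when its count exceeds the threshold and the per-generation minimum interval has passed since the last collection.

import gc
import time

MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
_last_gc = [0.0, 0.0, 0.0]

def maybe_collect() -> None:
    threshold = gc.get_threshold()
    counts = gc.get_count()
    now = time.time()
    # Highest generation first: a gen-1 collection can promote objects into gen 2.
    for gen in (2, 1, 0):
        if counts[gen] > threshold[gen] and now - _last_gc[gen] > MIN_TIME_BETWEEN_GCS[gen]:
            unreachable = gc.collect(gen)
            _last_gc[gen] = time.time()
            print("gen %d collected, %d unreachable" % (gen, unreachable))

maybe_collect()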
@@ -615,6 +629,7 @@ try:
except AttributeError:
pass
__all__ = [
"MetricsResource",
"generate_latest",

View File

@@ -241,19 +241,24 @@ class BackgroundProcessLoggingContext(LoggingContext):
processes.
"""
__slots__ = ["_id", "_proc"]
__slots__ = ["_proc"]
def __init__(self, name: str, id: Optional[Union[int, str]] = None):
super().__init__(name)
self._id = id
def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None):
"""
Args:
name: The name of the background process. Each distinct `name` gets a
separate prometheus time series.
instance_id: an identifier to add to `name` to distinguish this instance of
the named background process in the logs. If this is `None`, one is
made up based on id(self).
"""
if instance_id is None:
instance_id = id(self)
super().__init__("%s-%s" % (name, instance_id))
self._proc = _BackgroundProcess(name, self)
def __str__(self) -> str:
if self._id is not None:
return "%s-%s" % (self.name, self._id)
return "%s@%x" % (self.name, id(self))
def start(self, rusage: "Optional[resource._RUsage]"):
"""Log context has started running (again)."""

Some files were not shown because too many files have changed in this diff