1
0

Compare commits

...

204 Commits

Author SHA1 Message Date
Matthew Hodgson
c48e2f3111 missing word :| 2018-05-01 19:18:21 +01:00
Matthew Hodgson
5c2214f4c7 fix markdown 2018-05-01 19:03:35 +01:00
Matthew Hodgson
8e6bd0e324 changelog for 0.28.1 2018-05-01 18:28:23 +01:00
Neil Johnson
8570bb84cc Update __init__.py
bump version
2018-05-01 18:22:53 +01:00
Richard van der Hoff
d5eee5d601 Merge commit '33f469b' into release-v0.28.1 2018-05-01 18:14:18 +01:00
Richard van der Hoff
d858f3bd4e Miscellaneous fixes to python_dependencies
* add some doc about wtf this thing does
* pin Twisted to < 18.4
* add explicit dep on six (fixes #3089)
2018-05-01 18:13:54 +01:00
Richard van der Hoff
33f469ba19 Apply some limits to depth to counter abuse
* When creating a new event, cap its depth to 2^63 - 1
* When receiving events, reject any without a sensible depth

As per https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI
2018-05-01 17:54:19 +01:00
Neil Johnson
28dd536e80 update changelog and bump version to 0.28.0 2018-04-26 15:51:39 +01:00
Neil Johnson
8721580303 Merge branch 'develop' of https://github.com/matrix-org/synapse into release-v0.28.0-rc1 2018-04-26 15:44:54 +01:00
Erik Johnston
0ced8b5b47 Merge pull request #3134 from matrix-org/erikj/fix_admin_media_api
Fix media admin APIs
2018-04-26 12:02:40 +01:00
Erik Johnston
7ec8e798b4 Fix media admin APIs 2018-04-26 11:31:22 +01:00
Erik Johnston
a5ad88913c Merge pull request #3130 from matrix-org/erikj/fix_quarantine_room
Fix quarantine media admin API
2018-04-25 17:54:12 +01:00
Erik Johnston
22881b3d69 Also fix reindexing of search 2018-04-25 15:32:04 +01:00
Erik Johnston
ba3166743c Fix quarantine media admin API 2018-04-25 15:11:18 +01:00
Neil Johnson
1bb83d5d41 Merge branch 'master' into develop 2018-04-24 15:52:43 +01:00
Neil Johnson
13a2beabca Update CHANGES.rst
fix formatting on line break
2018-04-24 15:43:30 +01:00
Neil Johnson
2c3e995f38 Bump version and update changelog 2018-04-24 15:33:22 +01:00
Neil Johnson
8e8b06715f Revert "Bump version and update changelog"
This reverts commit 08b29d4574.
2018-04-24 13:58:45 +01:00
Neil Johnson
08b29d4574 Bump version and update changelog 2018-04-24 13:56:12 +01:00
Richard van der Hoff
77ebef9d43 Merge pull request #3118 from matrix-org/rav/reject_prev_events
Reject events which have lots of prev_events
2018-04-23 17:51:38 +01:00
Richard van der Hoff
9b9c38373c Remove spurious param 2018-04-23 12:00:06 +01:00
Richard van der Hoff
286e20f2bc Merge pull request #3109 from NotAFile/py3-tests-fix
Make tests py3 compatible
2018-04-23 11:59:03 +01:00
Richard van der Hoff
dc875d2712 Merge pull request #3106 from NotAFile/py3-six-itervalues-1
Use six.itervalues in some places
2018-04-20 15:43:52 +01:00
Richard van der Hoff
8dc4a6144b Merge pull request #3107 from NotAFile/py3-bool-nonzero
add __bool__ alias to __nonzero__ methods
2018-04-20 15:43:39 +01:00
Richard van der Hoff
d06a9ea5f7 Merge pull request #3104 from NotAFile/py3-unittest-config
Add some more variables to the unittest config
2018-04-20 15:35:58 +01:00
Richard van der Hoff
c09a6daf09 Merge pull request #3110 from NotAFile/py3-six-queue
Replace Queue with six.moves.queue
2018-04-20 15:35:00 +01:00
Richard van der Hoff
692a3cc806 Merge pull request #3103 from NotAFile/py3-baseexcepton-message
Use str(e) instead of e.message
2018-04-20 15:34:49 +01:00
Erik Johnston
366dd893fc Merge pull request #3100 from silkeh/readme-srv-cname
Clarify that SRV may not point to a CNAME
2018-04-20 15:18:44 +01:00
Erik Johnston
bdb7714d13 Merge pull request #3125 from matrix-org/erikj/add_contrib_docs
Document contrib directory
2018-04-20 13:02:24 +01:00
Erik Johnston
67dabe143d Document contrib directory 2018-04-20 11:47:38 +01:00
Richard van der Hoff
3de7d9fe99 accept stupid events over backfill 2018-04-20 11:41:03 +01:00
Richard van der Hoff
11a67b7c9d Merge pull request #3093 from matrix-org/rav/response_cache_wrap
Refactor ResponseCache usage
2018-04-20 11:31:17 +01:00
Richard van der Hoff
0c280d4d99 Reinstate linearizer for federation_server.on_context_state_request 2018-04-20 11:10:04 +01:00
Richard van der Hoff
bc381d5798 Merge pull request #3117 from matrix-org/rav/refactor_have_events
Refactor store.have_events
2018-04-20 10:26:12 +01:00
Richard van der Hoff
b1dfbc3c40 Refactor store.have_events
It turns out that most of the time we were calling have_events, we were only
using half of the result. Replace have_events with have_seen_events and
get_rejection_reasons, so that we can see what's going on a bit more clearly.
2018-04-20 10:25:56 +01:00
Richard van der Hoff
dacf3a50ac Merge pull request #3113 from matrix-org/rav/fix_huge_prev_events
Avoid creating events with huge numbers of prev_events
2018-04-18 11:27:56 +01:00
Richard van der Hoff
1f4b498b73 Add some comments 2018-04-18 00:15:36 +01:00
Richard van der Hoff
e585228860 Check events on backfill too 2018-04-18 00:06:42 +01:00
Richard van der Hoff
9b7794262f Reject events which have too many auth_events or prev_events
... this should protect us from being dossed by people making silly events
(deliberately or otherwise)
2018-04-18 00:06:42 +01:00
Richard van der Hoff
639480e14a Avoid creating events with huge numbers of prev_events
In most cases, we limit the number of prev_events for a given event to 10
events. This fixes a particular code path which created events with huge
numbers of prev_events.
2018-04-16 18:41:37 +01:00
Adrian Tschira
878995e660 Replace Queue with six.moves.queue
and a six.range change which I missed the last time

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-16 00:46:21 +02:00
Adrian Tschira
a1a3c9660f Make tests py3 compatible
This is a mixed commit that fixes various small issues

 * print parentheses
 * 01 is invalid syntax (it was octal in py2)
 * [x for i in 1, 2] is invalid syntax
 * six moves

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-16 00:39:32 +02:00
Matthew Hodgson
512633ef44 fix spurious changelog dup 2018-04-15 22:45:06 +01:00
Adrian Tschira
f63ff73c7f add __bool__ alias to __nonzero__ methods
Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-15 20:40:47 +02:00
Adrian Tschira
36c59ce669 Use six.itervalues in some places
There's more where that came from

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-15 20:39:43 +02:00
Adrian Tschira
cb9cdfecd0 Add some more variables to the unittest config
These worked accidentally before (python2 doesn't complain if you
compare incompatible types) but under py3 this blows up spectacularly

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-15 20:36:39 +02:00
Adrian Tschira
1515560f5c Use str(e) instead of e.message
Doing this I learned e.message was pretty shortlived, added in 2.6,
they realized it was a bad idea and deprecated it in 2.7

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-15 20:32:42 +02:00
Silke
c4bdbc2bd2 Clarify that SRV may not point to a CNAME
Signed-off-by: Silke Hofstra <silke@slxh.eu>
2018-04-14 14:55:25 +02:00
Neil Johnson
154b44c249 Merge branch 'master' of https://github.com/matrix-org/synapse into develop 2018-04-13 17:07:54 +01:00
Matthew Hodgson
0d8c50df44 Merge pull request #3099 from matrix-org/matthew/fix-federation-domain-whitelist
fix federation_domain_whitelist
2018-04-13 15:51:13 +01:00
Matthew Hodgson
78a9698650 fix federation_domain_whitelist
we were checking the wrong server_name on inbound requests
2018-04-13 15:47:43 +01:00
Matthew Hodgson
25b0ba30b1 revert last to PR properly 2018-04-13 15:46:37 +01:00
Matthew Hodgson
f8d46cad3c correctly auth inbound federation_domain_whitelist reqs 2018-04-13 15:41:52 +01:00
Neil Johnson
d4b2e05852 Merge branch 'master' of https://github.com/matrix-org/synapse into develop 2018-04-13 12:16:27 +01:00
Neil Johnson
eb53439c4a Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse 2018-04-13 12:14:57 +01:00
Neil Johnson
51d628d28d Bump version and Change log 2018-04-13 12:08:19 +01:00
Neil Johnson
df77837a33 Merge pull request #3095 from matrix-org/rav/bump_canonical_json
Update canonicaljson dependency
2018-04-13 12:04:46 +01:00
Richard van der Hoff
d3347ad485 Revert "Use sortedcontainers instead of blist"
This reverts commit 9fbe70a7dc.

It turns out that sortedcontainers.SortedDict is not an exact match for
blist.sorteddict; in particular, `popitem()` removes things from the opposite
end of the dict.

This is trivial to fix, but I want to add some unit tests, and potentially some
more thought about it, before we do so.
2018-04-13 11:16:43 +01:00
Richard van der Hoff
fac3f9e678 Bump canonicaljson to 1.1.3
1.1.2 was a bit broken too :/
2018-04-13 10:21:38 +01:00
Richard van der Hoff
60f6014bb7 ResponseCache: fix handling of completed results
Turns out that ObservableDeferred.observe doesn't return a deferred if the
result is already completed. Fix handling and improve documentation.
2018-04-13 07:32:29 +01:00
Richard van der Hoff
119596ab8f Update canonicaljson dependency
1.1.0 and 1.1.1 were broken, so we're updating this to help people make sure
they don't end up on a broken version.

Also, 1.1.0 is speedier...
2018-04-12 17:31:44 +01:00
Richard van der Hoff
b78395b7fe Refactor ResponseCache usage
Adds a `.wrap` method to ResponseCache which wraps up the boilerplate of a
(get, set) pair, and then use it throughout the codebase.

This will be largely non-functional, but does include the following functional
changes:

* federation_server.on_context_state_request: drops use of _server_linearizer
  which looked redundant and could cause incorrect cache misses by yielding
  between the get and the set.
* RoomListHandler.get_remote_public_room_list(): fixes logcontext leaks
* the wrap function includes some logging. I'm hoping this won't be too noisy
  on production.
2018-04-12 13:02:15 +01:00
Richard van der Hoff
d5c74b9f6c Merge pull request #3092 from matrix-org/rav/response_cache_metrics
Add metrics for ResponseCache
2018-04-12 12:59:36 +01:00
Erik Johnston
0f13f30fca Merge pull request #3090 from matrix-org/erikj/processed_event_lag
Add metrics for event processing lag
2018-04-12 12:18:57 +01:00
Erik Johnston
415aeefd89 Format docstring 2018-04-12 12:07:09 +01:00
Erik Johnston
19ceb4851f Merge branch 'develop' of github.com:matrix-org/synapse into erikj/processed_event_lag 2018-04-12 11:36:07 +01:00
Richard van der Hoff
261124396e Merge pull request #3059 from matrix-org/rav/doc_response_cache
Document the behaviour of ResponseCache
2018-04-12 11:22:30 +01:00
Erik Johnston
23a7f9d7f4 Doc we raise on unknown event 2018-04-12 11:20:51 +01:00
Erik Johnston
d7bf3a68f0 s/list/tuple 2018-04-12 11:19:04 +01:00
Erik Johnston
f67e906e18 Set all metrics at the same time 2018-04-12 11:18:19 +01:00
Erik Johnston
971059a733 Merge pull request #3088 from matrix-org/erikj/as_parallel
Send events to ASes concurrently
2018-04-12 10:42:36 +01:00
Erik Johnston
e939f3bca6 Fix tests 2018-04-11 14:37:11 +01:00
Erik Johnston
4dae4a97ed Track last processed event received_ts 2018-04-11 14:27:09 +01:00
Erik Johnston
92e34615c5 Track where event stream processing have gotten up to 2018-04-11 12:13:40 +01:00
Erik Johnston
ab825aa328 Add GaugeMetric 2018-04-11 12:13:40 +01:00
Richard van der Hoff
233699c42e Merge pull request #2760 from Valodim/pypy
Synapse on PyPy
2018-04-11 11:20:01 +01:00
Neil Johnson
427e6c4059 Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse 2018-04-11 10:59:00 +01:00
Neil Johnson
781cd8c54f bump version/changelog 2018-04-11 10:54:43 +01:00
Neil Johnson
9ef0b179e0 Merge commit '11d2609da70af797405241cdf7d9df19db5628f2' of https://github.com/matrix-org/synapse into release-v0.27.0 2018-04-11 10:51:59 +01:00
Erik Johnston
121591568b Send events to ASes concurrently 2018-04-11 09:56:00 +01:00
Richard van der Hoff
b3384232a0 Add metrics for ResponseCache 2018-04-10 23:14:47 +01:00
Matthew Hodgson
360d899a64 Merge pull request #3086 from matrix-org/r30_stats
fix typo
2018-04-10 17:46:37 +01:00
Neil Johnson
d54cfbb7a8 fix typo 2018-04-10 17:38:16 +01:00
Erik Johnston
eaa2ebf20b Merge pull request #3079 from matrix-org/erikj/limit_concurrent_sends
Limit concurrent event sends for a room
2018-04-10 16:43:58 +01:00
Erik Johnston
9daf82278f Merge pull request #3078 from matrix-org/erikj/federation_sender
Send federation events concurrently
2018-04-10 16:43:48 +01:00
Neil Johnson
dd723267b2 Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse into develop 2018-04-10 15:03:29 +01:00
Erik Johnston
a060dfa132 Use run_in_background instead 2018-04-10 14:25:11 +01:00
Erik Johnston
f8e8ec013b Note why we're limiting concurrent event sends 2018-04-10 14:00:46 +01:00
Erik Johnston
1246d23710 Preserve log contexts correctly 2018-04-10 12:04:32 +01:00
Erik Johnston
d49cbf712f Log event ID on exception 2018-04-10 12:03:41 +01:00
Erik Johnston
ce72d590ed Merge pull request #3082 from matrix-org/erikj/urlencode_paths
URL quote path segments over federation
2018-04-10 11:31:16 +01:00
Erik Johnston
11d2609da7 Ensure slashes are escaped 2018-04-10 11:24:40 +01:00
Erik Johnston
dab87b84a3 URL quote path segments over federation 2018-04-10 11:16:08 +01:00
Vincent Breitmoser
6d7f0f8dd3 Don't disable GC when running on PyPy
PyPy's incminimark GC can't be triggered manually. From what I observed
there are no obvious issues with just letting it run normally. And
unlike CPython, it actually returns unused RAM to the system.

Signed-off-by: Vincent Breitmoser <look@my.amazin.horse>
2018-04-10 11:35:34 +02:00
Vincent Breitmoser
f4284d943a In DomainSpecificString, override __repr__ in addition to __str__
For some reason, string interpolation on a DomainSpecificString object
like "%r" % (domainSpecificStringObj) fails under PyPy, because the
default __repr__ implementation wants to iterate over the object. I'm
not sure why that happens, but overriding __repr__ instead of __str__
fixes this problem, and is arguably the more appropriate thing to do
anyways.
2018-04-10 11:35:29 +02:00
Richard van der Hoff
d1e56cfcd1 Fix pep8 error on psycopg2cffi hack 2018-04-10 11:35:29 +02:00
Vincent Breitmoser
89de934981 Use psycopg2cffi module instead of psycopg2 if running on pypy
The psycopg2 package isn't available for PyPy.  This commit adds a check
if the runtime is PyPy, and if it is uses psycopg2cffi module in favor
of psycopg2. This is almost a drop-in replacement, except for one place
where an additional cast to string is required.
2018-04-10 11:29:52 +02:00
Vincent Breitmoser
9fbe70a7dc Use sortedcontainers instead of blist
This commit drop-in replaces blist with SortedContainers. They are
written in pure python so work with pypy, but perform as good as
native implementations, at least in a couple benchmarks:

http://www.grantjenks.com/docs/sortedcontainers/performance.html
2018-04-10 11:29:51 +02:00
Richard van der Hoff
a3599dda97 Merge pull request #2996 from krombel/allow_auto_join_rooms
move handling of auto_join_rooms to RegisterHandler
2018-04-10 01:11:00 +01:00
Richard van der Hoff
87478c5a60 Merge pull request #3061 from NotAFile/add-some-byte-strings
Add b prefixes to some strings that are bytes in py3
2018-04-09 23:54:05 +01:00
Richard van der Hoff
c508b2f2f0 Merge pull request #3073 from NotAFile/use-six-reraise
Replace old-style raise with six.reraise
2018-04-09 23:53:40 +01:00
Richard van der Hoff
37354b55c9 Merge pull request #2938 from dklug/develop
Return 401 for invalid access_token on logout
2018-04-09 23:52:56 +01:00
Richard van der Hoff
0e9aa1d091 Merge pull request #3074 from NotAFile/fix-py3-prints
use python3-compatible prints
2018-04-09 23:44:41 +01:00
Richard van der Hoff
8eaa141d8f Merge pull request #3075 from NotAFile/six-type-checks
Replace some type checks with six type checks
2018-04-09 23:40:44 +01:00
Richard van der Hoff
664adb4236 Merge pull request #3016 from silkeh/improve-service-lookups
Improve handling of SRV records for federation connections
2018-04-09 23:40:06 +01:00
Richard van der Hoff
aea3a93611 Merge pull request #3069 from krombel/update_prometheus_config
update prometheus dashboard to use new metric names
2018-04-09 23:37:18 +01:00
Neil Johnson
41e0611895 remove errant print 2018-04-09 18:44:20 +01:00
Neil Johnson
61b439c904 Fix msec to sec, again 2018-04-09 18:43:48 +01:00
Neil Johnson
87770300d5 Fix msec to sec 2018-04-09 18:38:59 +01:00
Neil Johnson
9a311adfea v0.27.3-rc2 2018-04-09 17:52:08 +01:00
Neil Johnson
64bc2162ef Fix psycopg2 interpolation 2018-04-09 17:50:36 +01:00
Richard van der Hoff
d2c6f4d626 Merge pull request #3080 from matrix-org/rav/fix_500_on_rejoin
Return a 404 rather than a 500 on rejoining empty rooms
2018-04-09 17:32:36 +01:00
Neil Johnson
5232d3bfb1 version bump v0.27.3-rc2 2018-04-09 17:25:57 +01:00
Neil Johnson
5e785d4d5b Merge branch 'develop' of https://github.com/matrix-org/synapse into release-v0.27.0 2018-04-09 17:21:34 +01:00
Erik Johnston
6e025a97b4 Handle all events in a room correctly 2018-04-09 16:02:48 +01:00
Neil Johnson
414b2b3bd1 Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse into release-v0.27.0 2018-04-09 16:02:08 +01:00
Neil Johnson
b151eb14a2 Update CHANGES.rst 2018-04-09 16:01:59 +01:00
Neil Johnson
64cebbc730 Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse into release-v0.27.0 2018-04-09 16:00:51 +01:00
Neil Johnson
d9ae2bc826 bump version to release candidate 2018-04-09 16:00:31 +01:00
Neil Johnson
21d5a2a08e Update CHANGES.rst 2018-04-09 15:55:41 +01:00
Neil Johnson
c115deed12 Update CHANGES.rst 2018-04-09 15:54:32 +01:00
Neil Johnson
072fb59446 bump version 2018-04-09 13:49:25 +01:00
Neil Johnson
89dda61315 Merge branch 'develop' into release-v0.27.0 2018-04-09 13:48:15 +01:00
Neil Johnson
687f3451bd 0.27.3 2018-04-09 13:44:05 +01:00
Richard van der Hoff
13decdbf96 Revert "Merge pull request #3066 from matrix-org/rav/remove_redundant_metrics"
We aren't ready to release this yet, so I'm reverting it for now.

This reverts commit d1679a4ed7, reversing
changes made to e089100c62.
2018-04-09 12:59:12 +01:00
Richard van der Hoff
f3ef60662f Return a 404 rather than a 500 on rejoining empty rooms
Filter ourselves out of the server list before checking for an empty remote
host list, to fix 500 error

Fixes #2141
2018-04-09 12:56:22 +01:00
Erik Johnston
e5082494eb Limit concurrent event sends for a room 2018-04-09 12:07:39 +01:00
Erik Johnston
56b0589865 Use create_and_send_nonmember_event everywhere 2018-04-09 12:04:18 +01:00
Erik Johnston
11974f3787 Send federation events concurrently 2018-04-09 11:47:10 +01:00
Erik Johnston
145d14656b Handle exceptions in get_hosts_for_room when sending events over federation 2018-04-09 11:47:01 +01:00
Adrian Tschira
e54c202b81 Replace some type checks with six type checks
Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-07 01:02:32 +02:00
Adrian Tschira
b0500d3774 use python3-compatible prints 2018-04-06 23:35:27 +02:00
Adrian Tschira
4f40d058cc Replace old-style raise with six.reraise
The old style raise is invalid syntax in python3. As noted in the docs,
this adds one more frame in the traceback, but I think this is
acceptable:

    <ipython-input-7-bcc5cba3de3f> in <module>()
         16     except:
         17         pass
    ---> 18     six.reraise(*x)

    /usr/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
        691             if value.__traceback__ is not tb:
        692                 raise value.with_traceback(tb)
    --> 693             raise value
        694         finally:
        695             value = None

    <ipython-input-7-bcc5cba3de3f> in <module>()
          9
         10 try:
    ---> 11     x()
         12 except:
         13     x = sys.exc_info()

Also note that this uses six, which is not formally a dependency yet,
but is included indirectly since most packages depend on it.

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-06 23:06:24 +02:00
Luke Barnard
135fc5b9cd Merge pull request #3046 from matrix-org/dbkr/join_group
Implement group join API
2018-04-06 16:24:32 +01:00
Luke Barnard
020a501354 de-lint, quote consistency 2018-04-06 16:02:06 +01:00
Luke Barnard
db2fd801f7 Explicitly grab individual columns from group object 2018-04-06 15:57:25 +01:00
Erik Johnston
e8b03cab1b Merge pull request #3071 from matrix-org/erikj/resp_size_metrics
Add response size metrics
2018-04-06 15:49:12 +01:00
Richard van der Hoff
8844f95c32 Merge pull request #3072 from matrix-org/rav/fix_port_script
postgres port script: fix state_groups_pkey error
2018-04-06 15:47:40 +01:00
Luke Barnard
7945435587 When exposing group state, return is_openly_joinable
as opposed to join_policy, which is really only pertinent to the
synapse implementation of the group server.

By doing this we keep the group server concept extensible by
allowing arbitrarily complex rules for deciding whether a group
is openly joinable.
2018-04-06 15:43:27 +01:00
Luke Barnard
6bd1b7053e By default, join policy is "invite" 2018-04-06 15:43:27 +01:00
Luke Barnard
b4478e586f add_user -> _add_user 2018-04-06 15:43:27 +01:00
Luke Barnard
112c2253e2 pep8 2018-04-06 15:43:27 +01:00
Luke Barnard
6850f8aea3 Get group_info from existing call to check_group_is_ours 2018-04-06 15:43:27 +01:00
Luke Barnard
cd087a265d Don't use redundant inlineCallbacks 2018-04-06 15:43:27 +01:00
Luke Barnard
87c864b698 join_rule -> join_policy 2018-04-06 15:43:27 +01:00
Luke Barnard
ae85c7804e is_joinable -> join_rule 2018-04-06 15:43:27 +01:00
Luke Barnard
f8d1917fce Fix federation client set_group_joinable typo 2018-04-06 15:43:27 +01:00
Luke Barnard
6eb3aa94b6 Factor out add_user from accept_invite and join_group 2018-04-06 15:43:27 +01:00
David Baker
edb45aae38 pep8 2018-04-06 15:43:27 +01:00
David Baker
b370fe61c0 Implement group join API 2018-04-06 15:43:27 +01:00
Richard van der Hoff
6a9777ba02 Port script: Set up state_group_id_seq
Fixes https://github.com/matrix-org/synapse/issues/3050.
2018-04-06 15:33:30 +01:00
Richard van der Hoff
01579384cc Port script: clean up a bit
Improve logging and comments. Group all the stuff to do with inspecting tables
together rather than creating the port tables in the middle.
2018-04-06 15:33:30 +01:00
Richard van der Hoff
e01ba5bda3 Port script: avoid nasty errors when setting up
We really shouldn't spit out "Failed to create port table", it looks scary.
2018-04-06 15:33:30 +01:00
Erik Johnston
7b824f1475 Add response size metrics 2018-04-06 13:20:11 +01:00
Erik Johnston
35ff941172 Merge pull request #3070 from krombel/group_join_put_instead_post
use PUT instead of POST for federating groups/m.join_policy
2018-04-06 12:11:16 +01:00
Krombel
1d71f484d4 use PUT instead of POST for federating groups/m.join_policy 2018-04-06 12:54:09 +02:00
Richard van der Hoff
15e8ed874f more verbosity in synctl 2018-04-06 09:28:36 +01:00
Krombel
c7ede92d0b make prometheus config compliant to v0.28 2018-04-05 23:34:01 +02:00
Richard van der Hoff
551422051b Merge pull request #2886 from turt2live/travis/new-worker-docs
Add a blurb explaining the main synapse worker
2018-04-05 17:33:09 +01:00
Richard van der Hoff
c7f0969731 Merge pull request #2986 from jplatte/join_reponse_room_id
Add room_id to the response of `rooms/{roomId}/join`
2018-04-05 17:29:06 +01:00
Richard van der Hoff
3449da3bc7 Merge pull request #3068 from matrix-org/rav/fix_cache_invalidation
Improve database cache performance
2018-04-05 17:21:44 +01:00
Richard van der Hoff
d1679a4ed7 Merge pull request #3066 from matrix-org/rav/remove_redundant_metrics
Remove redundant metrics which were deprecated in 0.27.0.
2018-04-05 17:21:18 +01:00
Richard van der Hoff
01afc563c3 Fix overzealous cache invalidation
Fixes an issue where a cache invalidation would invalidate *all* pending
entries, rather than just the entry that we intended to invalidate.
2018-04-05 16:24:04 +01:00
Luke Barnard
e089100c62 Merge pull request #3045 from matrix-org/dbkr/group_joinable
Add joinability for groups
2018-04-05 15:57:49 +01:00
Neil Johnson
68b0ee4e8d Merge pull request #3041 from matrix-org/r30_stats
R30 stats
2018-04-05 15:37:37 +01:00
Richard van der Hoff
22284a6f65 Merge pull request #3060 from matrix-org/rav/kill_event_content
Remove uses of events.content
2018-04-05 15:02:17 +01:00
Luke Barnard
917380e89d NON NULL -> NOT NULL 2018-04-05 14:32:12 +01:00
Luke Barnard
104c0bc1d5 Use "/settings/" (plural) 2018-04-05 14:07:16 +01:00
Luke Barnard
700e5e7198 Use DEFAULT join_policy of "invite" in db 2018-04-05 14:01:17 +01:00
Luke Barnard
b214a04ffc Document set_group_join_policy 2018-04-05 13:29:16 +01:00
Neil Johnson
0e5f479fc0 Review comments
Use iteritems over item to loop over dict
formatting
2018-04-05 12:16:46 +01:00
Richard van der Hoff
518f6de088 Remove redundant metrics which were deprecated in 0.27.0. 2018-04-04 19:46:28 +01:00
Travis Ralston
88964b987e Merge remote-tracking branch 'matrix-org/develop' into travis/new-worker-docs 2018-04-04 08:46:56 -06:00
Travis Ralston
204fc98520 Document the additional routes for the event_creator worker
Fixes https://github.com/matrix-org/synapse/issues/3018

Signed-off-by: Travis Ralston <travpc@gmail.com>
2018-04-04 08:46:17 -06:00
Travis Ralston
301b339494 Move the mention of the main synapse worker higher up
Signed-off-by: Travis Ralston <travpc@gmail.com>
2018-04-04 08:45:51 -06:00
Adrian Tschira
6168351877 Add b prefixes to some strings that are bytes in py3
This has no effect on python2

Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-04-04 13:48:51 +02:00
Silke
72251d1b97 Remove address resolution of hosts in SRV records
Signed-off-by: Silke Hofstra <silke@slxh.eu>
2018-04-04 12:26:50 +02:00
Richard van der Hoff
a9a74101a4 Document the behaviour of ResponseCache
it looks like everything that uses ResponseCache expects to have to
`make_deferred_yieldable` its results. It's debatable whether that is the best
approach, but let's document it for now to avoid further confusion.
2018-04-04 09:06:22 +01:00
Luke Barnard
eb8d8d6f57 Use join_policy API instead of joinable
The API is now under
 /groups/$group_id/setting/m.join_policy

and expects a JSON blob of the shape

```json
{
  "m.join_policy": {
    "type": "invite"
  }
}
```

where "invite" could alternatively be "open".
2018-04-03 16:16:40 +01:00
Richard van der Hoff
2fe3f848b9 Remove uses of events.content 2018-03-29 23:17:12 +01:00
Neil Johnson
b4e37c6f50 pep8 2018-03-29 17:27:39 +01:00
Neil Johnson
9ee44a372d Remove need for sqlite specific query 2018-03-29 16:45:34 +01:00
Neil Johnson
dc7c020b33 fix pep8 errors 2018-03-28 17:25:15 +01:00
Neil Johnson
16aeb41547 Update README.rst
update docker hub url
2018-03-28 16:47:56 +01:00
David Baker
c5de6987c2 This should probably be a PUT 2018-03-28 16:44:11 +01:00
Neil Johnson
241e4e8687 remove twisted deferral cruft 2018-03-28 16:25:53 +01:00
David Baker
929b34963d OK, smallint it is then 2018-03-28 14:53:55 +01:00
David Baker
a838444a70 Grr. Copy the definition from is_admin 2018-03-28 14:50:30 +01:00
Neil Johnson
4262aba17b bump schema version 2018-03-28 14:40:03 +01:00
Neil Johnson
86932be2cb Support multi client R30 for psql 2018-03-28 14:36:53 +01:00
David Baker
32260baa41 pep8 2018-03-28 14:29:42 +01:00
David Baker
a164270833 Make column definition that works on both dbs 2018-03-28 14:23:00 +01:00
David Baker
352e1ff9ed Add schema delta file 2018-03-28 14:07:57 +01:00
David Baker
79452edeee Add joinability for groups
Adds API to set the 'joinable' flag, and corresponding flag in the
table.
2018-03-28 14:03:37 +01:00
Krombel
6152e253d8 Merge branch 'develop' of into allow_auto_join_rooms 2018-03-28 14:45:28 +02:00
Neil Johnson
792d340572 rename stat to future proof 2018-03-28 12:25:02 +01:00
Neil Johnson
788e69098c Add user_ips last seen index 2018-03-28 12:03:13 +01:00
Neil Johnson
0f890f477e No need to cast in count_daily_users 2018-03-28 11:49:57 +01:00
Neil Johnson
a32d2548d9 query and call for r30 stats 2018-03-28 10:39:13 +01:00
Neil Johnson
9187e0762f count_daily_users failed if db was sqlite due to type failure - presumably this prevcented all sqlite homeservers reporting home 2018-03-28 10:02:32 +01:00
Krombel
91ea0202e6 move handling of auto_join_rooms to RegisterHandler
Currently the handling of auto_join_rooms only works when a user
registers itself via public register api. Registrations via
registration_shared_secret and ModuleApi do not work

This auto_joins the users in the registration handler which enables
the auto join feature for all 3 registration paths.

This is related to issue #2725

Signed-Off-by: Matthias Kesler <krombel@krombel.de>
2018-03-14 16:45:37 +01:00
Jonas Platte
47ce527f45 Add room_id to the response of rooms/{roomId}/join
Fixes #2349
2018-03-13 14:48:12 +01:00
dklug
af7ed8e1ef Return 401 for invalid access_token on logout
Signed-off-by: Duncan Klug <dklug@ucmerced.edu>
2018-03-02 22:01:27 -08:00
Travis Ralston
923d9300ed Add a blurb explaining the main synapse worker
Signed-off-by: Travis Ralston <travpc@gmail.com>
2018-02-17 21:53:46 -07:00
85 changed files with 1743 additions and 634 deletions

View File

@@ -1,3 +1,143 @@
Changes in synapse v0.28.1 (2018-05-01)
=======================================
SECURITY UPDATE
* Clamp the allowed values of event depth received over federation to be
[0, 2^63 - 1]. This mitigates an attack where malicious events
injected with depth = 2^63 - 1 render rooms unusable. Depth is used to
determine the cosmetic ordering of events within a room, and so the ordering
of events in such a room will default to using stream_ordering rather than depth
(topological_ordering).
This is a temporary solution to mitigate abuse in the wild, whilst a long term solution
is being implemented to improve how the depth parameter is used.
Full details at
https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI
* Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API.
Changes in synapse v0.28.0 (2018-04-26)
=======================================
Bug Fixes:
* Fix quarantine media admin API and search reindex (PR #3130)
* Fix media admin APIs (PR #3134)
Changes in synapse v0.28.0-rc1 (2018-04-24)
===========================================
Minor performance improvement to federation sending and bug fixes.
(Note: This release does not include state resolutions discussed in matrix live)
Features:
* Add metrics for event processing lag (PR #3090)
* Add metrics for ResponseCache (PR #3092)
Changes:
* Synapse on PyPy (PR #2760) Thanks to @Valodim!
* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
* Document the behaviour of ResponseCache (PR #3059)
* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
* Send federation events concurrently (PR #3078)
* Limit concurrent event sends for a room (PR #3079)
* Improve R30 stat definition (PR #3086)
* Send events to ASes concurrently (PR #3088)
* Refactor ResponseCache usage (PR #3093)
* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
* Refactor store.have_events (PR #3117)
Bug Fixes:
* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
* fix federation_domain_whitelist (PR #3099)
* Avoid creating events with huge numbers of prev_events (PR #3113)
* Reject events which have lots of prev_events (PR #3118)
Changes in synapse v0.27.4 (2018-04-13)
======================================
Changes:
* Update canonicaljson dependency (#3095)
Changes in synapse v0.27.3 (2018-04-11)
======================================
Bug fixes:
* URL quote path segments over federation (#3082)
Changes in synapse v0.27.3-rc2 (2018-04-09)
==========================================
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
Changes in synapse v0.27.3-rc1 (2018-04-09)
=======================================
Notable changes include API support for joinability of groups. Also new metrics
and phone home stats. Phone home stats include better visibility of system usage
so we can tweak synpase to work better for all users rather than our own experience
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
overal growth of the Matrix ecosystem. It is defined as:-
Counts the number of native 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days"
Features:
* Add joinability for groups (PR #3045)
* Implement group join API (PR #3046)
* Add counter metrics for calculating state delta (PR #3033)
* R30 stats (PR #3041)
* Measure time it takes to calculate state group ID (PR #3043)
* Add basic performance statistics to phone home (PR #3044)
* Add response size metrics (PR #3071)
* phone home cache size configurations (PR #3063)
Changes:
* Add a blurb explaining the main synapse worker (PR #2886) Thanks to @turt2live!
* Replace old style error catching with 'as' keyword (PR #3000) Thanks to @NotAFile!
* Use .iter* to avoid copies in StateHandler (PR #3006)
* Linearize calls to _generate_user_id (PR #3029)
* Remove last usage of ujson (PR #3030)
* Use simplejson throughout (PR #3048)
* Use static JSONEncoders (PR #3049)
* Remove uses of events.content (PR #3060)
* Improve database cache performance (PR #3068)
Bug fixes:
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
* Fix replication after switch to simplejson (PR #3015)
* 404 correctly on missing paths via NoResource (PR #3022)
* Fix error when claiming e2e keys from offline servers (PR #3034)
* fix tests/storage/test_user_directory.py (PR #3042)
* use PUT instead of POST for federating groups/m.join_policy (PR #3070) Thanks to @krombel!
* postgres port script: fix state_groups_pkey error (PR #3072)
Changes in synapse v0.27.2 (2018-03-26)
=======================================

View File

@@ -157,8 +157,8 @@ if you prefer.
In case of problems, please see the _`Troubleshooting` section below.
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
@@ -614,6 +614,9 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
$ dig -t srv _matrix._tcp.example.com
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
Note that the server hostname cannot be an alias (CNAME record): it has to point
directly to the server hosting the synapse instance.
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
its user-ids, by setting ``server_name``::

10
contrib/README.rst Normal file
View File

@@ -0,0 +1,10 @@
Community Contributions
=======================
Everything in this directory are projects submitted by the community that may be useful
to others. As such, the project maintainers cannot guarantee support, stability
or backwards compatibility of these projects.
Files in this directory should *not* be relied on directly, as they may not
continue to work or exist in future. If you wish to use any of these files then
they should be copied to avoid them breaking from underneath you.

View File

@@ -22,6 +22,8 @@ import argparse
from synapse.events import FrozenEvent
from synapse.util.frozenutils import unfreeze
from six import string_types
def make_graph(file_name, room_id, file_prefix, limit):
print "Reading lines"
@@ -58,7 +60,7 @@ def make_graph(file_name, room_id, file_prefix, limit):
for key, value in unfreeze(event.get_dict()["content"]).items():
if value is None:
value = "<null>"
elif isinstance(value, basestring):
elif isinstance(value, string_types):
pass
else:
value = json.dumps(value)

View File

@@ -202,11 +202,11 @@ new PromConsole.Graph({
<h1>Requests</h1>
<h3>Requests by Servlet</h3>
<div id="synapse_http_server_requests_servlet"></div>
<div id="synapse_http_server_request_count_servlet"></div>
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_requests_servlet"),
expr: "rate(synapse_http_server_requests:servlet[2m])",
node: document.querySelector("#synapse_http_server_request_count_servlet"),
expr: "rate(synapse_http_server_request_count:servlet[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -215,11 +215,11 @@ new PromConsole.Graph({
})
</script>
<h4>&nbsp;(without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
<div id="synapse_http_server_requests_servlet_minus_events"></div>
<div id="synapse_http_server_request_count_servlet_minus_events"></div>
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
node: document.querySelector("#synapse_http_server_request_count_servlet_minus_events"),
expr: "rate(synapse_http_server_request_count:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -233,7 +233,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_time_avg"),
expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
expr: "rate(synapse_http_server_response_time_seconds[2m]) / rate(synapse_http_server_response_count[2m]) / 1000",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -276,7 +276,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_ru_utime"),
expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
expr: "rate(synapse_http_server_response_ru_utime_seconds[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -291,7 +291,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
expr: "rate(synapse_http_server_response_db_txn_duration_seconds[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -306,7 +306,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_send_time_avg"),
expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
expr: "rate(synapse_http_server_response_time_second{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,

View File

@@ -1,10 +1,10 @@
synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])

View File

@@ -5,19 +5,19 @@ groups:
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- record: "synapse_federation_transaction_queue_pendingPdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- record: 'synapse_http_server_requests:method'
- record: 'synapse_http_server_request_count:method'
labels:
servlet: ""
expr: "sum(synapse_http_server_requests) by (method)"
- record: 'synapse_http_server_requests:servlet'
expr: "sum(synapse_http_server_request_count) by (method)"
- record: 'synapse_http_server_request_count:servlet'
labels:
method: ""
expr: 'sum(synapse_http_server_requests) by (servlet)'
expr: 'sum(synapse_http_server_request_count) by (servlet)'
- record: 'synapse_http_server_requests:total'
- record: 'synapse_http_server_request_count:total'
labels:
servlet: ""
expr: 'sum(synapse_http_server_requests:by_method) by (servlet)'
expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
- record: 'synapse_cache:hit_ratio_5m'
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'

View File

@@ -2,6 +2,9 @@
# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
# rather than in a user home directory or similar under virtualenv.
# **NOTE:** This is an example service file that may change in the future. If you
# wish to use this please copy rather than symlink it.
[Unit]
Description=Synapse Matrix homeserver
@@ -12,6 +15,7 @@ Group=synapse
WorkingDirectory=/var/lib/synapse
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
# EnvironmentFile=-/etc/sysconfig/synapse # Can be used to e.g. set SYNAPSE_CACHE_FACTOR
[Install]
WantedBy=multi-user.target

View File

@@ -55,7 +55,12 @@ synapse process.)
You then create a set of configs for the various worker processes. These
should be worker configuration files, and should be stored in a dedicated
subdirectory, to allow synctl to manipulate them.
subdirectory, to allow synctl to manipulate them. An additional configuration
for the master synapse process will need to be created because the process will
not be started automatically. That configuration should look like this::
worker_app: synapse.app.homeserver
daemonize: true
Each worker configuration file inherits the configuration of the main homeserver
configuration file. You can then override configuration specific to that worker,
@@ -230,9 +235,11 @@ file. For example::
``synapse.app.event_creator``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles non-state event creation. It can handle REST endpoints matching::
Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +30,8 @@ import time
import traceback
import yaml
from six import string_types
logger = logging.getLogger("synapse_port_db")
@@ -250,6 +253,12 @@ class Porter(object):
@defer.inlineCallbacks
def handle_table(self, table, postgres_size, table_size, forward_chunk,
backward_chunk):
logger.info(
"Table %s: %i/%i (rows %i-%i) already ported",
table, postgres_size, table_size,
backward_chunk+1, forward_chunk-1,
)
if not table_size:
return
@@ -467,31 +476,10 @@ class Porter(object):
self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master",
keyvalues={
"type": "table",
},
retcol="name",
)
postgres_tables = yield self.postgres_store._simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
)
tables = set(sqlite_tables) & set(postgres_tables)
self.progress.set_state("Creating tables")
logger.info("Found %d tables", len(tables))
self.progress.set_state("Creating port tables")
def create_port_table(txn):
txn.execute(
"CREATE TABLE port_from_sqlite3 ("
"CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
" table_name varchar(100) NOT NULL UNIQUE,"
" forward_rowid bigint NOT NULL,"
" backward_rowid bigint NOT NULL"
@@ -517,18 +505,33 @@ class Porter(object):
"alter_table", alter_table
)
except Exception as e:
logger.info("Failed to create port table: %s", e)
pass
try:
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
)
except Exception as e:
logger.info("Failed to create port table: %s", e)
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
)
self.progress.set_state("Setting up")
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master",
keyvalues={
"type": "table",
},
retcol="name",
)
# Set up tables.
postgres_tables = yield self.postgres_store._simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
)
tables = set(sqlite_tables) & set(postgres_tables)
logger.info("Found %d tables", len(tables))
# Step 3. Figure out what still needs copying
self.progress.set_state("Checking on port progress")
setup_res = yield defer.gatherResults(
[
self.setup_table(table)
@@ -539,7 +542,8 @@ class Porter(object):
consumeErrors=True,
)
# Process tables.
# Step 4. Do the copying.
self.progress.set_state("Copying to postgres")
yield defer.gatherResults(
[
self.handle_table(*res)
@@ -548,6 +552,9 @@ class Porter(object):
consumeErrors=True,
)
# Step 5. Do final post-processing
yield self._setup_state_group_id_seq()
self.progress.done()
except:
global end_error_exec_info
@@ -569,7 +576,7 @@ class Porter(object):
def conv(j, col):
if j in bool_cols:
return bool(col)
elif isinstance(col, basestring) and "\0" in col:
elif isinstance(col, string_types) and "\0" in col:
logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col)
raise BadValueException();
return col
@@ -707,6 +714,16 @@ class Porter(object):
defer.returnValue((done, remaining + done))
def _setup_state_group_id_seq(self):
def r(txn):
txn.execute("SELECT MAX(id) FROM state_groups")
next_id = txn.fetchone()[0]+1
txn.execute(
"ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
(next_id,),
)
return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
##############################################
###### The following is simply UI stuff ######

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.27.2"
__version__ = "0.28.1"

View File

@@ -204,8 +204,8 @@ class Auth(object):
ip_addr = self.hs.get_ip_from_request(request)
user_agent = request.requestHeaders.getRawHeaders(
"User-Agent",
default=[""]
b"User-Agent",
default=[b""]
)[0]
if user and access_token and ip_addr:
self.store.insert_client_ip(
@@ -672,7 +672,7 @@ def has_access_token(request):
bool: False if no access_token was given, True otherwise.
"""
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
return bool(query_params) or bool(auth_headers)
@@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401):
AuthError: If there isn't an access_token in the request.
"""
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
query_params = request.args.get(b"access_token")
if auth_headers:
# Try the get the access_token from a "Authorization: Bearer"
# header

View File

@@ -16,6 +16,9 @@
"""Contains constants from the specification."""
# the "depth" field on events is limited to 2**63 - 1
MAX_DEPTH = 2**63 - 1
class Membership(object):

View File

@@ -18,6 +18,7 @@
import logging
import simplejson as json
from six import iteritems
logger = logging.getLogger(__name__)
@@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
A dict representing the error response JSON.
"""
err = {"error": msg, "errcode": code}
for key, value in kwargs.iteritems():
for key, value in iteritems(kwargs):
err[key] = value
return err

View File

@@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet):
# They're actually trying to upload something, proxy to main synapse.
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
headers = {
"Authorization": auth_headers,
}

View File

@@ -430,6 +430,10 @@ def run(hs):
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in r30_results.iteritems():
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR

View File

@@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
from six import iteritems
logger = logging.getLogger("synapse.app.synchrotron")
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
def get_currently_syncing_users(self):
return [
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]

View File

@@ -252,6 +252,7 @@ def main():
for running_pid in running_pids:
while pid_running(running_pid):
time.sleep(0.2)
write("All processes exited; now restarting...")
if action == "start" or action == "restart":
if start_stop_synapse:

View File

@@ -21,6 +21,8 @@ from twisted.internet import defer
import logging
import re
from six import string_types
logger = logging.getLogger(__name__)
@@ -146,7 +148,7 @@ class ApplicationService(object):
)
regex = regex_obj.get("regex")
if isinstance(regex, basestring):
if isinstance(regex, string_types):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
raise ValueError(

View File

@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@@ -73,7 +72,8 @@ class ApplicationServiceApi(SimpleHttpClient):
super(ApplicationServiceApi, self).__init__(hs)
self.clock = hs.get_clock()
self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
timeout_ms=HOUR_IN_MS)
@defer.inlineCallbacks
def query_user(self, service, user_id):
@@ -193,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(None)
key = (service.id, protocol)
result = self.protocol_meta_cache.get(key)
if not result:
result = self.protocol_meta_cache.set(
key, preserve_fn(_get)()
)
return make_deferred_yieldable(result)
return self.protocol_meta_cache.wrap(key, _get)
@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):

View File

@@ -19,6 +19,8 @@ import os
import yaml
from textwrap import dedent
from six import integer_types
class ConfigError(Exception):
pass
@@ -49,7 +51,7 @@ Missing mandatory `server_name` config option.
class Config(object):
@staticmethod
def parse_size(value):
if isinstance(value, int) or isinstance(value, long):
if isinstance(value, integer_types):
return value
sizes = {"K": 1024, "M": 1024 * 1024}
size = 1
@@ -61,7 +63,7 @@ class Config(object):
@staticmethod
def parse_duration(value):
if isinstance(value, int) or isinstance(value, long):
if isinstance(value, integer_types):
return value
second = 1000
minute = 60 * second
@@ -288,22 +290,22 @@ class Config(object):
)
obj.invoke_all("generate_files", config)
config_file.write(config_bytes)
print (
print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
" certificates. Please review this file and customise it"
" to your needs."
) % (config_path, server_name)
print (
) % (config_path, server_name))
print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
print (
print((
"Config file %r already exists. Generating any missing key"
" files."
) % (config_path,)
) % (config_path,))
generate_keys = True
parser = argparse.ArgumentParser(

View File

@@ -21,6 +21,8 @@ import urllib
import yaml
import logging
from six import string_types
logger = logging.getLogger(__name__)
@@ -89,14 +91,14 @@ def _load_appservice(hostname, as_info, config_filename):
"id", "as_token", "hs_token", "sender_localpart"
]
for field in required_string_fields:
if not isinstance(as_info.get(field), basestring):
if not isinstance(as_info.get(field), string_types):
raise KeyError("Required string field: '%s' (%s)" % (
field, config_filename,
))
# 'url' must either be a string or explicitly null, not missing
# to avoid accidentally turning off push for ASes.
if (not isinstance(as_info.get("url"), basestring) and
if (not isinstance(as_info.get("url"), string_types) and
as_info.get("url", "") is not None):
raise KeyError(
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
@@ -128,7 +130,7 @@ def _load_appservice(hostname, as_info, config_filename):
"Expected namespace entry in %s to be an object,"
" but got %s", ns, regex_obj
)
if not isinstance(regex_obj.get("regex"), basestring):
if not isinstance(regex_obj.get("regex"), string_types):
raise ValueError(
"Missing/bad type 'regex' key in %s", regex_obj
)

View File

@@ -352,7 +352,7 @@ class Keyring(object):
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e.message),
type(e).__name__, str(e),
)
defer.returnValue({})
@@ -384,7 +384,7 @@ class Keyring(object):
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e.message),
type(e).__name__, str(e),
)
if not keys:
@@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request):
except IOError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
server_name, type(e).__name__, str(e),
)
raise SynapseError(
502,
@@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request):
except Exception as e:
logger.exception(
"Got Exception when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e.message),
server_name, type(e).__name__, str(e),
)
raise SynapseError(
401,

View File

@@ -14,7 +14,10 @@
# limitations under the License.
import logging
from synapse.api.errors import SynapseError
import six
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import SynapseError, Codes
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
@@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
FrozenEvent
Raises:
SynapseError: if the pdu is missing required fields
SynapseError: if the pdu is missing required fields or is otherwise
not a valid matrix event
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
assert_params_in_request(pdu_json, ('event_id', 'type'))
assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
Codes.BAD_JSON)
if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event = FrozenEvent(
pdu_json
)

View File

@@ -394,7 +394,7 @@ class FederationClient(FederationBase):
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_events(event_ids)
seen_events = yield self.store.have_seen_events(event_ids)
signed_events = []
failed_to_fetch = set()

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,9 +31,10 @@ import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
from six import iteritems
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -65,7 +67,7 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
@defer.inlineCallbacks
@log_function
@@ -212,16 +214,17 @@ class FederationServer(FederationBase):
if not in_room:
raise AuthError(403, "Host not in room.")
result = self._state_resp_cache.get((room_id, event_id))
if not result:
with (yield self._server_linearizer.queue((origin, room_id))):
d = self._state_resp_cache.set(
(room_id, event_id),
preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
)
resp = yield make_deferred_yieldable(d)
else:
resp = yield make_deferred_yieldable(result)
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (yield self._server_linearizer.queue((origin, room_id))):
resp = yield self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id, event_id,
)
defer.returnValue((200, resp))
@@ -425,9 +428,9 @@ class FederationServer(FederationBase):
"Claimed one-time-keys: %s",
",".join((
"%s for %s:%s" % (key_id, user_id, device_id)
for user_id, user_keys in json_result.iteritems()
for device_id, device_keys in user_keys.iteritems()
for key_id, _ in device_keys.iteritems()
for user_id, user_keys in iteritems(json_result)
for device_id, device_keys in iteritems(user_keys)
for key_id, _ in iteritems(device_keys)
)),
)
@@ -494,13 +497,33 @@ class FederationServer(FederationBase):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
(The error will then be logged and sent back to the sender (which
probably won't do anything with it), and other events in the
transaction will be processed as normal).
It is likely that we'll then receive other events which refer to
this rejected_event in their prev_events, etc. When that happens,
we'll attempt to fetch the rejected event again, which will presumably
fail, so those second-generation events will also get rejected.
Eventually, we get to the point where there are more than 10 events
between any new events and the original rejected event. Since we
only try to backfill 10 events deep on received pdu, we then accept the
new event, possibly introducing a discontinuity in the DAG, with new
forward extremities, so normal service is approximately returned,
until we try to backfill across the discontinuity.
Args:
origin (str): server which sent the pdu
pdu (FrozenEvent): received pdu
Returns (Deferred): completes with None
Raises: FederationError if the signatures / hash do not match
"""
Raises: FederationError if the signatures / hash do not match, or
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):

View File

@@ -40,6 +40,8 @@ from collections import namedtuple
import logging
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
for uids in self.presence_changed.itervalues()
for uids in itervalues(self.presence_changed)
for user_id in uids
)
@@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
for ((destination, edu_key), pos) in keyed_edus.iteritems():
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
for (destination, pos) in device_messages.iteritems():
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
destination=destination,
)))
@@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
for destination, edu_list in buff.edus.iteritems():
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
for destination, failure_list in buff.failures.iteritems():
for destination, failure_list in iteritems(buff.failures):
for failure in failure_list:
transaction_queue.send_failure(destination, failure)

View File

@@ -169,7 +169,7 @@ class TransactionQueue(object):
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=20,
last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +177,33 @@ class TransactionQueue(object):
if not events and next_token >= self._last_poked_id:
break
for event in events:
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
continue
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
destinations = set(destinations)
if send_on_behalf_of is not None:
@@ -207,12 +216,44 @@ class TransactionQueue(object):
self._send_pdu(event, destinations)
events_processed_counter.inc_by(len(events))
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
],
consumeErrors=True
))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
events_processed_counter.inc_by(len(events))
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
finally:
self._is_processing = False

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
import urllib
logger = logging.getLogger(__name__)
@@ -49,7 +51,7 @@ class TransportLayerClient(object):
logger.debug("get_room_state dest=%s, room=%s",
destination, room_id)
path = PREFIX + "/state/%s/" % room_id
path = _create_path(PREFIX, "/state/%s/", room_id)
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@@ -71,7 +73,7 @@ class TransportLayerClient(object):
logger.debug("get_room_state_ids dest=%s, room=%s",
destination, room_id)
path = PREFIX + "/state_ids/%s/" % room_id
path = _create_path(PREFIX, "/state_ids/%s/", room_id)
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@@ -93,7 +95,7 @@ class TransportLayerClient(object):
logger.debug("get_pdu dest=%s, event_id=%s",
destination, event_id)
path = PREFIX + "/event/%s/" % (event_id, )
path = _create_path(PREFIX, "/event/%s/", event_id)
return self.client.get_json(destination, path=path, timeout=timeout)
@log_function
@@ -119,7 +121,7 @@ class TransportLayerClient(object):
# TODO: raise?
return
path = PREFIX + "/backfill/%s/" % (room_id,)
path = _create_path(PREFIX, "/backfill/%s/", room_id)
args = {
"v": event_tuples,
@@ -157,9 +159,11 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
path=path,
data=json_data,
json_data_callback=json_data_callback,
long_retries=True,
@@ -177,7 +181,7 @@ class TransportLayerClient(object):
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail,
ignore_backoff=False):
path = PREFIX + "/query/%s" % query_type
path = _create_path(PREFIX, "/query/%s", query_type)
content = yield self.client.get_json(
destination=destination,
@@ -222,7 +226,7 @@ class TransportLayerClient(object):
"make_membership_event called with membership='%s', must be one of %s" %
(membership, ",".join(valid_memberships))
)
path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
ignore_backoff = False
retry_on_dns_fail = False
@@ -248,7 +252,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -261,7 +265,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_leave(self, destination, room_id, event_id, content):
path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -280,7 +284,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -322,7 +326,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def exchange_third_party_invite(self, destination, room_id, event_dict):
path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
response = yield self.client.put_json(
destination=destination,
@@ -335,7 +339,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
content = yield self.client.get_json(
destination=destination,
@@ -347,7 +351,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_query_auth(self, destination, room_id, event_id, content):
path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
content = yield self.client.post_json(
destination=destination,
@@ -409,7 +413,7 @@ class TransportLayerClient(object):
Returns:
A dict containg the device keys.
"""
path = PREFIX + "/user/devices/" + user_id
path = _create_path(PREFIX, "/user/devices/%s", user_id)
content = yield self.client.get_json(
destination=destination,
@@ -459,7 +463,7 @@ class TransportLayerClient(object):
@log_function
def get_missing_events(self, destination, room_id, earliest_events,
latest_events, limit, min_depth, timeout):
path = PREFIX + "/get_missing_events/%s" % (room_id,)
path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
content = yield self.client.post_json(
destination=destination,
@@ -479,7 +483,7 @@ class TransportLayerClient(object):
def get_group_profile(self, destination, group_id, requester_user_id):
"""Get a group profile
"""
path = PREFIX + "/groups/%s/profile" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
return self.client.get_json(
destination=destination,
@@ -498,7 +502,7 @@ class TransportLayerClient(object):
requester_user_id (str)
content (dict): The new profile of the group
"""
path = PREFIX + "/groups/%s/profile" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
return self.client.post_json(
destination=destination,
@@ -512,7 +516,7 @@ class TransportLayerClient(object):
def get_group_summary(self, destination, group_id, requester_user_id):
"""Get a group summary
"""
path = PREFIX + "/groups/%s/summary" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
return self.client.get_json(
destination=destination,
@@ -525,7 +529,7 @@ class TransportLayerClient(object):
def get_rooms_in_group(self, destination, group_id, requester_user_id):
"""Get all rooms in a group
"""
path = PREFIX + "/groups/%s/rooms" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
return self.client.get_json(
destination=destination,
@@ -538,7 +542,7 @@ class TransportLayerClient(object):
content):
"""Add a room to a group
"""
path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
return self.client.post_json(
destination=destination,
@@ -552,7 +556,10 @@ class TransportLayerClient(object):
config_key, content):
"""Update room in group
"""
path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
path = _create_path(
PREFIX, "/groups/%s/room/%s/config/%s",
group_id, room_id, config_key,
)
return self.client.post_json(
destination=destination,
@@ -565,7 +572,7 @@ class TransportLayerClient(object):
def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
"""Remove a room from a group
"""
path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
return self.client.delete_json(
destination=destination,
@@ -578,7 +585,7 @@ class TransportLayerClient(object):
def get_users_in_group(self, destination, group_id, requester_user_id):
"""Get users in a group
"""
path = PREFIX + "/groups/%s/users" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/users", group_id,)
return self.client.get_json(
destination=destination,
@@ -591,7 +598,7 @@ class TransportLayerClient(object):
def get_invited_users_in_group(self, destination, group_id, requester_user_id):
"""Get users that have been invited to a group
"""
path = PREFIX + "/groups/%s/invited_users" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
return self.client.get_json(
destination=destination,
@@ -604,7 +611,23 @@ class TransportLayerClient(object):
def accept_group_invite(self, destination, group_id, user_id, content):
"""Accept a group invite
"""
path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
path = _create_path(
PREFIX, "/groups/%s/users/%s/accept_invite",
group_id, user_id,
)
return self.client.post_json(
destination=destination,
path=path,
data=content,
ignore_backoff=True,
)
@log_function
def join_group(self, destination, group_id, user_id, content):
"""Attempts to join a group
"""
path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -617,7 +640,7 @@ class TransportLayerClient(object):
def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
"""Invite a user to a group
"""
path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -633,7 +656,7 @@ class TransportLayerClient(object):
invited.
"""
path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -647,7 +670,7 @@ class TransportLayerClient(object):
user_id, content):
"""Remove a user fron a group
"""
path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -664,7 +687,7 @@ class TransportLayerClient(object):
kicked from the group.
"""
path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -679,7 +702,7 @@ class TransportLayerClient(object):
the attestations
"""
path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -694,11 +717,12 @@ class TransportLayerClient(object):
"""Update a room entry in a group summary
"""
if category_id:
path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
path = _create_path(
PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
group_id, category_id, room_id,
)
else:
path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
return self.client.post_json(
destination=destination,
@@ -714,11 +738,12 @@ class TransportLayerClient(object):
"""Delete a room entry in a group summary
"""
if category_id:
path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
path = _create_path(
PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
group_id, category_id, room_id,
)
else:
path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
return self.client.delete_json(
destination=destination,
@@ -731,7 +756,7 @@ class TransportLayerClient(object):
def get_group_categories(self, destination, group_id, requester_user_id):
"""Get all categories in a group
"""
path = PREFIX + "/groups/%s/categories" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
return self.client.get_json(
destination=destination,
@@ -744,7 +769,7 @@ class TransportLayerClient(object):
def get_group_category(self, destination, group_id, requester_user_id, category_id):
"""Get category info in a group
"""
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
return self.client.get_json(
destination=destination,
@@ -758,7 +783,7 @@ class TransportLayerClient(object):
content):
"""Update a category in a group
"""
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
return self.client.post_json(
destination=destination,
@@ -773,7 +798,7 @@ class TransportLayerClient(object):
category_id):
"""Delete a category in a group
"""
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
return self.client.delete_json(
destination=destination,
@@ -786,7 +811,7 @@ class TransportLayerClient(object):
def get_group_roles(self, destination, group_id, requester_user_id):
"""Get all roles in a group
"""
path = PREFIX + "/groups/%s/roles" % (group_id,)
path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
return self.client.get_json(
destination=destination,
@@ -799,7 +824,7 @@ class TransportLayerClient(object):
def get_group_role(self, destination, group_id, requester_user_id, role_id):
"""Get a roles info
"""
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
return self.client.get_json(
destination=destination,
@@ -813,7 +838,7 @@ class TransportLayerClient(object):
content):
"""Update a role in a group
"""
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
return self.client.post_json(
destination=destination,
@@ -827,7 +852,7 @@ class TransportLayerClient(object):
def delete_group_role(self, destination, group_id, requester_user_id, role_id):
"""Delete a role in a group
"""
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
return self.client.delete_json(
destination=destination,
@@ -842,11 +867,12 @@ class TransportLayerClient(object):
"""Update a users entry in a group
"""
if role_id:
path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
path = _create_path(
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
group_id, role_id, user_id,
)
else:
path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
return self.client.post_json(
destination=destination,
@@ -856,17 +882,33 @@ class TransportLayerClient(object):
ignore_backoff=True,
)
@log_function
def set_group_join_policy(self, destination, group_id, requester_user_id,
content):
"""Sets the join policy for a group
"""
path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
return self.client.put_json(
destination=destination,
path=path,
args={"requester_user_id": requester_user_id},
data=content,
ignore_backoff=True,
)
@log_function
def delete_group_summary_user(self, destination, group_id, requester_user_id,
user_id, role_id):
"""Delete a users entry in a group
"""
if role_id:
path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
path = _create_path(
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
group_id, role_id, user_id,
)
else:
path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
return self.client.delete_json(
destination=destination,
@@ -889,3 +931,22 @@ class TransportLayerClient(object):
data=content,
ignore_backoff=True,
)
def _create_path(prefix, path, *args):
"""Creates a path from the prefix, path template and args. Ensures that
all args are url encoded.
Example:
_create_path(PREFIX, "/event/%s/", event_id)
Args:
prefix (str)
path (str): String template for the path
args: ([str]): Args to insert into path. Each arg will be url encoded
Returns:
str
"""
return prefix + path % tuple(urllib.quote(arg, "") for arg in args)

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -93,12 +94,6 @@ class Authenticator(object):
"signatures": {},
}
if (
self.federation_domain_whitelist is not None and
self.server_name not in self.federation_domain_whitelist
):
raise FederationDeniedError(self.server_name)
if content is not None:
json_request["content"] = content
@@ -137,6 +132,12 @@ class Authenticator(object):
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
if (
self.federation_domain_whitelist is not None and
origin not in self.federation_domain_whitelist
):
raise FederationDeniedError(origin)
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
@@ -802,6 +803,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
class FederationGroupsJoinServlet(BaseFederationServlet):
"""Attempt to join a group
"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
if get_domain_from_id(user_id) != origin:
raise SynapseError(403, "user_id doesn't match origin")
new_content = yield self.handler.join_group(
group_id, user_id, content,
)
defer.returnValue((200, new_content))
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
"""Leave or kick a user from the group
"""
@@ -1124,6 +1142,24 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
defer.returnValue((200, resp))
class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
"""Sets whether a group is joinable without an invite or knock
"""
PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, group_id):
requester_user_id = parse_string_from_args(query, "requester_user_id")
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
new_content = yield self.handler.set_group_join_policy(
group_id, requester_user_id, content
)
defer.returnValue((200, new_content))
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -1163,6 +1199,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsInvitedUsersServlet,
FederationGroupsInviteServlet,
FederationGroupsAcceptInviteServlet,
FederationGroupsJoinServlet,
FederationGroupsRemoveUserServlet,
FederationGroupsSummaryRoomsServlet,
FederationGroupsCategoriesServlet,
@@ -1172,6 +1209,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsSummaryUsersServlet,
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet,
)

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -205,6 +206,28 @@ class GroupsServerHandler(object):
defer.returnValue({})
@defer.inlineCallbacks
def set_group_join_policy(self, group_id, requester_user_id, content):
"""Sets the group join policy.
Currently supported policies are:
- "invite": an invite must be received and accepted in order to join.
- "open": anyone can join.
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
join_policy = _parse_join_policy_from_contents(content)
if join_policy is None:
raise SynapseError(
400, "No value specified for 'm.join_policy'"
)
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
defer.returnValue({})
@defer.inlineCallbacks
def get_group_categories(self, group_id, requester_user_id):
"""Get all categories in a group (as seen by user)
@@ -381,9 +404,16 @@ class GroupsServerHandler(object):
yield self.check_group_is_ours(group_id, requester_user_id)
group_description = yield self.store.get_group(group_id)
group = yield self.store.get_group(group_id)
if group:
cols = [
"name", "short_description", "long_description",
"avatar_url", "is_public",
]
group_description = {key: group[key] for key in cols}
group_description["is_openly_joinable"] = group["join_policy"] == "open"
if group_description:
defer.returnValue(group_description)
else:
raise SynapseError(404, "Unknown group")
@@ -654,6 +684,40 @@ class GroupsServerHandler(object):
else:
raise SynapseError(502, "Unknown state returned by HS")
@defer.inlineCallbacks
def _add_user(self, group_id, user_id, content):
"""Add a user to a group based on a content dict.
See accept_invite, join_group.
"""
if not self.hs.is_mine_id(user_id):
local_attestation = self.attestations.create_attestation(
group_id, user_id,
)
remote_attestation = content["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
user_id=user_id,
group_id=group_id,
)
else:
local_attestation = None
remote_attestation = None
is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_group(
group_id, user_id,
is_admin=False,
is_public=is_public,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
defer.returnValue(local_attestation)
@defer.inlineCallbacks
def accept_invite(self, group_id, requester_user_id, content):
"""User tries to accept an invite to the group.
@@ -670,30 +734,27 @@ class GroupsServerHandler(object):
if not is_invited:
raise SynapseError(403, "User not invited to group")
if not self.hs.is_mine_id(requester_user_id):
local_attestation = self.attestations.create_attestation(
group_id, requester_user_id,
)
remote_attestation = content["attestation"]
local_attestation = yield self._add_user(group_id, requester_user_id, content)
yield self.attestations.verify_attestation(
remote_attestation,
user_id=requester_user_id,
group_id=group_id,
)
else:
local_attestation = None
remote_attestation = None
defer.returnValue({
"state": "join",
"attestation": local_attestation,
})
is_public = _parse_visibility_from_contents(content)
@defer.inlineCallbacks
def join_group(self, group_id, requester_user_id, content):
"""User tries to join the group.
yield self.store.add_user_to_group(
group_id, requester_user_id,
is_admin=False,
is_public=is_public,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
This will error if the group requires an invite/knock to join
"""
group_info = yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True
)
if group_info['join_policy'] != "open":
raise SynapseError(403, "Group is not publicly joinable")
local_attestation = yield self._add_user(group_id, requester_user_id, content)
defer.returnValue({
"state": "join",
@@ -835,6 +896,31 @@ class GroupsServerHandler(object):
})
def _parse_join_policy_from_contents(content):
"""Given a content for a request, return the specified join policy or None
"""
join_policy_dict = content.get("m.join_policy")
if join_policy_dict:
return _parse_join_policy_dict(join_policy_dict)
else:
return None
def _parse_join_policy_dict(join_policy_dict):
"""Given a dict for the "m.join_policy" config return the join policy specified
"""
join_policy_type = join_policy_dict.get("type")
if not join_policy_type:
return "invite"
if join_policy_type not in ("invite", "open"):
raise SynapseError(
400, "Synapse only supports 'invite'/'open' join rule"
)
return join_policy_type
def _parse_visibility_from_contents(content):
"""Given a content for a request parse out whether the entity should be
public or not

View File

@@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)
import logging
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events:
break
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@defer.inlineCallbacks
def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
continue # no services need notifying
return # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
service, event
)
events_processed_counter.inc_by(len(events))
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
events_processed_counter.inc_by(len(events))
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
finally:
self.is_processing = False

View File

@@ -15,8 +15,14 @@
# limitations under the License.
"""Contains handlers for federation events."""
import httplib
import itertools
import logging
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
from twisted.internet import defer
import itertools
import logging
logger = logging.getLogger(__name__)
@@ -115,6 +117,19 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
# do some initial sanity-checking of the event. In particular, make
# sure it doesn't have hundreds of prev_events or auth_events, which
# could cause a huge state resolution or cascade of event fetches.
try:
self._sanity_check_event(pdu)
except SynapseError as err:
raise FederationError(
"ERROR",
err.code,
err.msg,
affected=pdu.event_id,
)
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@@ -149,10 +164,6 @@ class FederationHandler(BaseHandler):
auth_chain = []
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
fetch_state = False
# Get missing pdus if necessary.
@@ -168,7 +179,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -196,8 +207,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.iterkeys())
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@@ -248,8 +258,7 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
@@ -361,9 +370,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
seen_ids = set(
(yield self.store.have_events(event_ids)).keys()
)
seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -527,9 +534,16 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. This may return
be successfull and still return no events if the other side has no new
events to offer.
This will attempt to get more events from the remote. If the other side
has no new events to offer, this will return an empty list.
As the events are received, we check their signatures, and also do some
sanity-checking on them. If any of the backfilled events are invalid,
this method throws a SynapseError.
TODO: make this more useful to distinguish failures of the remote
server from invalid events (there is probably no point in trying to
re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +555,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
# ideally we'd sanity check the events here for excess prev_events etc,
# but it's hard to reject events at this point without completely
# breaking backfill in the same way that it is currently broken by
# events whose signature we cannot verify (#3121).
#
# So for now we accept the events anyway. #3124 tracks this.
#
# for ev in events:
# self._sanity_check_event(ev)
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -633,7 +657,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
seen_events = yield self.store.have_events(
seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -843,6 +867,38 @@ class FederationHandler(BaseHandler):
defer.returnValue(False)
def _sanity_check_event(self, ev):
"""
Do some early sanity checks of a received event
In particular, checks it doesn't have an excessive number of
prev_events or auth_events, which could cause a huge state resolution
or cascade of event fetches.
Args:
ev (synapse.events.EventBase): event to be checked
Returns: None
Raises:
SynapseError if the event does not pass muster
"""
if len(ev.prev_events) > 20:
logger.warn("Rejecting event %s which has %i prev_events",
ev.event_id, len(ev.prev_events))
raise SynapseError(
httplib.BAD_REQUEST,
"Too many prev_events",
)
if len(ev.auth_events) > 10:
logger.warn("Rejecting event %s which has %i auth_events",
ev.event_id, len(ev.auth_events))
raise SynapseError(
httplib.BAD_REQUEST,
"Too many auth_events",
)
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
have_events = yield self.store.have_events(
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
seen_remotes = yield self.store.have_events(
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
if e.event_id in seen_remotes.keys():
if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
have_events = yield self.store.have_events(
have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
@@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
seen_remotes = yield self.store.have_events(
seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
if ev.event_id in seen_remotes.keys():
if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -90,6 +91,8 @@ class GroupsLocalHandler(object):
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
set_group_join_policy = _create_rerouter("set_group_join_policy")
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@@ -226,7 +229,45 @@ class GroupsLocalHandler(object):
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
raise NotImplementedError() # TODO
if self.is_mine_id(group_id):
yield self.groups_server_handler.join_group(
group_id, user_id, content
)
local_attestation = None
remote_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
res = yield self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content,
)
remote_attestation = res["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
group_id=group_id,
user_id=user_id,
server_name=get_domain_from_id(group_id),
)
# TODO: Check that the group is public and we're being added publically
is_publicised = content.get("publicise", False)
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=False,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
is_publicised=is_publicised,
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
defer.returnValue({})
@defer.inlineCallbacks
def accept_invite(self, group_id, user_id, content):

View File

@@ -16,7 +16,7 @@
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
@@ -37,7 +37,6 @@ from ._base import BaseHandler
from canonicaljson import encode_canonical_json
import logging
import random
import simplejson
logger = logging.getLogger(__name__)
@@ -433,7 +432,7 @@ class EventCreationHandler(object):
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_event_ids=None):
prev_events_and_hashes=None):
"""
Given a dict from a client, create a new event.
@@ -447,47 +446,52 @@ class EventCreationHandler(object):
event_dict (dict): An entire event
token_id (str)
txn_id (str)
prev_event_ids (list): The prev event ids to use when creating the event
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Returns:
Tuple of created event (FrozenEvent), Context
"""
builder = self.event_builder_factory.new(event_dict)
with (yield self.limiter.queue(builder.room_id)):
self.validator.validate_new(builder)
self.validator.validate_new(builder)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
try:
if "displayname" not in content:
content["displayname"] = yield profile.get_displayname(target)
if "avatar_url" not in content:
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
target, e
)
try:
if "displayname" not in content:
content["displayname"] = yield profile.get_displayname(target)
if "avatar_url" not in content:
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
target, e
)
if token_id is not None:
builder.internal_metadata.token_id = token_id
if token_id is not None:
builder.internal_metadata.token_id = token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
)
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_events_and_hashes=prev_events_and_hashes,
)
defer.returnValue((event, context))
@@ -557,64 +561,80 @@ class EventCreationHandler(object):
See self.create_event and self.send_nonmember_event.
"""
event, context = yield self.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
)
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, basestring):
spam_error = "Spam is not permitted here"
raise SynapseError(
403, spam_error, Codes.FORBIDDEN
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
# a situation where event persistence can't keep up, causing
# extremities to pile up, which in turn leads to state resolution
# taking longer.
with (yield self.limiter.queue(event_dict["room_id"])):
event, context = yield self.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
)
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
)
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, basestring):
spam_error = "Spam is not permitted here"
raise SynapseError(
403, spam_error, Codes.FORBIDDEN
)
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
)
defer.returnValue(event)
@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
depth = prev_max_depth + 1
else:
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
builder.room_id,
def create_new_client_event(self, builder, requester=None,
prev_events_and_hashes=None):
"""Create a new event for a local client
Args:
builder (EventBuilder):
requester (synapse.types.Requester|None):
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Returns:
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
if prev_events_and_hashes is not None:
assert len(prev_events_and_hashes) <= 10, \
"Attempting to create an event with %i prev_events" % (
len(prev_events_and_hashes),
)
else:
prev_events_and_hashes = \
yield self.store.get_prev_events_for_room(builder.room_id)
# We want to limit the max number of prev events we point to in our
# new event
if len(latest_ret) > 10:
# Sort by reverse depth, so we point to the most recent.
latest_ret.sort(key=lambda a: -a[2])
new_latest_ret = latest_ret[:5]
if prev_events_and_hashes:
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
else:
depth = 1
# We also randomly point to some of the older events, to make
# sure that we don't completely ignore the older events.
if latest_ret[5:]:
sample_size = min(5, len(latest_ret[5:]))
new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
latest_ret = new_latest_ret
if latest_ret:
depth = max([d for _, _, d in latest_ret]) + 1
else:
depth = 1
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in latest_ret
]
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
builder.prev_events = prev_events
builder.depth = depth

View File

@@ -23,7 +23,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID
from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler):
token = None
attempts += 1
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
# rather than there being consistent matrix-wide ones, so we don't.
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue((user_id, access_token))
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
)

View File

@@ -20,7 +20,6 @@ from ._base import BaseHandler
from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
self.response_cache = ResponseCache(hs, "room_list")
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
@@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler):
)
key = (limit, since_token, network_tuple)
result = self.response_cache.get(key)
if not result:
logger.info("No cached result, calculating one.")
result = self.response_cache.set(
key,
preserve_fn(self._get_public_room_list)(
limit, since_token, network_tuple=network_tuple
)
)
else:
logger.info("Using cached deferred result.")
return make_deferred_yieldable(result)
return self.response_cache.wrap(
key,
self._get_public_room_list,
limit, since_token, network_tuple=network_tuple,
)
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
@@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler):
server_name, limit, since_token, include_all_networks,
third_party_instance_id,
)
result = self.remote_response_cache.get(key)
if not result:
result = self.remote_response_cache.set(
key,
repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
)
return result
return self.remote_response_cache.wrap(
key,
repl_layer.get_public_rooms,
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
class RoomListNextBatch(namedtuple("RoomListNextBatch", (

View File

@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
prev_event_ids,
prev_events_and_hashes,
txn_id=None,
ratelimit=True,
content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
)
# Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server",
)
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
room_id,
)
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
)
defer.returnValue(res)
@@ -852,6 +857,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
# and if it is the only entry we'd like to return a 404 rather than a
# 500.
remote_room_hosts = [
host for host in remote_room_hosts if host != self.hs.hostname
]
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")

View File

@@ -15,7 +15,7 @@
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
__bool__ = __nonzero__ # python3
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
__bool__ = __nonzero__ # python3
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
or self.state
or self.account_data
)
__bool__ = __nonzero__ # python3
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
__bool__ = __nonzero__ # python3
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
__bool__ = __nonzero__ # python3
class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
def __nonzero__(self):
return bool(self.changed or self.left)
__bool__ = __nonzero__ # python3
class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.device_lists or
self.groups
)
__bool__ = __nonzero__ # python3
class SyncHandler(object):
@@ -169,7 +176,7 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs)
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -180,15 +187,11 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
result = self.response_cache.get(sync_config.request_key)
if not result:
result = self.response_cache.set(
sync_config.request_key,
preserve_fn(self._wait_for_sync_for_user)(
sync_config, since_token, timeout, full_state
)
)
return make_deferred_yieldable(result)
return self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,

View File

@@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
@@ -33,7 +31,7 @@ SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
#
# "host" is actually a dotted-quad or ipv6 address string. Except when there's
# "host" is the hostname acquired from the SRV record. Except when there's
# no SRV record, in which case it is the original hostname.
_Server = collections.namedtuple(
"_Server", "priority weight host port expires"
@@ -297,20 +295,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
payload = answer.payload
hosts = yield _get_hosts_for_srv_record(
dns_client, str(payload.target)
)
for (ip, ttl) in hosts:
host_ttl = min(answer.ttl, ttl)
servers.append(_Server(
host=ip,
port=int(payload.port),
priority=int(payload.priority),
weight=int(payload.weight),
expires=int(clock.time()) + host_ttl,
))
servers.append(_Server(
host=str(payload.target),
port=int(payload.port),
priority=int(payload.priority),
weight=int(payload.weight),
expires=int(clock.time()) + answer.ttl,
))
servers.sort()
cache[service_name] = list(servers)
@@ -328,81 +319,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
raise e
defer.returnValue(servers)
@defer.inlineCallbacks
def _get_hosts_for_srv_record(dns_client, host):
"""Look up each of the hosts in a SRV record
Args:
dns_client (twisted.names.dns.IResolver):
host (basestring): host to look up
Returns:
Deferred[list[(str, int)]]: a list of (host, ttl) pairs
"""
ip4_servers = []
ip6_servers = []
def cb(res):
# lookupAddress and lookupIP6Address return a three-tuple
# giving the answer, authority, and additional sections of the
# response.
#
# we only care about the answers.
return res[0]
def eb(res, record_type):
if res.check(DNSNameError):
return []
logger.warn("Error looking up %s for %s: %s", record_type, host, res)
return res
# no logcontexts here, so we can safely fire these off and gatherResults
d1 = dns_client.lookupAddress(host).addCallbacks(
cb, eb, errbackArgs=("A", ))
d2 = dns_client.lookupIPV6Address(host).addCallbacks(
cb, eb, errbackArgs=("AAAA", ))
results = yield defer.DeferredList(
[d1, d2], consumeErrors=True)
# if all of the lookups failed, raise an exception rather than blowing out
# the cache with an empty result.
if results and all(s == defer.FAILURE for (s, _) in results):
defer.returnValue(results[0][1])
for (success, result) in results:
if success == defer.FAILURE:
continue
for answer in result:
if not answer.payload:
continue
try:
if answer.type == dns.A:
ip = answer.payload.dottedQuad()
ip4_servers.append((ip, answer.ttl))
elif answer.type == dns.AAAA:
ip = socket.inet_ntop(
socket.AF_INET6, answer.payload.address,
)
ip6_servers.append((ip, answer.ttl))
else:
# the most likely candidate here is a CNAME record.
# rfc2782 says srvs may not point to aliases.
logger.warn(
"Ignoring unexpected DNS record type %s for %s",
answer.type, host,
)
continue
except Exception as e:
logger.warn("Ignoring invalid DNS response for %s: %s",
host, e)
continue
# keep the ipv4 results before the ipv6 results, mostly to match historical
# behaviour.
defer.returnValue(ip4_servers + ip6_servers)

View File

@@ -286,7 +286,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None,
def put_json(self, destination, path, args={}, data={},
json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
@@ -296,6 +297,7 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): query params
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
@@ -342,6 +344,7 @@ class MatrixFederationHttpClient(object):
path,
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
query_bytes=encode_query_args(args),
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -373,6 +376,7 @@ class MatrixFederationHttpClient(object):
giving up. None indicates no timeout.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
args (dict): query params
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.

View File

@@ -113,6 +113,11 @@ response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
)
_next_request_id = 0
@@ -324,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource):
register_paths, so will return (possibly via Deferred) either
None, or a tuple of (http code, response body).
"""
if request.method == "OPTIONS":
if request.method == b"OPTIONS":
return _options_handler, {}
# Loop through all the registered callbacks to check if the method
@@ -426,6 +431,8 @@ class RequestMetrics(object):
context.db_sched_duration_ms / 1000., request.method, self.name, tag
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
class RootRedirect(resource.Resource):
"""Redirects the root '/' path to another path."""
@@ -536,7 +543,7 @@ def finish_request(request):
def _request_user_agent_is_curl(request):
user_agents = request.requestHeaders.getRawHeaders(
"User-Agent", default=[]
b"User-Agent", default=[]
)
for user_agent in user_agents:
if "curl" in user_agent:

View File

@@ -20,7 +20,7 @@ import logging
import re
import time
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
class SynapseRequest(Request):
@@ -43,12 +43,12 @@ class SynapseRequest(Request):
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
r'\1<redacted>\3',
br'\1<redacted>\3',
self.uri
)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def started_processing(self):
self.site.access_logger.info(

View File

@@ -17,12 +17,13 @@ import logging
import functools
import time
import gc
import platform
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric,
MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector
@@ -30,6 +31,7 @@ from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
all_metrics = []
all_collectors = []
@@ -63,6 +65,13 @@ class Metrics(object):
"""
return self._register(CounterMetric, *args, **kwargs)
def register_gauge(self, *args, **kwargs):
"""
Returns:
GaugeMetric
"""
return self._register(GaugeMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
"""
Returns:
@@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
def runUntilCurrentTimer(func):
@@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
if running_on_pypy:
return ret
# Check if we need to do a manual GC (since its been disabled), and do
# one if necessary.
threshold = gc.get_threshold()
@@ -206,6 +244,7 @@ try:
# We manually run the GC each reactor tick so that we can get some metrics
# about time spent doing GC,
gc.disable()
if not running_on_pypy:
gc.disable()
except AttributeError:
pass

View File

@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty list).
# (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}
# Scalar metrics are never empty
@@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
)
class GaugeMetric(BaseMetric):
"""A metric that can go up or down
"""
def __init__(self, *args, **kwargs):
super(GaugeMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.guages = {}
def set(self, v, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
self.guages[values] = v
def render(self):
return flatten(
self._render_for_labels(k, self.guages[k])
for k in sorted(self.guages.keys())
)
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the

View File

@@ -144,6 +144,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
__bool__ = __nonzero__ # python3
class Notifier(object):

View File

@@ -1,5 +1,6 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,15 +19,31 @@ from distutils.version import LooseVersion
logger = logging.getLogger(__name__)
# this dict maps from python package name to a list of modules we expect it to
# provide.
#
# the key is a "requirement specifier", as used as a parameter to `pip
# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
#
# the value is a sequence of strings; each entry should be the name of the
# python module, optionally followed by a version assertion which can be either
# ">=<ver>" or "==<ver>".
#
# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"Twisted>=16.0.0": ["twisted>=16.0.0"],
# we break under Twisted 18.4
# (https://github.com/matrix-org/synapse/issues/3135)
"Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
@@ -39,6 +56,7 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {

View File

@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet):
self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while
self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id):
result = self.response_cache.get(event_id)
if not result:
result = self.response_cache.set(
event_id,
self._handle_request(request)
)
else:
logger.warn("Returning cached response")
return make_deferred_yieldable(result)
return self.response_cache.wrap(
event_id,
self._handle_request,
request
)
@preserve_fn
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):

View File

@@ -44,7 +44,10 @@ class LogoutRestServlet(ClientV1RestServlet):
requester = yield self.auth.get_user_by_req(request)
except AuthError:
# this implies the access token has already been deleted.
pass
defer.returnValue((401, {
"errcode": "M_UNKNOWN_TOKEN",
"error": "Access Token unknown or expired"
}))
else:
if requester.device_id is None:
# the acccess token wasn't associated with a device.

View File

@@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet):
admin = register_json.get("admin", None)
# Its important to check as we use null bytes as HMAC field separators
if "\x00" in user:
if b"\x00" in user:
raise SynapseError(400, "Invalid user")
if "\x00" in password:
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
# str() because otherwise hmac complains that 'unicode' does not

View File

@@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
content=content,
)
else:
event, context = yield self.event_creation_hander.create_event(
event = yield self.event_creation_hander.create_and_send_nonmember_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id,
)
yield self.event_creation_hander.send_nonmember_event(
requester, event, context,
)
ret = {}
if event:
ret = {"event_id": event.event_id}
@@ -655,7 +650,12 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
content=event_content,
)
defer.returnValue((200, {}))
return_value = {}
if membership_action == "join":
return_value["room_id"] = room_id
defer.returnValue((200, return_value))
def _has_3pid_invite_keys(self, content):
for key in {"id_server", "medium", "address"}:

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -401,6 +402,32 @@ class GroupInvitedUsersServlet(RestServlet):
defer.returnValue((200, result))
class GroupSettingJoinPolicyServlet(RestServlet):
"""Set group join policy
"""
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/settings/m.join_policy$")
def __init__(self, hs):
super(GroupSettingJoinPolicyServlet, self).__init__()
self.auth = hs.get_auth()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.set_group_join_policy(
group_id,
requester_user_id,
content,
)
defer.returnValue((200, result))
class GroupCreateServlet(RestServlet):
"""Create a group
"""
@@ -738,6 +765,7 @@ def register_servlets(hs, http_server):
GroupInvitedUsersServlet(hs).register(http_server)
GroupUsersServlet(hs).register(http_server)
GroupRoomServlet(hs).register(http_server)
GroupSettingJoinPolicyServlet(hs).register(http_server)
GroupCreateServlet(hs).register(http_server)
GroupAdminRoomsServlet(hs).register(http_server)
GroupAdminRoomsConfigServlet(hs).register(http_server)

View File

@@ -20,7 +20,6 @@ import synapse
import synapse.types
from synapse.api.auth import get_access_token_from_request, has_access_token
from synapse.api.constants import LoginType
from synapse.types import RoomID, RoomAlias
from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
@@ -405,14 +404,6 @@ class RegisterRestServlet(RestServlet):
generate_token=False,
)
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = synapse.types.create_requester(registered_user_id)
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
self.auth_handler.set_session_data(
@@ -445,29 +436,6 @@ class RegisterRestServlet(RestServlet):
def on_OPTIONS(self, _):
return 200, {}
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield self.room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield self.room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
action="join",
)
@defer.inlineCallbacks
def _do_appservice_registration(self, username, as_token, body):
user_id = yield self.registration_handler.appservice_register(

View File

@@ -16,6 +16,8 @@
from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
import six
from ._base import Responder
from synapse.util.file_consumer import BackgroundFileConsumer
@@ -119,7 +121,7 @@ class MediaStorage(object):
os.remove(fname)
except Exception:
pass
raise t, v, tb
six.reraise(t, v, tb)
if not finished_called:
raise Exception("Finished callback not called")

View File

@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -244,13 +242,12 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows]
@defer.inlineCallbacks
def count_daily_users(self):
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
def _count_users(txn):
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
sql = """
SELECT COALESCE(count(*), 0) FROM (
@@ -264,8 +261,91 @@ class DataStore(RoomMemberStore, RoomStore,
count, = txn.fetchone()
return count
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
return self.runInteraction("count_users", _count_users)
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days ago
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days apart
Returns counts globaly for a given user as well as breaking
by platform
"""
def _count_r30_users(txn):
thirty_days_in_secs = 86400 * 30
now = int(self._clock.time())
thirty_days_ago_in_secs = now - thirty_days_in_secs
sql = """
SELECT platform, COALESCE(count(*), 0) FROM (
SELECT
users.name, platform, users.creation_ts * 1000,
MAX(uip.last_seen)
FROM users
INNER JOIN (
SELECT
user_id,
last_seen,
CASE
WHEN user_agent LIKE '%%Android%%' THEN 'android'
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
ELSE 'unknown'
END
AS platform
FROM user_ips
) uip
ON users.name = uip.user_id
AND users.appservice_id is NULL
AND users.creation_ts < ?
AND uip.last_seen/1000 > ?
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
GROUP BY users.name, platform, users.creation_ts
) u GROUP BY platform
"""
results = {}
txn.execute(sql, (thirty_days_ago_in_secs,
thirty_days_ago_in_secs))
for row in txn:
if row[0] is 'unknown':
pass
results[row[0]] = row[1]
sql = """
SELECT COALESCE(count(*), 0) FROM (
SELECT users.name, users.creation_ts * 1000,
MAX(uip.last_seen)
FROM users
INNER JOIN (
SELECT
user_id,
last_seen
FROM user_ips
) uip
ON users.name = uip.user_id
AND appservice_id is NULL
AND users.creation_ts < ?
AND uip.last_seen/1000 > ?
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
GROUP BY users.name, users.creation_ts
) u
"""
txn.execute(sql, (thirty_days_ago_in_secs,
thirty_days_ago_in_secs))
count, = txn.fetchone()
results['all'] = count
return results
return self.runInteraction("count_r30_users", _count_r30_users)
def get_users(self):
"""Function to reterive a list of users in users table.

View File

@@ -376,7 +376,7 @@ class SQLBaseStore(object):
Returns:
A list of dicts where the key is the column header.
"""
col_headers = list(intern(column[0]) for column in cursor.description)
col_headers = list(intern(str(column[0])) for column in cursor.description)
results = list(
dict(zip(col_headers, row)) for row in cursor
)

View File

@@ -48,6 +48,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
columns=["user_id", "device_id", "last_seen"],
)
self.register_background_index_update(
"user_ips_last_seen_index",
index_name="user_ips_last_seen",
table="user_ips",
columns=["user_id", "last_seen"],
)
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

View File

@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
import importlib
import platform
SUPPORTED_MODULE = {
@@ -31,6 +32,10 @@ def create_engine(database_config):
engine_class = SUPPORTED_MODULE.get(name, None)
if engine_class:
# pypy requires psycopg2cffi rather than psycopg2
if (name == "psycopg2" and
platform.python_implementation() == "PyPy"):
name = "psycopg2cffi"
module = importlib.import_module(name)
return engine_class(module, database_config)

View File

@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from twisted.internet import defer
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
import logging
from Queue import PriorityQueue, Empty
from six.moves.queue import PriorityQueue, Empty
from six.moves import range
logger = logging.getLogger(__name__)
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
front_list = list(front)
chunks = [
front_list[x:x + 100]
for x in xrange(0, len(front), 100)
for x in range(0, len(front), 100)
]
for chunk in chunks:
txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
retcol="event_id",
)
@defer.inlineCallbacks
def get_prev_events_for_room(self, room_id):
"""
Gets a subset of the current forward extremities in the given room.
Limits the result to 10 extremities, so that we can avoid creating
events which refer to hundreds of prev_events.
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
if len(res) > 10:
# Sort by reverse depth, so we point to the most recent.
res.sort(key=lambda a: -a[2])
# we use half of the limit for the actual most recent events, and
# the other half to randomly point to some of the older events, to
# make sure that we don't completely ignore the older events.
res = res[0:5] + random.sample(res[5:], 5)
defer.returnValue(res)
def get_latest_event_ids_and_hashes_in_room(self, room_id):
"""
Gets the current forward extremities in the given room
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
return self.runInteraction(
"get_latest_event_ids_and_hashes_in_room",
self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
room_id,
)
@defer.inlineCallbacks
def get_max_depth_of_events(self, event_ids):
sql = (
"SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
) % (",".join(["?"] * len(event_ids)),)
rows = yield self._execute(
"get_max_depth_of_events", None,
sql, *event_ids
)
if rows:
defer.returnValue(rows[0][0])
else:
defer.returnValue(1)
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,

View File

@@ -16,6 +16,7 @@
from collections import OrderedDict, deque, namedtuple
from functools import wraps
import itertools
import logging
import simplejson as json
@@ -444,6 +445,9 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
for event, context in chunk:
if context.app_service:
origin_type = "local"
@@ -1317,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(set(r["event_id"] for r in rows))
def have_events(self, event_ids):
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
@@ -1345,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
return res
return self.runInteraction(
"have_events", f,
)
return self.runInteraction("get_rejection_reasons", f)
@defer.inlineCallbacks
def count_daily_messages(self):

View File

@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore):
def get_received_ts(self, event_id):
"""Get received_ts (when it was persisted) for the event.
Raises an exception for unknown events.
Args:
event_id (str)
Returns:
Deferred[int|None]: Timestamp in milliseconds, or None for events
that were persisted before received_ts was implemented.
"""
return self._simple_select_one_onecol(
table="events",
keyvalues={
"event_id": event_id,
},
retcol="received_ts",
desc="get_received_ts",
)
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +30,24 @@ _DEFAULT_ROLE_ID = ""
class GroupServerStore(SQLBaseStore):
def set_group_join_policy(self, group_id, join_policy):
"""Set the join policy of a group.
join_policy can be one of:
* "invite"
* "open"
"""
return self._simple_update_one(
table="groups",
keyvalues={
"group_id": group_id,
},
updatevalues={
"join_policy": join_policy,
},
desc="set_group_join_policy",
)
def get_group(self, group_id):
return self._simple_select_one(
table="groups",
@@ -36,10 +55,11 @@ class GroupServerStore(SQLBaseStore):
"group_id": group_id,
},
retcols=(
"name", "short_description", "long_description", "avatar_url", "is_public"
"name", "short_description", "long_description",
"avatar_url", "is_public", "join_policy",
),
allow_none=True,
desc="is_user_in_group",
desc="get_group",
)
def get_users_in_group(self, group_id, include_private=False):

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,7 +26,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 47
SCHEMA_VERSION = 48
dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
# Convert the IDs to MXC URIs
for media_id in local_mxcs:
local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
@@ -594,7 +594,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
SELECT stream_ordering, content FROM events
SELECT stream_ordering, json FROM events
JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@@ -606,8 +607,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
next_token = None
for stream_ordering, content_json in txn:
next_token = stream_ordering
content = json.loads(content_json)
event_json = json.loads(content_json)
content = event_json["content"]
content_url = content.get("url")
thumbnail_url = content.get("info", {}).get("thumbnail_url")
@@ -618,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
if hostname == self.hostname:
if hostname == self.hs.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))

View File

@@ -645,8 +645,9 @@ class RoomMemberStore(RoomMemberWorkerStore):
def add_membership_profile_txn(txn):
sql = ("""
SELECT stream_ordering, event_id, events.room_id, content
SELECT stream_ordering, event_id, events.room_id, event_json.json
FROM events
INNER JOIN event_json USING (event_id)
INNER JOIN room_memberships USING (event_id)
WHERE ? <= stream_ordering AND stream_ordering < ?
AND type = 'm.room.member'
@@ -667,7 +668,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
event_id = row["event_id"]
room_id = row["room_id"]
try:
content = json.loads(row["content"])
event_json = json.loads(row["json"])
content = event_json['content']
except Exception:
continue

View File

@@ -0,0 +1,17 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('user_ips_last_seen_index', '{}');

View File

@@ -0,0 +1,22 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This isn't a real ENUM because sqlite doesn't support it
* and we use a default of NULL for inserted rows and interpret
* NULL at the python store level as necessary so that existing
* rows are given the correct default policy.
*/
ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';

View File

@@ -75,8 +75,9 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id, room_id, type, content, "
"SELECT stream_ordering, event_id, room_id, type, json, "
" origin_server_ts FROM events"
" JOIN event_json USING (room_id, event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -104,7 +105,8 @@ class SearchStore(BackgroundUpdateStore):
stream_ordering = row["stream_ordering"]
origin_server_ts = row["origin_server_ts"]
try:
content = json.loads(row["content"])
event_json = json.loads(row["json"])
content = event_json["content"]
except Exception:
continue

View File

@@ -169,7 +169,7 @@ class DomainSpecificString(
except Exception:
return False
__str__ = to_string
__repr__ = to_string
class UserID(DomainSpecificString):

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -39,12 +40,11 @@ _CacheSentinel = object()
class CacheEntry(object):
__slots__ = [
"deferred", "sequence", "callbacks", "invalidated"
"deferred", "callbacks", "invalidated"
]
def __init__(self, deferred, sequence, callbacks):
def __init__(self, deferred, callbacks):
self.deferred = deferred
self.sequence = sequence
self.callbacks = set(callbacks)
self.invalidated = False
@@ -62,7 +62,6 @@ class Cache(object):
"max_entries",
"name",
"keylen",
"sequence",
"thread",
"metrics",
"_pending_deferred_cache",
@@ -80,7 +79,6 @@ class Cache(object):
self.name = name
self.keylen = keylen
self.sequence = 0
self.thread = None
self.metrics = register_cache(name, self.cache)
@@ -113,11 +111,10 @@ class Cache(object):
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
if val.sequence == self.sequence:
val.callbacks.update(callbacks)
if update_metrics:
self.metrics.inc_hits()
return val.deferred
val.callbacks.update(callbacks)
if update_metrics:
self.metrics.inc_hits()
return val.deferred
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
if val is not _CacheSentinel:
@@ -137,12 +134,9 @@ class Cache(object):
self.check_thread()
entry = CacheEntry(
deferred=value,
sequence=self.sequence,
callbacks=callbacks,
)
entry.callbacks.update(callbacks)
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry:
existing_entry.invalidate()
@@ -150,13 +144,25 @@ class Cache(object):
self._pending_deferred_cache[key] = entry
def shuffle(result):
if self.sequence == entry.sequence:
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry is entry:
self.cache.set(key, result, entry.callbacks)
else:
entry.invalidate()
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry is entry:
self.cache.set(key, result, entry.callbacks)
else:
# oops, the _pending_deferred_cache has been updated since
# we started our query, so we are out of date.
#
# Better put back whatever we took out. (We do it this way
# round, rather than peeking into the _pending_deferred_cache
# and then removing on a match, to make the common case faster)
if existing_entry is not None:
self._pending_deferred_cache[key] = existing_entry
# we're not going to put this entry into the cache, so need
# to make sure that the invalidation callbacks are called.
# That was probably done when _pending_deferred_cache was
# updated, but it's possible that `set` was called without
# `invalidate` being previously called, in which case it may
# not have been. Either way, let's double-check now.
entry.invalidate()
return result
@@ -168,25 +174,29 @@ class Cache(object):
def invalidate(self, key):
self.check_thread()
self.cache.pop(key, None)
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned
# for future queries and (b) stop it being persisted as a proper entry
# in self.cache.
entry = self._pending_deferred_cache.pop(key, None)
# run the invalidation callbacks now, rather than waiting for the
# deferred to resolve.
if entry:
entry.invalidate()
self.cache.pop(key, None)
def invalidate_many(self, key):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError(
"The cache key must be a tuple not %r" % (type(key),)
)
self.sequence += 1
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
@@ -194,8 +204,10 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
self.sequence += 1
self.cache.clear()
for entry in self._pending_deferred_cache.itervalues():
entry.invalidate()
self._pending_deferred_cache.clear()
class _CacheDescriptorBase(object):

View File

@@ -12,8 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class ResponseCache(object):
@@ -24,20 +31,68 @@ class ResponseCache(object):
used rather than trying to compute a new response.
"""
def __init__(self, hs, timeout_ms=0):
def __init__(self, hs, name, timeout_ms=0):
self.pending_result_cache = {} # Requests that haven't finished yet.
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.
self._name = name
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
cache_name=name,
)
def size(self):
return len(self.pending_result_cache)
def get(self, key):
"""Look up the given key.
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if the request has completed, the actual
result. You will probably want to make_deferred_yieldable the result.
If there is no entry for the key, returns None. It is worth noting that
this means there is no way to distinguish a completed result of None
from an absent cache entry.
Args:
key (hashable):
Returns:
twisted.internet.defer.Deferred|None|E: None if there is no entry
for this key; otherwise either a deferred result or the result
itself.
"""
result = self.pending_result_cache.get(key)
if result is not None:
self._metrics.inc_hits()
return result.observe()
else:
self._metrics.inc_misses()
return None
def set(self, key, deferred):
"""Set the entry for the given key to the given deferred.
*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
logcontext.run_in_background).
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if *deferred* was already complete, the actual
result. You will probably want to make_deferred_yieldable the result.
Args:
key (hashable):
deferred (twisted.internet.defer.Deferred[T):
Returns:
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
result.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
@@ -53,3 +108,52 @@ class ResponseCache(object):
result.addBoth(remove)
return result.observe()
def wrap(self, key, callback, *args, **kwargs):
"""Wrap together a *get* and *set* call, taking care of logcontexts
First looks up the key in the cache, and if it is present makes it
follow the synapse logcontext rules and returns it.
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
follow the synapse logcontext rules, and adds the result to the cache.
Example usage:
@defer.inlineCallbacks
def handle_request(request):
# etc
defer.returnValue(result)
result = yield response_cache.wrap(
key,
handle_request,
request,
)
Args:
key (hashable): key to get/set in the cache
callback (callable): function to call if the key is not found in
the cache
*args: positional parameters to pass to the callback, if it is used
**kwargs: named paramters to pass to the callback, if it is used
Returns:
twisted.internet.defer.Deferred: yieldable result
"""
result = self.get(key)
if not result:
logger.info("[%s]: no cached result for [%s], calculating new one",
self._name, key)
d = run_in_background(callback, *args, **kwargs)
result = self.set(key, d)
elif not isinstance(result, defer.Deferred) or result.called:
logger.info("[%s]: using completed cached result for [%s]",
self._name, key)
else:
logger.info("[%s]: using incomplete cached result for [%s]",
self._name, key)
return make_deferred_yieldable(result)

View File

@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import Queue
from six.moves import queue
class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
# Queue of slices of bytes to be written. When producer calls
# unregister a final None is sent.
self._bytes_queue = Queue.Queue()
self._bytes_queue = queue.Queue()
# Deferred that is resolved when finished writing
self._finished_deferred = None

View File

@@ -92,6 +92,7 @@ class LoggingContext(object):
def __nonzero__(self):
return False
__bool__ = __nonzero__ # python3
sentinel = Sentinel()

View File

@@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase):
def setUp(self):
self.dir = tempfile.mkdtemp()
print self.dir
print(self.dir)
self.file = os.path.join(self.dir, "homeserver.yaml")
def tearDown(self):

View File

@@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
res_deferreds_2 = kr.verify_json_objects_for_server(
[("server10", json1)],
)
yield async.sleep(01)
yield async.sleep(1)
self.http_client.post_json.assert_not_called()
res_deferreds_2[0].addBoth(self.check_context, None)

View File

@@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store)
self.mock_store.get_received_ts.return_value = 0
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
hs.get_clock.return_value = MockClock()

View File

@@ -123,6 +123,7 @@ class EventStreamPermissionsTestCase(RestTestCase):
self.ratelimiter.send_message.return_value = (True, 0)
hs.config.enable_registration_captcha = False
hs.config.enable_registration = True
hs.config.auto_join_rooms = []
hs.get_handlers().federation_handler = Mock()

View File

@@ -480,9 +480,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
ApplicationServiceStore(None, hs)
e = cm.exception
self.assertIn(f1, e.message)
self.assertIn(f2, e.message)
self.assertIn("id", e.message)
self.assertIn(f1, str(e))
self.assertIn(f2, str(e))
self.assertIn("id", str(e))
@defer.inlineCallbacks
def test_duplicate_as_tokens(self):
@@ -504,6 +504,6 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
ApplicationServiceStore(None, hs)
e = cm.exception
self.assertIn(f1, e.message)
self.assertIn(f2, e.message)
self.assertIn("as_token", e.message)
self.assertIn(f1, str(e))
self.assertIn(f2, str(e))
self.assertIn("as_token", str(e))

View File

@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import tests.unittest
import tests.utils
class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield tests.utils.setup_test_homeserver()
self.store = hs.get_datastore()
@defer.inlineCallbacks
def test_get_prev_events_for_room(self):
room_id = '@ROOM:local'
# add a bunch of events and hashes to act as forward extremities
def insert_event(txn, i):
event_id = '$event_%i:local' % i
txn.execute((
"INSERT INTO events ("
" room_id, event_id, type, depth, topological_ordering,"
" content, processed, outlier) "
"VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
), (room_id, event_id, i, i, True, False))
txn.execute((
'INSERT INTO event_forward_extremities (room_id, event_id) '
'VALUES (?, ?)'
), (room_id, event_id))
txn.execute((
'INSERT INTO event_reference_hashes '
'(event_id, algorithm, hash) '
"VALUES (?, 'sha256', ?)"
), (event_id, 'ffff'))
for i in range(0, 11):
yield self.store.runInteraction("insert", insert_event, i)
# this should get the last five and five others
r = yield self.store.get_prev_events_for_room(room_id)
self.assertEqual(10, len(r))
for i in range(0, 5):
el = r[i]
depth = el[2]
self.assertEqual(10 - i, depth)
for i in range(5, 5):
el = r[i]
depth = el[2]
self.assertLessEqual(5, depth)

View File

@@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase):
def test_signal_catch(self):
self.dist.declare("alarm")
observers = [Mock() for i in 1, 2]
observers = [Mock() for i in (1, 2)]
for o in observers:
self.dist.observe("alarm", o)

View File

@@ -33,8 +33,6 @@ class DnsTestCase(unittest.TestCase):
service_name = "test_service.example.com"
host_name = "example.com"
ip_address = "127.0.0.1"
ip6_address = "::1"
answer_srv = dns.RRHeader(
type=dns.SRV,
@@ -43,29 +41,9 @@ class DnsTestCase(unittest.TestCase):
)
)
answer_a = dns.RRHeader(
type=dns.A,
payload=dns.Record_A(
address=ip_address,
)
)
answer_aaaa = dns.RRHeader(
type=dns.AAAA,
payload=dns.Record_AAAA(
address=ip6_address,
)
)
dns_client_mock.lookupService.return_value = defer.succeed(
([answer_srv], None, None),
)
dns_client_mock.lookupAddress.return_value = defer.succeed(
([answer_a], None, None),
)
dns_client_mock.lookupIPV6Address.return_value = defer.succeed(
([answer_aaaa], None, None),
)
cache = {}
@@ -74,13 +52,10 @@ class DnsTestCase(unittest.TestCase):
)
dns_client_mock.lookupService.assert_called_once_with(service_name)
dns_client_mock.lookupAddress.assert_called_once_with(host_name)
dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name)
self.assertEquals(len(servers), 2)
self.assertEquals(len(servers), 1)
self.assertEquals(servers, cache[service_name])
self.assertEquals(servers[0].host, ip_address)
self.assertEquals(servers[1].host, ip6_address)
self.assertEquals(servers[0].host, host_name)
@defer.inlineCallbacks
def test_from_cache_expired_and_dns_fail(self):

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
import logging
import mock
@@ -25,6 +27,50 @@ from tests import unittest
logger = logging.getLogger(__name__)
class CacheTestCase(unittest.TestCase):
def test_invalidate_all(self):
cache = descriptors.Cache("testcache")
callback_record = [False, False]
def record_callback(idx):
callback_record[idx] = True
# add a couple of pending entries
d1 = defer.Deferred()
cache.set("key1", d1, partial(record_callback, 0))
d2 = defer.Deferred()
cache.set("key2", d2, partial(record_callback, 1))
# lookup should return the deferreds
self.assertIs(cache.get("key1"), d1)
self.assertIs(cache.get("key2"), d2)
# let one of the lookups complete
d2.callback("result2")
self.assertEqual(cache.get("key2"), "result2")
# now do the invalidation
cache.invalidate_all()
# lookup should return none
self.assertIsNone(cache.get("key1", None))
self.assertIsNone(cache.get("key2", None))
# both callbacks should have been callbacked
self.assertTrue(
callback_record[0], "Invalidation callback for key1 not called",
)
self.assertTrue(
callback_record[1], "Invalidation callback for key2 not called",
)
# letting the other lookup complete should do nothing
d1.callback("result1")
self.assertIsNone(cache.get("key1", None))
class DescriptorTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_cache(self):

View File

@@ -20,7 +20,7 @@ from mock import NonCallableMock
from synapse.util.file_consumer import BackgroundFileConsumer
from tests import unittest
from StringIO import StringIO
from six import StringIO
import threading

View File

@@ -18,6 +18,7 @@ from tests import unittest
from twisted.internet import defer
from synapse.util.async import Linearizer
from six.moves import range
class LinearizerTestCase(unittest.TestCase):
@@ -58,7 +59,7 @@ class LinearizerTestCase(unittest.TestCase):
logcontext.LoggingContext.current_context(), lc)
func(0, sleep=True)
for i in xrange(1, 100):
for i in range(1, 100):
func(i)
return func(1000)

View File

@@ -59,6 +59,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
config.email_enable_notifs = False
config.block_non_admin_invites = False
config.federation_domain_whitelist = None
config.federation_rc_reject_limit = 10
config.federation_rc_sleep_limit = 10
config.federation_rc_concurrent = 10
config.filter_timeline_limit = 5000
config.user_directory_search_all_users = False
# disable user directory updates, because they get done in the
@@ -212,7 +216,7 @@ class MockHttpResource(HttpServer):
headers = {}
if federation_auth:
headers["Authorization"] = ["X-Matrix origin=test,key=,sig="]
headers[b"Authorization"] = ["X-Matrix origin=test,key=,sig="]
mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)
# return the right path if the event requires it