1
0

Compare commits

...

200 Commits

Author SHA1 Message Date
Will Hunt 212d4abeab Add unit tests 2018-06-27 17:11:13 +01:00
Will Hunt 2df75cec69 event.event_id => event.sender 2018-06-27 16:04:33 +01:00
Will Hunt ae6eb102b1 Make flake happy 2018-06-25 17:24:00 +01:00
Will Hunt 8d926d78ff Add app_service_server_blacklist option 2018-06-25 17:16:01 +01:00
Erik Johnston 947fea67cb Need to pass reactor to endpoint fac 2018-06-25 15:22:57 +01:00
Amber Brown 36cb570641 Use towncrier to build the changelog (#3425) 2018-06-25 14:42:27 +01:00
Erik Johnston 33fdcfa957 Merge pull request #3441 from matrix-org/erikj/redo_erasure
Fix user erasure and re-enable
2018-06-25 14:37:01 +01:00
Erik Johnston eb50c44eaf Add UserErasureWorkerStore to workers 2018-06-25 14:22:24 +01:00
Amber Brown 07cad26d65 Remove all global reactor imports & pass it around explicitly (#3424) 2018-06-25 14:08:28 +01:00
Erik Johnston 244484bf3c Revert "Revert "Merge pull request #3431 from matrix-org/rav/erasure_visibility""
This reverts commit 1d009013b3.
2018-06-25 13:42:55 +01:00
Richard van der Hoff 1d009013b3 Revert "Merge pull request #3431 from matrix-org/rav/erasure_visibility"
This reverts commit ce0d911156, reversing
changes made to b4a5d767a9.
2018-06-22 16:35:10 +01:00
Richard van der Hoff 516f884176 Merge pull request #3435 from matrix-org/rav/fix_event_push_actions_tablescan
Fix event_push_actions tablescan when reinserting events
2018-06-22 15:57:22 +01:00
Mark Haines 9850f66abe Deleting from event_push_actions needs to use an index 2018-06-22 15:54:48 +01:00
Richard van der Hoff fe8d2968e3 Merge pull request #3434 from matrix-org/rav/search_logging
Add some logging to /search and /context queries
2018-06-22 15:51:11 +01:00
Erik Johnston 28ddc6cfbe Also log number of events for serach context 2018-06-22 15:42:11 +01:00
Erik Johnston 4b4cec3989 Add some logging to search queries 2018-06-22 15:42:11 +01:00
Richard van der Hoff 200e11c5bf Merge pull request #3432 from matrix-org/rav/joined_hosts_cache_non_iterable
Make _get_joined_hosts_cache cache non-iterable
2018-06-22 15:18:51 +01:00
Erik Johnston f8272813a9 Make _get_joined_hosts_cache cache non-iterable 2018-06-22 15:12:26 +01:00
Richard van der Hoff 1d7ad11747 Merge pull request #3430 from matrix-org/rav/configurable_push_action_rotation
Make push actions rotation configurable
2018-06-22 15:07:31 +01:00
Erik Johnston ce0d911156 Merge pull request #3431 from matrix-org/rav/erasure_visibility
Support hiding events from deleted users
2018-06-22 15:06:44 +01:00
Erik Johnston b4a5d767a9 Merge pull request #3428 from matrix-org/erikj/persisted_pdu
Simplify get_persisted_pdu
2018-06-22 14:47:43 +01:00
Erik Johnston f79abda87f Merge pull request #3427 from matrix-org/erikj/remove_filters
remove dead filter_events_for_clients
2018-06-22 14:45:42 +01:00
Erik Johnston 75dc3ddeab Make push actions rotation configurable 2018-06-22 14:44:37 +01:00
Richard van der Hoff bb018d0b5b Merge pull request #3383 from matrix-org/rav/hacky_speedup_state_group_cache
Improve StateGroupCache efficiency for wildcard lookups
2018-06-22 14:01:55 +01:00
Richard van der Hoff 43e02c409d Disable partial state group caching for wildcard lookups
When _get_state_for_groups is given a wildcard filter, just do a complete
lookup. Hopefully this will give us the best of both worlds by not filling up
the ram if we only need one or two keys, but also making the cache still work
for the federation reader usecase.
2018-06-22 11:52:07 +01:00
Richard van der Hoff 240f192523 Merge pull request #3382 from matrix-org/rav/optimise_state_groups
Optimise state_group_cache update
2018-06-22 11:20:20 +01:00
Richard van der Hoff 70e6501913 Merge pull request #3419 from matrix-org/rav/events_per_request
Log number of events fetched from DB
2018-06-22 11:17:56 +01:00
Richard van der Hoff 0495fe0035 Indirect evt_count updates via method call
so that we can stub it for the sentinel and not have a billion failing UTs
2018-06-22 10:42:28 +01:00
Amber Brown 9a685d60ae Merge pull request #3418 from matrix-org/rav/fix_metric_desc
Fix description of "python_gc_time" metric
2018-06-22 09:43:26 +01:00
Amber Brown 77ac14b960 Pass around the reactor explicitly (#3385) 2018-06-22 09:37:10 +01:00
Richard van der Hoff cbbfaa4be8 Fix description of "python_gc_time" metric 2018-06-21 10:02:42 +01:00
Amber Brown c2eff937ac Populate synapse_federation_client_sent_pdu_destinations:count again (#3386) 2018-06-21 09:39:58 +01:00
Amber Brown 99b77aa829 Fix tcp protocol metrics naming (#3410) 2018-06-21 09:39:27 +01:00
Richard van der Hoff b088aafcae Log number of events fetched from DB
When we finish processing a request, log the number of events we fetched from
the database to handle it.

[I'm trying to figure out which requests are responsible for large amounts of
event cache churn. It may turn out to be more helpful to add counts to the
prometheus per-request/block metrics, but that is an extension to this code
anyway.]
2018-06-21 06:15:03 +01:00
Richard van der Hoff aff3d76920 Merge pull request #3416 from matrix-org/rav/restart_indicator
Write a clear restart indicator in logs
2018-06-20 18:14:08 +01:00
Richard van der Hoff 02bfc581f8 Merge pull request #3399 from costacruise/master
Add error code to room creation error
2018-06-20 17:26:25 +01:00
Richard van der Hoff 245d53d32a Write a clear restart indicator in logs
I'm fed up with never being able to find the point a server restarted in the
logs.
2018-06-20 15:33:14 +01:00
Amber Brown f6c4d74f96 Fix inflight requests metric (incorrect name & traceback) (#3413) 2018-06-20 11:18:57 +01:00
Matthew Hodgson ccfdaf68be spell gauge correctly 2018-06-16 07:10:34 +01:00
Richard van der Hoff 9a793f861c Merge branch 'master' into develop 2018-06-14 16:36:01 +01:00
Richard van der Hoff 53969e1960 Merge tag 'v0.31.2'
SECURITY UPDATE: Prevent unauthorised users from setting state events in a room
when there is no `m.room.power_levels` event in force in the room. (PR #3397)

Discussion around the Matrix Spec change proposal for this change can be
followed at https://github.com/matrix-org/matrix-doc/issues/1304.
2018-06-14 16:35:33 +01:00
Richard van der Hoff 667c6546bd link to spec proposal from changelog 2018-06-14 16:27:41 +01:00
Richard van der Hoff 7e1c616452 v0.31.2 2018-06-14 16:24:32 +01:00
Richard van der Hoff ba438a3ac1 changelog for 0.31.2 2018-06-14 16:22:46 +01:00
Richard van der Hoff 61ab08a197 Merge pull request #3397 from matrix-org/rav/adjust_auth_rules
Adjust event auth rules when there is no PL event
2018-06-14 16:09:13 +01:00
Richard van der Hoff 1e77ac66e3 Fix broken unit test
We need power levels for this test to do what it is supposed to do.
2018-06-14 14:21:29 +01:00
Richard van der Hoff a502cfec00 remove spurious debug 2018-06-14 14:20:53 +01:00
Michael Wagner 19cd3120ec Add error code to room creation error
This error code is mentioned in the documentation at https://matrix.org/docs/api/client-server/#!/Room32creation/createRoom
2018-06-14 14:08:40 +02:00
Richard van der Hoff 5c9afd6f80 Make default state_default 50
Make it so that, before there is a power-levels event in the room, you need a
power level of at least 50 to send state.

Partially addresses https://github.com/matrix-org/matrix-doc/issues/1192
2018-06-14 12:38:09 +01:00
Richard van der Hoff 52423607bd Clarify interface for event_auth
stop pretending that it returns a boolean, which just almost gave me a heart
attack.
2018-06-14 12:26:17 +01:00
Amber Brown f116f32ace add a last seen metric (#3396) 2018-06-14 20:26:59 +10:00
Richard van der Hoff 557b686eac Refactor get_send_level to take a power_levels event
it makes it easier for me to reason about
2018-06-14 11:26:27 +01:00
Amber Brown a61738b316 Remove run_on_reactor (#3395) 2018-06-14 18:27:37 +10:00
Richard van der Hoff 3681437c35 Merge pull request #3368 from matrix-org/rav/fix_federation_client_host
Fix commandline federation_client to send the right Host header
2018-06-13 15:41:51 +01:00
Amber Brown 0fde1896cd Merge pull request #3389 from turt2live/travis/name_metrics
Use the correct flag (enable_metrics) when warning about an incorrect metrics setup
2018-06-13 23:50:10 +10:00
Amber Brown 2a4fde0a6f Merge pull request #3390 from turt2live/travis/appsvc-metrics
Use the RegistryProxy for appservices too
2018-06-13 22:05:15 +10:00
Travis Ralston 45768d1640 Use the RegistryProxy for appservices too
Signed-off-by: Travis Ralston <travpc@gmail.com>
2018-06-12 12:55:48 -06:00
Travis Ralston 12285a1a76 The flag is named enable_metrics, not collect_metrics
Signed-off-by: Travis Ralston <travpc@gmail.com>
2018-06-12 12:51:31 -06:00
Richard van der Hoff 96bad44f87 Fix federation_client to send the right Host
This appears to have stopped working since matrix.org moved to cloudflare. The
Host header should match the name of the server, not whatever is in the SRV
record.
2018-06-12 14:14:36 +01:00
Richard van der Hoff b6faef2ad7 Filter out erased messages
Redact any messges sent by erased users.
2018-06-12 09:53:18 +01:00
Richard van der Hoff f1023ebf4b mark accounts as erased when requested 2018-06-12 09:53:18 +01:00
Richard van der Hoff 3ff8a619f5 UserErasureStore
to store which users have been erased
2018-06-12 09:52:22 +01:00
Richard van der Hoff 9fc5b74b24 simplify get_persisted_pdu
it doesn't make much sense to use get_persisted_pdu on the receive path: just
get the event straight from the store.
2018-06-12 09:51:31 +01:00
Richard van der Hoff bd348f0af6 remove dead filter_events_for_clients
This is only used by filter_events_for_client, so we can simplify the whole
thing by just doing one user at a time, and removing a dead storage function to
boot.
2018-06-12 09:51:31 +01:00
Richard van der Hoff eb32b2ca20 Optimise state_group_cache update
(1) matrix-org-hotfixes has removed the intern calls; let's do the same here.
(2) remove redundant iteritems() so we can used an optimised db update.
2018-06-11 22:56:11 +01:00
David Baker 187a546bff Merge pull request #3276 from matrix-org/dbkr/unbind
Remove email addresses / phone numbers from ID servers when they're removed from synapse
2018-06-11 16:02:00 +01:00
Matthew Hodgson d6cc369205 fix idiotic typo in state res 2018-06-11 14:43:55 +01:00
Neil Johnson ed5a0780a4 Merge branch 'master' into develop 2018-06-08 15:47:11 +01:00
Neil Johnson 1032393dfb Merge tag 'v0.31.1'
Changes in synapse v0.31.1 (2018-06-08)
=======================================

v0.31.1 fixes a security bug in the ``get_missing_events`` federation API
where event visibility rules were not applied correctly.

We are not aware of it being actively exploited but please upgrade asap.

Bug Fixes:

* Fix event filtering in get_missing_events handler (PR #3371)
2018-06-08 15:46:18 +01:00
Neil Johnson aefcc0f5e5 tweak changelog 2018-06-08 15:32:54 +01:00
Neil Johnson 82e751c43f Update CHANGES.rst 2018-06-08 15:22:34 +01:00
Neil Johnson 0eb4722932 changelog a bump version 2018-06-08 15:21:46 +01:00
Richard van der Hoff c6b1441c52 Fix event filtering in get_missing_events handler 2018-06-08 14:15:31 +01:00
Matthew Hodgson 8b98acca05 fix various changelog bugs and typos 2018-06-08 14:15:16 +01:00
David Baker 0e505b1913 Merge pull request #3372 from matrix-org/rav/better_verification_logging
Try to log more helpful info when a sig verification fails
2018-06-08 13:32:02 +01:00
David Baker ad9edd1d96 Merge pull request #3371 from matrix-org/rav/fix_get_missing_events
Fix event filtering in get_missing_events handler
2018-06-08 13:19:15 +01:00
Richard van der Hoff e82db24a0e Try to log more helpful info when a sig verification fails
Firstly, don't swallow the reason for the failure

Secondly, don't assume all exceptions are verification failures

Thirdly, log a bit of info about the key being used if debug is enabled
2018-06-08 12:13:08 +01:00
Richard van der Hoff 0834b49c6a Fix event filtering in get_missing_events handler 2018-06-08 11:34:46 +01:00
Matthew Hodgson 36446ffedb fix various changelog bugs and typos 2018-06-07 23:54:16 +03:00
Will Hunt 13d211edc1 Merge pull request #3344 from Half-Shot/hs/as-metrics
Add metrics to track appservice transactions
2018-06-07 11:37:12 +01:00
Richard van der Hoff 1152f495a0 Merge pull request #3363 from matrix-org/rav/fix_purge
Fix event-purge-by-ts admin API
2018-06-07 11:34:46 +01:00
Richard van der Hoff e2acf536d4 Merge pull request #3355 from matrix-org/rav/fix_federation_backfill
Fix federation backfill from sqlite servers
2018-06-07 10:18:01 +01:00
Amber Brown 0160f6601f Merge pull request #3356 from matrix-org/rav/add_missing_attr_dep
Make sure that attr is installed
2018-06-07 16:33:30 +10:00
Richard van der Hoff f4caf3f83d fix log 2018-06-07 00:26:38 +01:00
Richard van der Hoff 0546715c18 Fix event-purge-by-ts admin API
This got completely broken in 0.30.

Fixes #3300.
2018-06-07 00:15:49 +01:00
Richard van der Hoff 57e3f923d2 Add missing dependency on attr
We've rcently added a dep on `attr`. I don't know why the CI didn't pick this
up, but we should make it explicit anyway.
2018-06-06 17:12:41 +01:00
Richard van der Hoff d3a8c9c55e Fix sql error in _get_state_groups_from_groups
If this was called with a `(type, None)` entry in types (which is supposed to
return all state of type `type`), it would explode with a sql error.
2018-06-06 14:19:01 +01:00
Neil Johnson fef6c2cdcc Merge branch 'master' into develop 2018-06-06 12:28:15 +01:00
Neil Johnson 752b7b32ed Merge tag 'v0.31.0'
Changes in synapse v0.31.0 (2018-06-06)
======================================

Most notable change from v0.30.0 is to switch to python prometheus library to improve system
stats reporting. WARNING this changes a number of prometheus metrics in a
backwards-incompatible manner. For more details, see
`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.

Bug Fixes:

* Fix metric documentation tables (PR #3341)
* Fix LaterGuage error handling (694968f)
* Fix replication metrics (b7e7fd2)

Changes in synapse v0.31.0-rc1 (2018-06-04)
==========================================

Features:

* Switch to the Python Prometheus library (PR #3256, #3274)
* Let users leave the server notice room after joining (PR #3287)

Changes:

* daily user type phone home stats (PR #3264)
* Use iter* methods for _filter_events_for_server (PR #3267)
* Docs on consent bits (PR #3268)
* Remove users from user directory on deactivate (PR #3277)
* Avoid sending consent notice to guest users (PR #3288)
* disable CPUMetrics if no /proc/self/stat (PR #3299)
* Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat!
* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
* Put python's logs into Trial when running unit tests (PR #3319)

Changes, python 3 migration:

* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
* replace some iteritems with six (PR #3244) Thanks to @NotAFile!
* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
* use repr, not str (PR #3246) Thanks to @NotAFile!
* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
* more six iteritems (PR #3279) Thanks to @NotAFile!
* More Misc. py3 fixes (PR #3280) Thanks to @NotAFile!
* remaining isintance fixes (PR #3281) Thanks to @NotAFile!
* py3-ize state.py (PR #3283) Thanks to @NotAFile!
* extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
* use memoryview in py3 (PR #3303) Thanks to @NotAFile!

Bugs:

* Fix federation backfill bugs (PR #3261)
* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
2018-06-06 12:27:33 +01:00
Neil Johnson 3f589f9097 7 char sha in changelog 2018-06-06 11:39:42 +01:00
Neil Johnson 176f1206d1 Update CHANGES.rst 2018-06-06 11:28:30 +01:00
Neil Johnson 61134debdc bump version and changelog 2018-06-06 11:26:21 +01:00
Richard van der Hoff ad459a106c Merge pull request #3349 from t3chguy/redact_as_request_token
Redact AS tokens in log (fixes to #3327)
2018-06-06 10:58:07 +01:00
Michael Telatynski 592c162516 also redact __str__ of ApplicationService used for logging 2018-06-06 10:35:29 +01:00
Michael Telatynski 330432031b redact_uri in two missed log paths 2018-06-06 10:25:48 +01:00
David Baker bf54c1cf6c pep8 2018-06-06 10:15:33 +01:00
David Baker 3e4bc4488c More doc fixes 2018-06-06 09:44:10 +01:00
Amber Brown 48e2b48888 Merge pull request #3347 from krombel/py3_extend_tox_2
update tox.ini to cover 292 succeeding tests
2018-06-06 16:37:19 +10:00
Amber Brown d8db6d9267 Merge pull request #3348 from intelfx/fix-sortedcontainers
federation/send_queue.py: fix usage of sortedcontainers.SortedDict
2018-06-06 16:36:53 +10:00
Amber Brown 23c785992f Fix metric documentation tables (#3341) 2018-06-06 07:12:16 +01:00
Richard van der Hoff b3b16490f7 Add note to changelog on prometheus metrics 2018-06-06 07:08:36 +01:00
Richard van der Hoff 592ee217a3 Merge commit 'b7e7fd2' into release-v0.31.0 2018-06-06 07:02:02 +01:00
Amber Brown 304bb22c1d Fix metric documentation tables (#3341) 2018-06-06 15:52:37 +10:00
Ivan Shapovalov c88d50aa8f federation/send_queue.py: fix usage of sortedcontainers.SortedDict 2018-06-06 02:45:18 +03:00
Krombel 0c87eed294 update tox.ini to cover 292 succeeding tests
Signed-Off-By: Matthias Kesler <krombel@krombel.de>
2018-06-05 23:10:15 +02:00
Richard van der Hoff e316407b5d Merge pull request #3327 from t3chguy/redact_as_request_token
Strip `access_token` from outgoing requests
2018-06-05 19:08:46 +01:00
Michael Telatynski e6cbf47773 factor out uri redaction into a method on http 2018-06-05 18:31:40 +01:00
David Baker 607bd27c83 fix pep8 2018-06-05 18:10:35 +01:00
David Baker d62162bbec doc fixes 2018-06-05 18:09:13 +01:00
Richard van der Hoff 617afee069 Merge pull request #3340 from ArchangeGabriel/patch-1
doc/postgres.rst: fix display of the last command block
2018-06-05 17:45:17 +01:00
Richard van der Hoff 522bd3c8a3 Merge remote-tracking branch 'origin/master' into develop 2018-06-05 17:42:49 +01:00
Will Hunt d6e3c2c79b Let's try labels instead of label, that might work 2018-06-05 17:30:45 +01:00
Amber Brown f7869f8f8b Port to sortedcontainers (with tests!) (#3332) 2018-06-06 00:13:57 +10:00
Will Hunt 604cff1a06 Add metrics to track appservice transactions 2018-06-05 13:21:30 +01:00
Bruno Pagani b50f18171d doc/postgres.rest: fix displaying of the last command block
Also indent all of them with 4 spaces.
2018-06-04 22:41:52 +00:00
Richard van der Hoff f29b41fde9 Merge pull request #3324 from matrix-org/rav/remove_dead_method
Remove was_forgotten_at
2018-06-04 18:18:08 +01:00
Richard van der Hoff 28b0490dfd Merge pull request #3334 from matrix-org/rav/cache_factor_override
Cache factor override system for specific caches
2018-06-04 17:19:33 +01:00
Richard van der Hoff b7e7fd2d0e Fix replication metrics
fix bug introduced in #3256
2018-06-04 16:23:05 +01:00
Neil Johnson 244ab974e7 bump version and changelog 2018-06-04 16:09:58 +01:00
Richard van der Hoff 694968fa81 Hopefully, fix LaterGuage error handling 2018-06-04 15:59:14 +01:00
Erik Johnston 042eedfa2b Add hacky cache factor override system 2018-06-04 15:39:28 +01:00
David Baker c5930d513a Docstring 2018-06-04 12:05:58 +01:00
David Baker 6a29e815fc Fix comment 2018-06-04 12:01:23 +01:00
David Baker e44150a6de Missing yield 2018-06-04 12:01:13 +01:00
David Baker f731e42baf docstring 2018-06-04 12:00:51 +01:00
Amber Brown 5dbf305444 Put python's logs into Trial when running unit tests (#3319) 2018-06-04 16:06:06 +10:00
Amber Brown 86accac5d5 Merge pull request #3328 from intelfx/fix-metrics-LaterGauge-usage
federation: fix LaterGauge usage
2018-06-04 15:35:32 +10:00
Ivan Shapovalov 7d9d75e4e8 federation/send_queue.py: fix usage of LaterGauge
Fixes a startup crash due to commit df9f72d9e5
"replacing portions".
2018-06-03 14:16:17 +03:00
Michael Telatynski 09503126df Strip access_token from outgoing requests using existing regex 2018-06-02 23:25:13 +01:00
Richard van der Hoff c1f4118bb6 Remove was_forgotten_at
This is unused. IT MUST DIE!!!1
̧̪͈̱̹̳͖͙H̵̰̤̰͕̖e̛ ͚͉̗̼̞w̶̩̥͉̮h̩̺̪̩͘ͅọ͎͉̟ ̜̩͔̦̘ͅW̪̫̩̣̲͔̳a͏͔̳͖i͖͜t͓̤̠͓͙s̘̰̩̥̙̝ͅ ̲̠̬̥Be̡̙̫̦h̰̩i̛̫͙͔̭̤̗̲n̳͞d̸ ͎̻͘T̛͇̝̲̹̠̗ͅh̫̦̝ͅe̩̫͟ ͓͖̼W͕̳͎͚̙̥ą̙l̘͚̺͔͞ͅl̳͍̙̤̤̮̳.̢
̟̺̜̙͉Z̤̲̙̙͎̥̝A͎̣͔̙͘L̥̻̗̳̻̳̳͢G͉̖̯͓̞̩̦O̹̹̺!̙͈͎̞̬ *
2018-06-01 18:21:49 +01:00
Richard van der Hoff a9e97dcd65 Merge pull request #3317 from thegcat/feature/3312-add_ipv6_to_blacklist_example_config
Add private IPv6 addresses to example config for url preview blacklist
2018-06-01 14:45:14 +01:00
Neil Johnson 71477f3317 Merge pull request #3264 from matrix-org/neil/sign-up-stats
daily user type phone home stats
2018-06-01 13:42:01 +00:00
Richard van der Hoff 41006d9c28 Merge pull request #3318 from matrix-org/rav/ignore_depth_on_rrs
Ignore depth when updating read-receipts
2018-06-01 14:14:45 +01:00
Richard van der Hoff 9f797a24a4 Handle RRs which arrive before their events 2018-06-01 14:01:43 +01:00
Richard van der Hoff 857e6fd8b6 Ignore depth when updating read-receipts
Order read receipts by stream ordering instead of depth
2018-06-01 12:18:11 +01:00
Felix Schäfer 4ef76f3ac4 Add private IPv6 addresses to preview blacklist #3312
The added addresses are expected to be local or loopback addresses and
shouldn't be spidered for previews.

Signed-off-by: Felix Schäfer <felix@thegcat.net>
2018-06-01 12:18:35 +02:00
Neil Johnson 4986b084f8 remove unnecessary INSERT 2018-06-01 10:50:40 +01:00
Richard van der Hoff c2c3092cce code_style.rst: formatting 2018-05-31 16:11:34 +01:00
Amber Brown febe0ec8fd Run Prometheus on a different port, optionally. (#3274) 2018-05-31 19:04:50 +10:00
Amber Brown c936a52a9e Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (#3307) 2018-05-31 19:03:47 +10:00
Richard van der Hoff e73635191f Merge pull request #3290 from rubo77/patch-7
Add link to thorough instruction how to configure consent
2018-05-30 19:52:01 +01:00
Richard van der Hoff 219c2a322b remove trailing whitespace 2018-05-30 19:42:19 +01:00
Richard van der Hoff 2e4be8bfd9 fix english and wrap comment 2018-05-30 19:24:12 +01:00
Amber Brown 872cf43516 Merge pull request #3303 from NotAFile/py3-memoryview
use memoryview in py3
2018-05-30 12:51:42 +10:00
Amber Brown debff7ae09 Merge pull request #3281 from NotAFile/py3-six-isinstance
remaining isintance fixes
2018-05-30 12:44:46 +10:00
Richard van der Hoff 34b85df7f5 Update some comments and docstrings in SyncHandler 2018-05-29 22:31:18 +01:00
Richard van der Hoff 711f61a31d Merge pull request #3304 from matrix-org/rav/exempt_as_users_from_gdpr
Exempt AS-registered users from doing gdpr
2018-05-29 20:25:12 +01:00
Richard van der Hoff a995fdae39 fix tests 2018-05-29 20:19:29 +01:00
Richard van der Hoff 4a9cbdbc15 Exempt AS-registered users from doing gdpr 2018-05-29 19:54:32 +01:00
Neil Johnson ab0ef31dc7 create users index on creation_ts 2018-05-29 17:51:08 +01:00
Neil Johnson 558f3d376a create index in background 2018-05-29 17:47:55 +01:00
Neil Johnson c379acd4fd bump version 2018-05-29 17:47:28 +01:00
Richard van der Hoff db2e4608ab Merge pull request #3302 from krombel/py3_extend_tox_testing
extend tox testing for py3 to avoid regressions
2018-05-29 17:37:52 +01:00
Adrian Tschira 4b9d0cde97 add remaining memoryview changes 2018-05-29 17:42:43 +02:00
Adrian Tschira 7873cde526 pep8 2018-05-29 17:35:55 +02:00
Adrian Tschira 1afafb3497 use memoryview in py3
Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-05-29 17:14:34 +02:00
Krombel d6cd55532b extend tox testing for py3 to avoid regressions 2018-05-29 16:02:41 +02:00
Amber Brown 235b53263a Merge pull request #3299 from matrix-org/matthew/macos-fixes
disable CPUMetrics if no /proc/self/stat
2018-05-29 11:45:45 +10:00
Matthew Hodgson ff1bc0a279 pep8 2018-05-29 02:32:15 +01:00
Matthew Hodgson adb6bac4d5 fix another dumb typo 2018-05-29 02:29:22 +01:00
Matthew Hodgson 0a240ad36e disable CPUMetrics if no /proc/self/stat
fixes build on macOS again
2018-05-29 02:23:30 +01:00
Matthew Hodgson 284e36ad04 fix dumb typo 2018-05-29 02:23:11 +01:00
Amber Brown 81717e8515 Merge pull request #3256 from matrix-org/3218-official-prom
Switch to the Python Prometheus library
2018-05-28 23:30:09 +10:00
Amber Brown 57ad76fa4a fix up tests 2018-05-28 19:51:53 +10:00
Amber Brown 3ef5cd74a6 update to more consistently use seconds in any metrics or logging 2018-05-28 19:39:27 +10:00
Amber Brown 5c40ce3777 invalid syntax :( 2018-05-28 19:16:09 +10:00
Amber Brown 357c74a50f add comment about why unreg 2018-05-28 19:14:41 +10:00
Amber Brown a2eb5db4a0 update metrics to be in seconds 2018-05-28 19:10:27 +10:00
Amber Brown 754826a830 Merge remote-tracking branch 'origin/develop' into 3218-official-prom 2018-05-28 18:57:23 +10:00
Ruben Barkow 08ea5fe635 add link to thorough instruction how to configure consent 2018-05-25 23:19:55 +02:00
Richard van der Hoff 60f09b1e11 Merge pull request #3288 from matrix-org/rav/no_spam_guests
Avoid sending consent notice to guest users
2018-05-25 11:44:45 +01:00
Richard van der Hoff 66bdae986f Fix default for send_server_notice_to_guests
bool("False") == True...
2018-05-25 11:42:05 +01:00
Richard van der Hoff ba1b163590 Avoid sending consent notice to guest users
we think it makes sense not to send the notices to guest users.
2018-05-25 11:36:43 +01:00
Richard van der Hoff 41921ac01b Merge pull request #3287 from matrix-org/rav/allow_leaving_server_notices_room
Let users leave the server notice room after joining
2018-05-25 11:15:45 +01:00
Richard van der Hoff 757ed27258 Let users leave the server notice room after joining
They still can't reject invites, but we let them leave it.
2018-05-25 11:07:21 +01:00
Adrian Tschira 4ee4450d66 fix recursion error 2018-05-24 21:44:10 +02:00
Adrian Tschira dd068ca979 remaining isintance fixes
Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-05-24 20:55:08 +02:00
David Baker 77a23e2e05 Merge remote-tracking branch 'origin/develop' into dbkr/unbind 2018-05-24 16:20:53 +01:00
David Baker 9700d15611 pep8 2018-05-24 11:23:15 +01:00
David Baker a21a41bad7 comment 2018-05-24 11:19:59 +01:00
David Baker b3bff53178 Unbind 3pids when they're deleted too 2018-05-24 11:08:05 +01:00
Amber Brown 389dac2c15 pepeightttt 2018-05-23 13:08:59 -05:00
Amber Brown 472a5ec4e2 add back CPU metrics 2018-05-23 13:03:56 -05:00
Amber Brown e987079037 fixes 2018-05-23 13:03:51 -05:00
David Baker 2c7866d664 Hit the 3pid unbind endpoint on deactivation 2018-05-23 14:38:56 +01:00
Amber Brown b6063631c3 more cleanup 2018-05-22 17:36:20 -05:00
Amber Brown 53cc2cde1f cleanup 2018-05-22 17:32:57 -05:00
Amber Brown 071206304d cleanup pep8 errors 2018-05-22 16:54:22 -05:00
Amber Brown 4abeaedcf3 tests/metrics is gone now 2018-05-22 16:45:11 -05:00
Amber Brown 85ba83eb51 fixes 2018-05-22 16:28:23 -05:00
Amber Brown 228f1f584e fix the test failures 2018-05-22 15:02:38 -05:00
Neil Johnson d8cb7225d2 daily user type phone home stats 2018-05-22 18:09:09 +01:00
Amber Brown 07bb9bdae8 update gitignore to remove some files that got put on my machine 2018-05-22 10:57:28 -05:00
Amber Brown 8f5a688d42 cleanups, self-registration 2018-05-22 10:56:03 -05:00
Amber Brown a8990fa2ec Merge remote-tracking branch 'origin/develop' into 3218-official-prom 2018-05-22 10:50:26 -05:00
Amber Brown fcc525b0b7 rest of the changes 2018-05-21 19:48:57 -05:00
Amber Brown df9f72d9e5 replacing portions 2018-05-21 19:47:37 -05:00
Amber Brown c60e0d5e02 don't need the resource portion 2018-05-21 17:03:20 -05:00
Amber Brown 02c1d29133 look at the Prometheus metrics instead 2018-05-21 17:02:20 -05:00
Amber Brown f258deffcb remove old metrics libs 2018-05-21 17:01:15 -05:00
169 changed files with 2748 additions and 2093 deletions
+4
View File
@@ -1,5 +1,6 @@
*.pyc
.*.swp
*~
.DS_Store
_trial_temp/
@@ -13,6 +14,7 @@ docs/build/
cmdclient_config.json
homeserver*.db
homeserver*.log
homeserver*.log.*
homeserver*.pid
homeserver*.yaml
@@ -40,6 +42,8 @@ media_store/
*.tac
build/
venv/
venv*/
localhost-800*/
static/client/register/register_config.js
+9 -1
View File
@@ -4,7 +4,12 @@ language: python
# tell travis to cache ~/.cache/pip
cache: pip
before_script:
- git remote set-branches --add origin develop
- git fetch origin develop
matrix:
fast_finish: true
include:
- python: 2.7
env: TOX_ENV=packaging
@@ -14,10 +19,13 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 3.6
env: TOX_ENV=py36
- python: 3.6
env: TOX_ENV=check-newsfragment
install:
- pip install tox
+78
View File
@@ -1,3 +1,81 @@
Changes in synapse v0.31.2 (2018-06-14)
=======================================
SECURITY UPDATE: Prevent unauthorised users from setting state events in a room
when there is no ``m.room.power_levels`` event in force in the room. (PR #3397)
Discussion around the Matrix Spec change proposal for this change can be
followed at https://github.com/matrix-org/matrix-doc/issues/1304.
Changes in synapse v0.31.1 (2018-06-08)
=======================================
v0.31.1 fixes a security bug in the ``get_missing_events`` federation API
where event visibility rules were not applied correctly.
We are not aware of it being actively exploited but please upgrade asap.
Bug Fixes:
* Fix event filtering in get_missing_events handler (PR #3371)
Changes in synapse v0.31.0 (2018-06-06)
=======================================
Most notable change from v0.30.0 is to switch to the python prometheus library to improve system
stats reporting. WARNING: this changes a number of prometheus metrics in a
backwards-incompatible manner. For more details, see
`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
Bug Fixes:
* Fix metric documentation tables (PR #3341)
* Fix LaterGauge error handling (694968f)
* Fix replication metrics (b7e7fd2)
Changes in synapse v0.31.0-rc1 (2018-06-04)
==========================================
Features:
* Switch to the Python Prometheus library (PR #3256, #3274)
* Let users leave the server notice room after joining (PR #3287)
Changes:
* daily user type phone home stats (PR #3264)
* Use iter* methods for _filter_events_for_server (PR #3267)
* Docs on consent bits (PR #3268)
* Remove users from user directory on deactivate (PR #3277)
* Avoid sending consent notice to guest users (PR #3288)
* disable CPUMetrics if no /proc/self/stat (PR #3299)
* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
* Put python's logs into Trial when running unit tests (PR #3319)
Changes, python 3 migration:
* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
* replace some iteritems with six (PR #3244) Thanks to @NotAFile!
* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
* use repr, not str (PR #3246) Thanks to @NotAFile!
* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
* more six iteritems (PR #3279) Thanks to @NotAFile!
* More Misc. py3 fixes (PR #3280) Thanks to @NotAFile!
* remaining isintance fixes (PR #3281) Thanks to @NotAFile!
* py3-ize state.py (PR #3283) Thanks to @NotAFile!
* extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
* use memoryview in py3 (PR #3303) Thanks to @NotAFile!
Bugs:
* Fix federation backfill bugs (PR #3261)
* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
Changes in synapse v0.30.0 (2018-05-24)
==========================================
+20
View File
@@ -48,6 +48,26 @@ Please ensure your changes match the cosmetic style of the existing project,
and **never** mix cosmetic and functional changes in the same commit, as it
makes it horribly hard to review otherwise.
Changelog
~~~~~~~~~
All changes, even minor ones, need a corresponding changelog
entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``issuenumberOrPR.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
For example, a fix for a bug reported in #1234 would have its
changelog entry in ``changelog.d/1234.bugfix``, and contain content
like "The security levels of Florbs are now validated when
recieved over federation. Contributed by Jane Matrix".
Attribution
~~~~~~~~~~~
+3
View File
@@ -29,5 +29,8 @@ exclude Dockerfile
exclude .dockerignore
recursive-exclude jenkins *.sh
include pyproject.toml
recursive-include changelog.d *
prune .github
prune demo/etc
+1
View File
@@ -0,0 +1 @@
Remove was_forgotten_at
+1
View File
@@ -0,0 +1 @@
Strip access_token from outgoing requests
+1
View File
@@ -0,0 +1 @@
Cache factor override system for specific caches
+1
View File
@@ -0,0 +1 @@
``doc/postgres.rst``: fix display of the last command block. Thanks to @ArchangeGabriel!
View File
+1
View File
@@ -0,0 +1 @@
Add metrics to track appservice transactions
View File
View File
+1
View File
@@ -0,0 +1 @@
Redact AS tokens in logs
+1
View File
@@ -0,0 +1 @@
Fix federation backfill from SQLite servers
View File
+1
View File
@@ -0,0 +1 @@
Fix event-purge-by-ts admin API
+1
View File
@@ -0,0 +1 @@
Fix event filtering in get_missing_events handler
+1
View File
@@ -0,0 +1 @@
Try to log more helpful info when a sig verification fails
+1 -1
View File
@@ -16,7 +16,7 @@
print("I am a fish %s" %
"moo")
and this::
and this::
print(
"I am a fish %s" %
+76 -11
View File
@@ -1,25 +1,47 @@
How to monitor Synapse metrics using Prometheus
===============================================
1. Install prometheus:
1. Install Prometheus:
Follow instructions at http://prometheus.io/docs/introduction/install/
2. Enable synapse metrics:
2. Enable Synapse metrics:
Simply setting a (local) port number will enable it. Pick a port.
prometheus itself defaults to 9090, so starting just above that for
locally monitored services seems reasonable. E.g. 9092:
There are two methods of enabling metrics in Synapse.
Add to homeserver.yaml::
The first serves the metrics as a part of the usual web server and can be
enabled by adding the "metrics" resource to the existing listener as such::
metrics_port: 9092
resources:
- names:
- client
- metrics
Also ensure that ``enable_metrics`` is set to ``True``.
This provides a simple way of adding metrics to your Synapse installation,
and serves under ``/_synapse/metrics``. If you do not wish your metrics be
publicly exposed, you will need to either filter it out at your load
balancer, or use the second method.
Restart synapse.
The second method runs the metrics server on a different port, in a
different thread to Synapse. This can make it more resilient to heavy load
meaning metrics cannot be retrieved, and can be exposed to just internal
networks easier. The served metrics are available over HTTP only, and will
be available at ``/``.
3. Add a prometheus target for synapse.
Add a new listener to homeserver.yaml::
listeners:
- type: metrics
port: 9000
bind_addresses:
- '0.0.0.0'
For both options, you will need to ensure that ``enable_metrics`` is set to
``True``.
Restart Synapse.
3. Add a Prometheus target for Synapse.
It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``)::
@@ -31,7 +53,50 @@ How to monitor Synapse metrics using Prometheus
If your prometheus is older than 1.5.2, you will need to replace
``static_configs`` in the above with ``target_groups``.
Restart prometheus.
Restart Prometheus.
Removal of deprecated metrics & time based counters becoming histograms in 0.31.0
---------------------------------------------------------------------------------
The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
All time duration-based metrics have been changed to be seconds. This affects:
+----------------------------------+
| msec -> sec metrics |
+==================================+
| python_gc_time |
+----------------------------------+
| python_twisted_reactor_tick_time |
+----------------------------------+
| synapse_storage_query_time |
+----------------------------------+
| synapse_storage_schedule_time |
+----------------------------------+
| synapse_storage_transaction_time |
+----------------------------------+
Several metrics have been changed to be histograms, which sort entries into
buckets and allow better analysis. The following metrics are now histograms:
+-------------------------------------------+
| Altered metrics |
+===========================================+
| python_gc_time |
+-------------------------------------------+
| python_twisted_reactor_pending_calls |
+-------------------------------------------+
| python_twisted_reactor_tick_time |
+-------------------------------------------+
| synapse_http_server_response_time_seconds |
+-------------------------------------------+
| synapse_storage_query_time |
+-------------------------------------------+
| synapse_storage_schedule_time |
+-------------------------------------------+
| synapse_storage_transaction_time |
+-------------------------------------------+
Block and response metrics renamed for 0.27.0
+9 -9
View File
@@ -9,19 +9,19 @@ Set up database
Assuming your PostgreSQL database user is called ``postgres``, create a user
``synapse_user`` with::
su - postgres
createuser --pwprompt synapse_user
su - postgres
createuser --pwprompt synapse_user
The PostgreSQL database used *must* have the correct encoding set, otherwise it
would not be able to store UTF8 strings. To create a database with the correct
encoding use, e.g.::
CREATE DATABASE synapse
ENCODING 'UTF8'
LC_COLLATE='C'
LC_CTYPE='C'
template=template0
OWNER synapse_user;
CREATE DATABASE synapse
ENCODING 'UTF8'
LC_COLLATE='C'
LC_CTYPE='C'
template=template0
OWNER synapse_user;
This would create an appropriate database named ``synapse`` owned by the
``synapse_user`` user (which must already exist).
@@ -126,7 +126,7 @@ run::
--postgres-config homeserver-postgres.yaml
Once that has completed, change the synapse config to point at the PostgreSQL
database configuration file ``homeserver-postgres.yaml``:
database configuration file ``homeserver-postgres.yaml``::
./synctl stop
mv homeserver.yaml homeserver-old-sqlite.yaml
+5 -2
View File
@@ -5,7 +5,7 @@ Server Notices
channel whereby server administrators can send messages to users on the server.
They are used as part of communication of the server polices(see
[consent_tracking.md](consent_tracking.md)), however the intention is that
[consent_tracking.md](consent_tracking.md)), however the intention is that
they may also find a use for features such as "Message of the day".
This is a feature specific to Synapse, but it uses standard Matrix
@@ -24,7 +24,10 @@ history; it will appear to have come from the 'server notices user' (see
below).
The user is prevented from sending any messages in this room by the power
levels. They also cannot leave it.
levels.
Having joined the room, the user can leave the room if they want. Subsequent
server notices will then cause a new room to be created.
Synapse configuration
---------------------
+5
View File
@@ -0,0 +1,5 @@
[tool.towncrier]
package = "synapse"
filename = "CHANGES.rst"
directory = "changelog.d"
issue_format = "`#{issue} <https://github.com/matrix-org/synapse/issues/{issue}>`_"
+51 -14
View File
@@ -18,14 +18,22 @@
from __future__ import print_function
import argparse
from urlparse import urlparse, urlunparse
import nacl.signing
import json
import base64
import requests
import sys
from requests.adapters import HTTPAdapter
import srvlookup
import yaml
# uncomment the following to enable debug logging of http requests
#from httplib import HTTPConnection
#HTTPConnection.debuglevel = 1
def encode_base64(input_bytes):
"""Encode bytes as a base64 string without any padding."""
@@ -113,17 +121,6 @@ def read_signing_keys(stream):
return keys
def lookup(destination, path):
if ":" in destination:
return "https://%s%s" % (destination, path)
else:
try:
srv = srvlookup.lookup("matrix", "tcp", destination)[0]
return "https://%s:%d%s" % (srv.host, srv.port, path)
except:
return "https://%s:%d%s" % (destination, 8448, path)
def request_json(method, origin_name, origin_key, destination, path, content):
if method is None:
if content is None:
@@ -152,13 +149,19 @@ def request_json(method, origin_name, origin_key, destination, path, content):
authorization_headers.append(bytes(header))
print ("Authorization: %s" % header, file=sys.stderr)
dest = lookup(destination, path)
dest = "matrix://%s%s" % (destination, path)
print ("Requesting %s" % dest, file=sys.stderr)
result = requests.request(
s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
result = s.request(
method=method,
url=dest,
headers={"Authorization": authorization_headers[0]},
headers={
"Host": destination,
"Authorization": authorization_headers[0]
},
verify=False,
data=content,
)
@@ -242,5 +245,39 @@ def read_args_from_config(args):
args.signing_key_path = config['signing_key_path']
class MatrixConnectionAdapter(HTTPAdapter):
@staticmethod
def lookup(s):
if s[-1] == ']':
# ipv6 literal (with no port)
return s, 8448
if ":" in s:
out = s.rsplit(":",1)
try:
port = int(out[1])
except ValueError:
raise ValueError("Invalid host:port '%s'" % s)
return out[0], port
try:
srv = srvlookup.lookup("matrix", "tcp", s)[0]
return srv.host, srv.port
except:
return s, 8448
def get_connection(self, url, proxies=None):
parsed = urlparse(url)
(host, port) = self.lookup(parsed.netloc)
netloc = "%s:%d" % (host, port)
print("Connecting to %s" % (netloc,), file=sys.stderr)
url = urlunparse((
"https", netloc, parsed.path, parsed.params, parsed.query,
parsed.fragment,
))
return super(MatrixConnectionAdapter, self).get_connection(url, proxies)
if __name__ == "__main__":
main()
+2 -1
View File
@@ -17,4 +17,5 @@ ignore =
[flake8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
ignore = W503
# E203 is contrary to PEP8.
ignore = W503,E203
+2 -1
View File
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.30.0"
__version__ = "0.31.2"
+5 -3
View File
@@ -15,6 +15,8 @@
import logging
from six import itervalues
import pymacaroons
from twisted.internet import defer
@@ -57,7 +59,7 @@ class Auth(object):
self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
register_cache("token_cache", self.token_cache)
register_cache("cache", "token_cache", self.token_cache)
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
@@ -66,7 +68,7 @@ class Auth(object):
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
(e.type, e.state_key): e for e in auth_events.values()
(e.type, e.state_key): e for e in itervalues(auth_events)
}
self.check(event, auth_events=auth_events, do_sig_check=do_sig_check)
@@ -653,7 +655,7 @@ class Auth(object):
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = event_auth.get_send_level(
EventTypes.Aliases, "", auth_events
EventTypes.Aliases, "", power_level_event,
)
user_level = event_auth.get_user_power_level(user_id, auth_events)
+1 -1
View File
@@ -411,7 +411,7 @@ class Filter(object):
return room_ids
def filter(self, events):
return filter(self.check, events)
return list(filter(self.check, events))
def limit(self):
return self.filter_json.get("limit", 10)
+13
View File
@@ -124,6 +124,19 @@ def quit_with_error(error_string):
sys.exit(1)
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
"""
from synapse.metrics import RegistryProxy
from prometheus_client import start_http_server
for host in bind_addresses:
reactor.callInThread(start_http_server, int(port),
addr=host, registry=RegistryProxy)
logger.info("Metrics now reporting on %s:%d", host, port)
def listen_tcp(bind_addresses, port, factory, backlog=50):
"""
Create a TCP socket for a port and several addresses
+9 -1
View File
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -94,6 +95,13 @@ class AppserviceServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -2
View File
@@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -77,7 +78,7 @@ class ClientReaderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
PublicRoomListRestServlet(self).register(resource)
@@ -118,7 +119,13 @@ class ClientReaderServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -90,7 +91,7 @@ class EventCreatorServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
RoomSendEventRestServlet(self).register(resource)
@@ -134,6 +135,13 @@ class EventCreatorServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -71,7 +72,7 @@ class FederationReaderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "federation":
resources.update({
FEDERATION_PREFIX: TransportLayerServer(self),
@@ -107,6 +108,13 @@ class FederationReaderServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -89,7 +90,7 @@ class FederationSenderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -121,6 +122,13 @@ class FederationSenderServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -29,6 +29,7 @@ from synapse.http.servlet import (
RestServlet, parse_json_object_from_request,
)
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -131,7 +132,7 @@ class FrontendProxyServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
@@ -172,6 +173,13 @@ class FrontendProxyServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+14 -10
View File
@@ -34,7 +34,7 @@ from synapse.module_api import ModuleApi
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.metrics import register_memory_metrics
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements
@@ -230,7 +230,7 @@ class SynapseHomeServer(HomeServer):
resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
@@ -263,6 +263,13 @@ class SynapseHomeServer(HomeServer):
reactor.addSystemEventTrigger(
"before", "shutdown", server_listener.stopListening,
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -311,11 +318,6 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
version_string = "Synapse/" + get_version_string(synapse)
logger.info("Server hostname: %s", config.server_name)
logger.info("Server version: %s", version_string)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
@@ -328,7 +330,7 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
version_string=version_string,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
@@ -362,8 +364,6 @@ def setup(config_options):
hs.get_datastore().start_doing_background_updates()
hs.get_federation_client().start_get_pdu_cache()
register_memory_metrics(hs)
reactor.callWhenRunning(start)
return hs
@@ -434,6 +434,10 @@ def run(hs):
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.iteritems():
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
+9 -1
View File
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -73,7 +74,7 @@ class MediaRepositoryServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media":
media_repo = self.get_media_repository_resource()
resources.update({
@@ -114,6 +115,13 @@ class MediaRepositoryServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
@@ -92,7 +93,7 @@ class PusherServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -124,6 +125,13 @@ class PusherServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -257,7 +258,7 @@ class SynchrotronServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource)
@@ -301,6 +302,13 @@ class SynchrotronServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+4
View File
@@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
+9 -1
View File
@@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -105,7 +106,7 @@ class UserDirectoryServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
user_directory.register_servlets(self, resource)
@@ -146,6 +147,13 @@ class UserDirectoryServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+5 -1
View File
@@ -292,4 +292,8 @@ class ApplicationService(object):
return self.rate_limited
def __str__(self):
return "ApplicationService: %s" % (self.__dict__,)
# copy dictionary and redact token fields so they don't get logged
dict_copy = self.__dict__.copy()
dict_copy["token"] = "<redacted>"
dict_copy["hs_token"] = "<redacted>"
return "ApplicationService: %s" % (dict_copy,)
+22
View File
@@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
import logging
import urllib
from prometheus_client import Counter
logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"]
)
failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"]
)
sent_events_counter = Counter(
"synapse_appservice_api_sent_events",
"Number of events sent to the AS",
["service"]
)
HOUR_IN_MS = 60 * 60 * 1000
@@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
failed_transactions_counter.labels(service.id).inc()
defer.returnValue(False)
def _serialize(self, events):
+13
View File
@@ -19,6 +19,7 @@ from synapse.types import UserID
import yaml
import logging
import re
from six import string_types
from six.moves.urllib import parse as urlparse
@@ -31,6 +32,18 @@ class AppServiceConfig(Config):
def read_config(self, config):
self.app_service_config_files = config.get("app_service_config_files", [])
self.notify_appservices = config.get("notify_appservices", True)
s_blacklist = config.get(
"app_service_server_blacklist", []
)
try:
self.app_service_server_blacklist = [
re.compile(x) for x in s_blacklist
]
except re.error as ex:
raise ConfigError(
"Could not compile regex in app_service_server_blacklist: %s",
ex.message
)
def default_config(cls, **kwargs):
return """\
+10 -1
View File
@@ -18,6 +18,9 @@ from ._base import Config
DEFAULT_CONFIG = """\
# User Consent configuration
#
# for detailed instructions, see
# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
#
# Parts of this section are required if enabling the 'consent' resource under
# 'listeners', in particular 'template_dir' and 'version'.
#
@@ -32,7 +35,8 @@ DEFAULT_CONFIG = """\
#
# 'server_notice_content', if enabled, will send a user a "Server Notice"
# asking them to consent to the privacy policy. The 'server_notices' section
# must also be configured for this to work.
# must also be configured for this to work. Notices will *not* be sent to
# guest users unless 'send_server_notice_to_guests' is set to true.
#
# 'block_events_error', if set, will block any attempts to send events
# until the user consents to the privacy policy. The value of the setting is
@@ -46,6 +50,7 @@ DEFAULT_CONFIG = """\
# body: >-
# To continue using this homeserver you must review and agree to the
# terms and conditions at %(consent_uri)s
# send_server_notice_to_guests: True
# block_events_error: >-
# To continue using this homeserver you must review and agree to the
# terms and conditions at %(consent_uri)s
@@ -60,6 +65,7 @@ class ConsentConfig(Config):
self.user_consent_version = None
self.user_consent_template_dir = None
self.user_consent_server_notice_content = None
self.user_consent_server_notice_to_guests = False
self.block_events_without_consent_error = None
def read_config(self, config):
@@ -74,6 +80,9 @@ class ConsentConfig(Config):
self.block_events_without_consent_error = consent_config.get(
"block_events_error",
)
self.user_consent_server_notice_to_guests = bool(consent_config.get(
"send_server_notice_to_guests", False,
))
def default_config(self, **kwargs):
return DEFAULT_CONFIG
+18 -6
View File
@@ -12,17 +12,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
from synapse.util.logcontext import LoggingContextFilter
from twisted.logger import globalLogBeginner, STDLibLogObserver
import logging
import logging.config
import yaml
from string import Template
import os
import signal
from string import Template
import sys
from twisted.logger import STDLibLogObserver, globalLogBeginner
import yaml
import synapse
from synapse.util.logcontext import LoggingContextFilter
from synapse.util.versionstring import get_version_string
from ._base import Config
DEFAULT_LOG_CONFIG = Template("""
version: 1
@@ -202,6 +205,15 @@ def setup_logging(config, use_worker_options=False):
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn(
"Server %s version %s",
sys.argv[0], get_version_string(synapse),
)
logging.info("Server hostname: %s", config.server_name)
# It's critical to point twisted's internal logging somewhere, otherwise it
# stacks up and leaks kup to 64K object;
# see: https://twistedmatrix.com/trac/ticket/8164
+3
View File
@@ -250,6 +250,9 @@ class ContentRepositoryConfig(Config):
# - '192.168.0.0/16'
# - '100.64.0.0/10'
# - '169.254.0.0/16'
# - '::1/128'
# - 'fe80::/64'
# - 'fc00::/7'
#
# List of IP address CIDR ranges that the URL preview spider is allowed
# to access even if they are specified in url_preview_ip_range_blacklist.
+10
View File
@@ -14,8 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from ._base import Config, ConfigError
logger = logging.Logger(__name__)
class ServerConfig(Config):
@@ -138,6 +142,12 @@ class ServerConfig(Config):
metrics_port = config.get("metrics_port")
if metrics_port:
logger.warn(
("The metrics_port configuration option is deprecated in Synapse 0.31 "
"in favour of a listener. Please see "
"http://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst"
" on how to configure the new listener."))
self.listeners.append({
"port": metrics_port,
"bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
+25 -6
View File
@@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
from twisted.internet import defer
from signedjson.sign import (
verify_signed_json, signature_ids, sign_json, encode_canonical_json
verify_signed_json, signature_ids, sign_json, encode_canonical_json,
SignatureVerifyException,
)
from signedjson.key import (
is_signing_algorithm_supported, decode_verify_key_bytes
is_signing_algorithm_supported, decode_verify_key_bytes,
encode_verify_key_base64,
)
from unpaddedbase64 import decode_base64, encode_base64
@@ -56,7 +58,7 @@ Attributes:
key_ids(set(str)): The set of key_ids to that could be used to verify the
JSON object
json_object(dict): The JSON object to verify.
deferred(twisted.internet.defer.Deferred):
deferred(Deferred[str, str, nacl.signing.VerifyKey]):
A deferred (server_name, key_id, verify_key) tuple that resolves when
a verify key has been fetched. The deferreds' callbacks are run with no
logcontext.
@@ -736,6 +738,17 @@ class Keyring(object):
@defer.inlineCallbacks
def _handle_key_deferred(verify_request):
"""Waits for the key to become available, and then performs a verification
Args:
verify_request (VerifyKeyRequest):
Returns:
Deferred[None]
Raises:
SynapseError if there was a problem performing the verification
"""
server_name = verify_request.server_name
try:
with PreserveLoggingContext():
@@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
))
try:
verify_signed_json(json_object, server_name, verify_key)
except Exception:
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
server_name, verify_key.alg, verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s" % (
server_name, verify_key.alg, verify_key.version
"Invalid signature for server %s with key %s:%s: %s" % (
server_name, verify_key.alg, verify_key.version, str(e),
),
Codes.UNAUTHORIZED,
)
+68 -45
View File
@@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
event: the event being checked.
auth_events (dict: event-key -> event): the existing room state.
Raises:
AuthError if the checks fail
Returns:
True if the auth checks pass.
if the auth checks pass.
"""
if do_size_check:
_check_size_limits(event)
@@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
# Oh, we don't know what the state of the room was, so we
# are trusting that this is allowed (at least for now)
logger.warn("Trusting event: %s", event.event_id)
return True
return
if event.type == EventTypes.Create:
room_id_domain = get_domain_from_id(event.room_id)
@@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"Creation event's room_id domain does not match sender's"
)
# FIXME
return True
logger.debug("Allowing! %s", event)
return
creation_event = auth_events.get((EventTypes.Create, ""), None)
@@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Alias event's state_key does not match sender's domain"
)
return True
logger.debug("Allowing! %s", event)
return
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
if event.type == EventTypes.Member:
allowed = _is_membership_change_allowed(
event, auth_events
)
if allowed:
logger.debug("Allowing! %s", event)
else:
logger.debug("Denying! %s", event)
return allowed
_is_membership_change_allowed(event, auth_events)
logger.debug("Allowing! %s", event)
return
_check_event_sender_in_room(event, auth_events)
@@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
)
else:
return True
logger.debug("Allowing! %s", event)
return
_can_send_event(event, auth_events)
@@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
create = auth_events.get(key)
if create and event.prev_events[0][0] == create.event_id:
if create.content["creator"] == event.state_key:
return True
return
target_user_id = event.state_key
@@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
raise AuthError(
403, "%s is banned from the room" % (target_user_id,)
)
return True
return
if Membership.JOIN != membership:
if (caller_invited
and Membership.LEAVE == membership
and target_user_id == event.user_id):
return True
return
if not caller_in_room: # caller isn't joined
raise AuthError(
@@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
else:
raise AuthError(500, "Unknown membership %s" % membership)
return True
def _check_event_sender_in_room(event, auth_events):
key = (EventTypes.Member, event.user_id, )
@@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
))
def get_send_level(etype, state_key, auth_events):
key = (EventTypes.PowerLevels, "", )
send_level_event = auth_events.get(key)
send_level = None
if send_level_event:
send_level = send_level_event.content.get("events", {}).get(
etype
)
if send_level is None:
if state_key is not None:
send_level = send_level_event.content.get(
"state_default", 50
)
else:
send_level = send_level_event.content.get(
"events_default", 0
)
def get_send_level(etype, state_key, power_levels_event):
"""Get the power level required to send an event of a given type
if send_level:
send_level = int(send_level)
The federation spec [1] refers to this as "Required Power Level".
https://matrix.org/docs/spec/server_server/unstable.html#definitions
Args:
etype (str): type of event
state_key (str|None): state_key of state event, or None if it is not
a state event.
power_levels_event (synapse.events.EventBase|None): power levels event
in force at this point in the room
Returns:
int: power level required to send this event.
"""
if power_levels_event:
power_levels_content = power_levels_event.content
else:
send_level = 0
power_levels_content = {}
return send_level
# see if we have a custom level for this event type
send_level = power_levels_content.get("events", {}).get(etype)
# otherwise, fall back to the state_default/events_default.
if send_level is None:
if state_key is not None:
send_level = power_levels_content.get("state_default", 50)
else:
send_level = power_levels_content.get("events_default", 0)
return int(send_level)
def _can_send_event(event, auth_events):
power_levels_event = _get_power_level_event(auth_events)
send_level = get_send_level(
event.type, event.get("state_key", None), auth_events
event.type, event.get("state_key"), power_levels_event,
)
user_level = get_user_power_level(event.user_id, auth_events)
@@ -471,14 +480,14 @@ def _check_power_levels(event, auth_events):
]
old_list = current_state.content.get("users", {})
for user in set(old_list.keys() + user_list.keys()):
for user in set(list(old_list) + list(user_list)):
levels_to_check.append(
(user, "users")
)
old_list = current_state.content.get("events", {})
new_list = event.content.get("events", {})
for ev_id in set(old_list.keys() + new_list.keys()):
for ev_id in set(list(old_list) + list(new_list)):
levels_to_check.append(
(ev_id, "events")
)
@@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
def _get_power_level_event(auth_events):
key = (EventTypes.PowerLevels, "", )
return auth_events.get(key)
return auth_events.get((EventTypes.PowerLevels, ""))
def get_user_power_level(user_id, auth_events):
power_level_event = _get_power_level_event(auth_events)
"""Get a user's power level
Args:
user_id (str): user's id to look up in power_levels
auth_events (dict[(str, str), synapse.events.EventBase]):
state in force at this point in the room (or rather, a subset of
it including at least the create event and power levels event.
Returns:
int: the user's power level in this room.
"""
power_level_event = _get_power_level_event(auth_events)
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
if not level:
@@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
else:
return int(level)
else:
# if there is no power levels event, the creator gets 100 and everyone
# else gets 0.
# some things which call this don't pass the create event: hack around
# that.
key = (EventTypes.Create, "", )
create_event = auth_events.get(key)
if (create_event is not None and
+1 -1
View File
@@ -146,7 +146,7 @@ class EventBase(object):
return field in self._event_dict
def items(self):
return self._event_dict.items()
return list(self._event_dict.items())
class FrozenEvent(EventBase):
+9 -12
View File
@@ -32,20 +32,17 @@ from synapse.federation.federation_base import (
FederationBase,
event_from_pdu_json,
)
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from prometheus_client import Counter
logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -108,7 +105,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc(query_type)
sent_queries_counter.labels(query_type).inc()
return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
@@ -127,7 +124,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_device_keys")
sent_queries_counter.labels("client_device_keys").inc()
return self.transport_layer.query_client_keys(
destination, content, timeout
)
@@ -137,7 +134,7 @@ class FederationClient(FederationBase):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.inc("user_devices")
sent_queries_counter.labels("user_devices").inc()
return self.transport_layer.query_user_devices(
destination, user_id, timeout
)
@@ -154,7 +151,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_one_time_keys")
sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(
destination, content, timeout
)
@@ -394,7 +391,7 @@ class FederationClient(FederationBase):
"""
if return_local:
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
signed_events = list(seen_events.values())
else:
seen_events = yield self.store.have_seen_events(event_ids)
signed_events = []
@@ -592,7 +589,7 @@ class FederationClient(FederationBase):
}
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, pdus.values(),
destination, list(pdus.values()),
outlier=True,
)
+11 -21
View File
@@ -27,12 +27,13 @@ from synapse.federation.federation_base import (
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
from prometheus_client import Counter
from six import iteritems
# when processing incoming transactions, we try to handle multiple rooms in
@@ -41,17 +42,17 @@ TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
# synapse.federation.federation_server is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
received_pdus_counter = metrics.register_counter("received_pdus")
received_edus_counter = Counter("synapse_federation_server_received_edus", "")
received_edus_counter = metrics.register_counter("received_edus")
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
class FederationServer(FederationBase):
def __init__(self, hs):
super(FederationServer, self).__init__(hs)
@@ -131,7 +132,7 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id)
received_pdus_counter.inc_by(len(transaction.pdus))
received_pdus_counter.inc(len(transaction.pdus))
pdus_by_room = {}
@@ -276,7 +277,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_pdu_request(self, origin, event_id):
pdu = yield self._get_persisted_pdu(origin, event_id)
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
if pdu:
defer.returnValue(
@@ -292,7 +293,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))
@@ -469,17 +470,6 @@ class FederationServer(FederationBase):
ts_now_ms = self._clock.time_msec()
return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
@log_function
def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
Returns:
Deferred: Results in a `Pdu`.
"""
return self.handler.get_persisted_pdu(
origin, event_id, do_auth=do_auth
)
def _transaction_from_pdus(self, pdu_list):
"""Returns a new Transaction containing the given PDUs suitable for
transmission.
+33 -43
View File
@@ -33,9 +33,9 @@ from .units import Edu
from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
import synapse.metrics
from synapse.metrics import LaterGauge
from blist import sorteddict
from sortedcontainers import SortedDict
from collections import namedtuple
import logging
@@ -45,9 +45,6 @@ from six import itervalues, iteritems
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
class FederationRemoteSendQueue(object):
"""A drop in replacement for TransactionQueue"""
@@ -58,29 +55,27 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id
self.presence_changed = SortedDict() # Stream position -> user_id
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
self.edus = sorteddict() # stream position -> Edu
self.edus = SortedDict() # stream position -> Edu
self.failures = sorteddict() # stream position -> (destination, Failure)
self.failures = SortedDict() # stream position -> (destination, Failure)
self.device_messages = sorteddict() # stream position -> destination
self.device_messages = SortedDict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
self.pos_time = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
metrics.register_callback(
queue_name + "_size",
lambda: len(queue),
)
LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
"", [], lambda: len(queue))
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@@ -103,7 +98,7 @@ class FederationRemoteSendQueue(object):
now = self.clock.time_msec()
keys = self.pos_time.keys()
time = keys.bisect_left(now - FIVE_MINUTES_AGO)
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
@@ -118,7 +113,7 @@ class FederationRemoteSendQueue(object):
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = keys.bisect_left(position_to_delete)
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
@@ -136,7 +131,7 @@ class FederationRemoteSendQueue(object):
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_left(position_to_delete)
i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
@@ -150,19 +145,19 @@ class FederationRemoteSendQueue(object):
# Delete things out of edu map
keys = self.edus.keys()
i = keys.bisect_left(position_to_delete)
i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
i = keys.bisect_left(position_to_delete)
i = self.failures.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map
keys = self.device_messages.keys()
i = keys.bisect_left(position_to_delete)
i = self.device_messages.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
@@ -202,7 +197,7 @@ class FederationRemoteSendQueue(object):
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
@@ -255,13 +250,12 @@ class FederationRemoteSendQueue(object):
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for user_id in self.presence_changed[pos]
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
@@ -270,13 +264,12 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
@@ -285,19 +278,17 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed edus
keys = self.edus.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
edus = ((k, self.edus[k]) for k in keys[i:j])
i = self.edus.bisect_right(from_token)
j = self.edus.bisect_right(to_token) + 1
edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Fetch changed failures
keys = self.failures.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
failures = ((k, self.failures[k]) for k in keys[i:j])
i = self.failures.bisect_right(from_token)
j = self.failures.bisect_right(to_token) + 1
failures = self.failures.items()[i:j]
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
@@ -306,10 +297,9 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed device messages
keys = self.device_messages.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
i = self.device_messages.bisect_right(from_token)
j = self.device_messages.bisect_right(to_token) + 1
device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
+38 -33
View File
@@ -21,28 +21,32 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
from synapse.metrics import LaterGauge
from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
events_processed_counter,
)
from prometheus_client import Counter
from six import itervalues
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = client_metrics.register_distribution(
"sent_pdu_destinations"
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
)
sent_edus_counter = client_metrics.register_counter("sent_edus")
sent_transactions_counter = client_metrics.register_counter("sent_transactions")
events_processed_counter = client_metrics.register_counter("events_processed")
class TransactionQueue(object):
@@ -69,8 +73,10 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
metrics.register_callback(
"pending_destinations",
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: len(self.pending_transactions),
)
@@ -94,12 +100,16 @@ class TransactionQueue(object):
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
"pending_pdus",
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
@@ -228,7 +238,7 @@ class TransactionQueue(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
for evs in itervalues(events_by_room)
],
consumeErrors=True
))
@@ -241,18 +251,15 @@ class TransactionQueue(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc_by(len(events))
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
@@ -275,7 +282,8 @@ class TransactionQueue(object):
if not destinations:
return
sent_pdus_destination_dist.inc_by(len(destinations))
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -322,7 +330,7 @@ class TransactionQueue(object):
if not states_map:
break
yield self._process_presence_inner(states_map.values())
yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
@@ -446,9 +454,6 @@ class TransactionQueue(object):
# hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store)
# XXX: what's this for?
yield run_on_reactor()
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
+2 -2
View File
@@ -114,14 +114,14 @@ class BaseHandler(object):
if guest_access != "can_join":
if context:
current_state = yield self.store.get_events(
context.current_state_ids.values()
list(context.current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
event.room_id
)
current_state = current_state.values()
current_state = list(current_state.values())
logger.info("maybe_kick_guest_users %r", current_state)
yield self.kick_guest_users(current_state)
+24 -14
View File
@@ -15,20 +15,23 @@
from twisted.internet import defer
from six import itervalues
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import (
make_deferred_yieldable, run_in_background,
)
from synapse.types import get_domain_from_id
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
events_processed_counter = metrics.register_counter("events_processed")
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
def log_failure(failure):
@@ -51,6 +54,7 @@ class ApplicationServicesHandler(object):
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
self.clock = hs.get_clock()
self.server_blacklist = hs.config.app_service_server_blacklist
self.notify_appservices = hs.config.notify_appservices
self.current_max = 0
@@ -92,6 +96,15 @@ class ApplicationServicesHandler(object):
@defer.inlineCallbacks
def handle_event(event):
ev_domain = get_domain_from_id(event.sender)
if any(srv_re.match(ev_domain) is not None
for srv_re in self.server_blacklist):
logger.info(
"Ignoring %s from %s, matches server blacklist",
event.sender,
ev_domain
)
return
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
@@ -120,7 +133,7 @@ class ApplicationServicesHandler(object):
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
for evs in itervalues(events_by_room)
], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
@@ -128,18 +141,15 @@ class ApplicationServicesHandler(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
synapse.metrics.event_processing_positions.labels(
"appservice_sender").set(upper_bound)
events_processed_counter.inc_by(len(events))
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"appservice_sender").set(ts)
finally:
self.is_processing = False
+34 -19
View File
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, threads
from ._base import BaseHandler
@@ -23,7 +24,6 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
@@ -33,6 +33,7 @@ import logging
import bcrypt
import pymacaroons
import simplejson
import attr
import synapse.util.stringutils as stringutils
@@ -249,7 +250,7 @@ class AuthHandler(BaseHandler):
errordict = e.error_dict()
for f in flows:
if len(set(f) - set(creds.keys())) == 0:
if len(set(f) - set(creds)) == 0:
# it's very useful to know what args are stored, but this can
# include the password in the case of registering, so only log
# the keys (confusingly, clientdict may contain a password
@@ -257,12 +258,12 @@ class AuthHandler(BaseHandler):
# and is not sensitive).
logger.info(
"Auth completed with creds: %r. Client dict has keys: %r",
creds, clientdict.keys()
creds, list(clientdict)
)
defer.returnValue((creds, clientdict, session['id']))
ret = self._auth_dict_for_flows(flows, session)
ret['completed'] = creds.keys()
ret['completed'] = list(creds)
ret.update(errordict)
raise InteractiveAuthIncompleteError(
ret,
@@ -423,15 +424,11 @@ class AuthHandler(BaseHandler):
def _check_msisdn(self, authdict, _):
return self._check_threepid('msisdn', authdict)
@defer.inlineCallbacks
def _check_dummy_auth(self, authdict, _):
yield run_on_reactor()
defer.returnValue(True)
return defer.succeed(True)
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
yield run_on_reactor()
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@@ -825,6 +822,15 @@ class AuthHandler(BaseHandler):
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
yield identity_handler.unbind_threepid(
user_id,
{
'medium': medium,
'address': address,
},
)
ret = yield self.store.user_delete_threepid(
user_id, medium, address,
)
@@ -849,7 +855,11 @@ class AuthHandler(BaseHandler):
return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
bcrypt.gensalt(self.bcrypt_rounds))
return make_deferred_yieldable(threads.deferToThread(_do_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
),
)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -869,16 +879,21 @@ class AuthHandler(BaseHandler):
)
if stored_hash:
return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
self.hs.get_reactor().getThreadPool(),
_do_validate_hash,
),
)
else:
return defer.succeed(False)
class MacaroonGeneartor(object):
def __init__(self, hs):
self.clock = hs.get_clock()
self.server_name = hs.config.server_name
self.macaroon_secret_key = hs.config.macaroon_secret_key
@attr.s
class MacaroonGenerator(object):
hs = attr.ib()
def generate_access_token(self, user_id, extra_caveats=None):
extra_caveats = extra_caveats or []
@@ -896,7 +911,7 @@ class MacaroonGeneartor(object):
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.clock.time_msec()
now = self.hs.get_clock().time_msec()
expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@@ -908,9 +923,9 @@ class MacaroonGeneartor(object):
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
location=self.server_name,
location=self.hs.config.server_name,
identifier="key",
key=self.macaroon_secret_key)
key=self.hs.config.macaroon_secret_key)
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon
+33 -5
View File
@@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from ._base import BaseHandler
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
from synapse.api.errors import SynapseError
import logging
@@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_handlers().identity_handler
self.user_directory_handler = hs.get_user_directory_handler()
# Flag that indicates whether the process to part users from rooms is running
@@ -37,10 +39,10 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
reactor.callWhenRunning(self._start_user_parting)
hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id):
def deactivate_account(self, user_id, erase_data):
"""Deactivate a user's account
Args:
@@ -52,14 +54,35 @@ class DeactivateAccountHandler(BaseHandler):
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
# first delete any devices belonging to the user, which will also
# delete threepids first. We remove these from the IS so if this fails,
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
yield self._identity_handler.unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
raise SynapseError(400, "Failed to remove threepid from ID server")
yield self.store.user_delete_threepid(
user_id, threepid['medium'], threepid['address'],
)
# delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users pending deactivation (ie.
@@ -69,6 +92,11 @@ class DeactivateAccountHandler(BaseHandler):
# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
# Mark the user as erased, if they asked for that
if erase_data:
logger.info("Marking %s as erased", user_id)
yield self.store.mark_user_erased(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+2 -2
View File
@@ -114,7 +114,7 @@ class DeviceHandler(BaseHandler):
user_id, device_id=None
)
devices = device_map.values()
devices = list(device_map.values())
for device in devices:
_update_device_from_client_ips(device, ips)
@@ -187,7 +187,7 @@ class DeviceHandler(BaseHandler):
defer.Deferred:
"""
device_map = yield self.store.get_devices_by_user(user_id)
device_ids = device_map.keys()
device_ids = list(device_map)
if except_device_id is not None:
device_ids = [d for d in device_ids if d != except_device_id]
yield self.delete_devices(user_id, device_ids)
+52 -32
View File
@@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.async import Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
@@ -52,7 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
logger = logging.getLogger(__name__)
@@ -104,8 +103,10 @@ class FederationHandler(BaseHandler):
"""
# We reprocess pdus when we have seen them only as outliers
existing = yield self.get_persisted_pdu(
origin, pdu.event_id, do_auth=False
existing = yield self.store.get_event(
pdu.event_id,
allow_none=True,
allow_rejected=True,
)
# FIXME: Currently we fetch an event again when we already have it
@@ -480,8 +481,8 @@ class FederationHandler(BaseHandler):
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
for key_to_eid in event_to_state_ids.itervalues()
for key, e_id in key_to_eid.iteritems()
for key_to_eid in list(event_to_state_ids.values())
for key, e_id in key_to_eid.items()
if key[0] != EventTypes.Member or check_match(key[1])
])
@@ -494,7 +495,20 @@ class FederationHandler(BaseHandler):
for e_id, key_to_eid in event_to_state_ids.iteritems()
}
erased_senders = yield self.store.are_users_erased(
e.sender for e in events,
)
def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)
if not state:
return event
@@ -1149,13 +1163,13 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
state_ids = context.prev_state_ids.values()
state_ids = list(context.prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(context.prev_state_ids.values())
state = yield self.store.get_events(list(context.prev_state_ids.values()))
defer.returnValue({
"state": state.values(),
"state": list(state.values()),
"auth_chain": auth_chain,
})
@@ -1382,8 +1396,6 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
yield run_on_reactor()
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
@@ -1405,7 +1417,7 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
res = results.values()
res = list(results.values())
for event in res:
# We sign these again because there was a bug where we
# incorrectly signed things the first time round
@@ -1426,8 +1438,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
yield run_on_reactor()
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)
@@ -1446,7 +1456,7 @@ class FederationHandler(BaseHandler):
else:
results.pop((event.type, event.state_key), None)
defer.returnValue(results.values())
defer.returnValue(list(results.values()))
else:
defer.returnValue([])
@@ -1469,11 +1479,20 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
def get_persisted_pdu(self, origin, event_id):
"""Get an event from the database for the given server.
Args:
origin [str]: hostname of server which is requesting the event; we
will check that the server is allowed to see it.
event_id [str]: id of the event being requested
Returns:
Deferred: Results in a `Pdu`.
Deferred[EventBase|None]: None if we know nothing about the event;
otherwise the (possibly-redacted) event.
Raises:
AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event_id,
@@ -1494,20 +1513,17 @@ class FederationHandler(BaseHandler):
)
)
if do_auth:
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self._filter_events_for_server(
origin, event.room_id, [event]
)
event = events[0]
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self._filter_events_for_server(
origin, event.room_id, [event]
)
event = events[0]
defer.returnValue(event)
else:
defer.returnValue(None)
@@ -1795,6 +1811,10 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
)
defer.returnValue(missing_events)
@defer.inlineCallbacks
@@ -1915,7 +1935,7 @@ class FederationHandler(BaseHandler):
})
new_state = self.state_handler.resolve_events(
[local_view.values(), remote_view.values()],
[list(local_view.values()), list(remote_view.values())],
event
)
+48 -8
View File
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,7 +27,6 @@ from synapse.api.errors import (
MatrixCodeMessageException, CodeMessageException
)
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError, Codes
logger = logging.getLogger(__name__)
@@ -38,6 +38,7 @@ class IdentityHandler(BaseHandler):
super(IdentityHandler, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.federation_http_client = hs.get_http_client()
self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +61,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
yield run_on_reactor()
if 'id_server' in creds:
id_server = creds['id_server']
elif 'idServer' in creds:
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def bind_threepid(self, creds, mxid):
yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
data = None
@@ -139,9 +137,53 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
yield run_on_reactor()
def unbind_threepid(self, mxid, threepid):
"""
Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
Returns:
Deferred[bool]: True on success, otherwise False
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
logger.warn("Can't unbind threepid: no trusted ID servers set in config")
defer.returnValue(False)
# We don't track what ID server we added 3pids on (perhaps we ought to)
# but we assume that any of the servers in the trusted list are in the
# same ID server federation, so we can pick any one of them to send the
# deletion request to.
id_server = next(iter(self.trusted_id_servers))
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
content = {
"mxid": mxid,
"threepid": threepid,
}
headers = {}
# we abuse the federation http client to sign the request, but we have to send it
# using the normal http client since we don't want the SRV lookup and want normal
# 'browser-like' HTTPS.
self.federation_http_client.sign_request(
destination=None,
method='POST',
url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
headers_dict=headers,
content=content,
destination_is=id_server,
)
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
defer.returnValue(True)
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@@ -176,8 +218,6 @@ class IdentityHandler(BaseHandler):
self, id_server, country, phone_number,
client_secret, send_attempt, **kwargs
):
yield run_on_reactor()
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
+7 -5
View File
@@ -20,7 +20,7 @@ import sys
from canonicaljson import encode_canonical_json
import six
from six import string_types, itervalues, iteritems
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
@@ -36,7 +36,7 @@ from synapse.events.validator import EventValidator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.async import ReadWriteLock, Limiter
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
@@ -157,7 +157,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
reactor.callLater(24 * 3600, clear_purge)
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
@@ -572,6 +572,9 @@ class EventCreationHandler(object):
u = yield self.store.get_user_by_id(user_id)
assert u is not None
if u["appservice_id"] is not None:
# users registered by an appservice are exempt
return
if u["consent_version"] == self.config.user_consent_version:
return
@@ -803,6 +806,7 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
@@ -956,9 +960,7 @@ class EventCreationHandler(object):
event_stream_id, max_stream_id
)
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
+41 -36
View File
@@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from contextlib import contextmanager
from six import itervalues, iteritems
@@ -38,26 +38,29 @@ from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
import synapse.metrics
from synapse.metrics import LaterGauge
import logging
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_presence_counter = metrics.register_counter("notified_presence")
federation_presence_out_counter = metrics.register_counter("federation_presence_out")
presence_updates_counter = metrics.register_counter("presence_updates")
timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out", "")
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
state_transition_counter = metrics.register_counter(
"state_transition", labels=["from", "to"]
notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason", "", ["reason"])
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["from", "to"]
)
@@ -142,8 +145,9 @@ class PresenceHandler(object):
for state in active_presence
}
metrics.register_callback(
"user_to_current_state_size", lambda: len(self.user_to_current_state)
LaterGauge(
"synapse_handlers_presence_user_to_current_state_size", "", [],
lambda: len(self.user_to_current_state)
)
now = self.clock.time_msec()
@@ -175,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1
@@ -213,7 +217,8 @@ class PresenceHandler(object):
60 * 1000,
)
metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
@defer.inlineCallbacks
def _on_shutdown(self):
@@ -316,11 +321,11 @@ class PresenceHandler(object):
# TODO: We should probably ensure there are no races hereafter
presence_updates_counter.inc_by(len(new_states))
presence_updates_counter.inc(len(new_states))
if to_notify:
notified_presence_counter.inc_by(len(to_notify))
yield self._persist_and_notify(to_notify.values())
notified_presence_counter.inc(len(to_notify))
yield self._persist_and_notify(list(to_notify.values()))
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
self.unpersisted_users_changes -= set(to_notify.keys())
@@ -330,7 +335,7 @@ class PresenceHandler(object):
if user_id not in to_notify
}
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
federation_presence_out_counter.inc(len(to_federation_ping))
self._push_to_remotes(to_federation_ping.values())
@@ -368,7 +373,7 @@ class PresenceHandler(object):
for user_id in users_to_check
]
timers_fired_counter.inc_by(len(states))
timers_fired_counter.inc(len(states))
changes = handle_timeouts(
states,
@@ -657,7 +662,7 @@ class PresenceHandler(object):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
federation_presence_counter.inc_by(len(updates))
federation_presence_counter.inc(len(updates))
yield self._update_states(updates)
@defer.inlineCallbacks
@@ -682,7 +687,7 @@ class PresenceHandler(object):
"""
updates = yield self.current_state_for_users(target_user_ids)
updates = updates.values()
updates = list(updates.values())
for user_id in set(target_user_ids) - set(u.user_id for u in updates):
updates.append(UserPresenceState.default(user_id))
@@ -748,11 +753,11 @@ class PresenceHandler(object):
self._push_to_remotes([state])
else:
user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.is_mine_id, user_ids)
user_ids = list(filter(self.is_mine_id, user_ids))
states = yield self.current_state_for_users(user_ids)
self._push_to_remotes(states.values())
self._push_to_remotes(list(states.values()))
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -932,28 +937,28 @@ def should_notify(old_state, new_state):
return False
if old_state.status_msg != new_state.status_msg:
notify_reason_counter.inc("status_msg_change")
notify_reason_counter.labels("status_msg_change").inc()
return True
if old_state.state != new_state.state:
notify_reason_counter.inc("state_change")
state_transition_counter.inc(old_state.state, new_state.state)
notify_reason_counter.labels("state_change").inc()
state_transition_counter.labels(old_state.state, new_state.state).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
notify_reason_counter.inc("current_active_change")
notify_reason_counter.labels("current_active_change").inc()
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
notify_reason_counter.inc("last_active_change_online")
notify_reason_counter.labels("last_active_change_online").inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
notify_reason_counter.inc("last_active_change_not_online")
notify_reason_counter.labels("last_active_change_not_online").inc()
return True
return False
@@ -1027,14 +1032,14 @@ class PresenceEventSource(object):
if changed is not None and len(changed) < 500:
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
get_updates_counter.inc("stream")
get_updates_counter.labels("stream").inc()
for other_user_id in changed:
if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.inc("full")
get_updates_counter.labels("full").inc()
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
@@ -1046,7 +1051,7 @@ class PresenceEventSource(object):
updates = yield presence.current_state_for_users(user_ids_changed)
if include_offline:
defer.returnValue((updates.values(), max_token))
defer.returnValue((list(updates.values()), max_token))
else:
defer.returnValue(([
s for s in itervalues(updates)
@@ -1107,7 +1112,7 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
if new_state:
changes[state.user_id] = new_state
return changes.values()
return list(changes.values())
def handle_timeout(state, is_mine, syncing_user_ids, now):
+1 -4
View File
@@ -24,7 +24,7 @@ from synapse.api.errors import (
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.async import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield run_on_reactor()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield run_on_reactor()
if localpart is None:
raise SynapseError(400, "Request must include user id")
+7 -3
View File
@@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
EventTypes, JoinRules, RoomCreationPreset
)
from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler):
)
if mapping:
raise SynapseError(400, "Room alias already taken")
raise SynapseError(
400,
"Room alias already taken",
Codes.ROOM_IN_USE
)
else:
room_alias = None
@@ -455,7 +459,7 @@ class RoomContextHandler(BaseHandler):
state = yield self.store.get_state_for_events(
[last_event_id], None
)
results["state"] = state[last_event_id].values()
results["state"] = list(state[last_event_id].values())
results["start"] = now_token.copy_and_replace(
"room_key", results["start"]
+2 -1
View File
@@ -15,6 +15,7 @@
from twisted.internet import defer
from six import iteritems
from six.moves import range
from ._base import BaseHandler
@@ -307,7 +308,7 @@ class RoomListHandler(BaseHandler):
)
event_map = yield self.store.get_events([
event_id for key, event_id in current_state_ids.iteritems()
event_id for key, event_id in iteritems(current_state_ids)
if key[0] in (
EventTypes.JoinRules,
EventTypes.Name,
+14 -10
View File
@@ -298,16 +298,6 @@ class RoomMemberHandler(object):
is_blocked = yield self.store.is_room_blocked(room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
else:
# we don't allow people to reject invites to, or leave, the
# server notice room.
is_blocked = yield self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
http_client.FORBIDDEN,
"You cannot leave this room",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
if effective_membership_state == Membership.INVITE:
# block any attempts to invite the server notices mxid
@@ -383,6 +373,20 @@ class RoomMemberHandler(object):
if same_sender and same_membership and same_content:
defer.returnValue(old_state)
# we don't allow people to reject invites to the server notice
# room, but they can leave it once they are joined.
if (
old_membership == Membership.INVITE and
effective_membership_state == Membership.LEAVE
):
is_blocked = yield self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
http_client.FORBIDDEN,
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
is_host_in_room = yield self._is_host_in_room(current_state_ids)
if effective_membership_state == Membership.JOIN:
+15 -1
View File
@@ -64,6 +64,13 @@ class SearchHandler(BaseHandler):
except Exception:
raise SynapseError(400, "Invalid batch")
logger.info(
"Search batch properties: %r, %r, %r",
batch_group, batch_group_key, batch_token,
)
logger.info("Search content: %s", content)
try:
room_cat = content["search_categories"]["room_events"]
@@ -271,6 +278,8 @@ class SearchHandler(BaseHandler):
# We should never get here due to the guard earlier.
raise NotImplementedError()
logger.info("Found %d events to return", len(allowed_events))
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
@@ -282,6 +291,11 @@ class SearchHandler(BaseHandler):
event.room_id, event.event_id, before_limit, after_limit
)
logger.info(
"Context for search returned %d and %d events",
len(res["events_before"]), len(res["events_after"]),
)
res["events_before"] = yield filter_events_for_client(
self.store, user.to_string(), res["events_before"]
)
@@ -348,7 +362,7 @@ class SearchHandler(BaseHandler):
rooms = set(e.room_id for e in allowed_events)
for room_id in rooms:
state = yield self.state_handler.get_current_state(room_id)
state_results[room_id] = state.values()
state_results[room_id] = list(state.values())
state_results.values()
+14 -4
View File
@@ -443,6 +443,10 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1,
)
@@ -537,11 +541,11 @@ class SyncHandler(object):
state = {}
if state_ids:
state = yield self.store.get_events(state_ids.values())
state = yield self.store.get_events(list(state_ids.values()))
defer.returnValue({
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(state.values())
for e in sync_config.filter_collection.filter_room_state(list(state.values()))
})
@defer.inlineCallbacks
@@ -890,7 +894,7 @@ class SyncHandler(object):
presence.extend(states)
# Deduplicate the presence entries so that there's at most one per user
presence = {p.user_id: p for p in presence}.values()
presence = list({p.user_id: p for p in presence}.values())
presence = sync_config.filter_collection.filter_presence(
presence
@@ -1042,7 +1046,13 @@ class SyncHandler(object):
Returns:
Deferred(tuple): Returns a tuple of the form:
`([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
`(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
where:
room_entries is a list [RoomSyncResultBuilder]
invited_rooms is a list [InvitedSyncResult]
newly_joined rooms is a list[str] of room ids
newly_left_rooms is a list[str] of room ids
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
+4 -5
View File
@@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id
from six import iteritems
@@ -174,7 +173,7 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
@@ -188,7 +187,7 @@ class UserDirectoryHandler(object):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
logger.info("Processed all users")
@@ -236,7 +235,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@@ -251,7 +250,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)
+13
View File
@@ -13,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from twisted.internet.defer import CancelledError
from twisted.python import failure
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
uri
)
+15 -21
View File
@@ -19,11 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
import synapse.metrics
from synapse.http.endpoint import SpiderEndpoint
from canonicaljson import encode_canonical_json
@@ -42,6 +41,7 @@ from twisted.web._newclient import ResponseDone
from six import StringIO
from prometheus_client import Counter
import simplejson as json
import logging
import urllib
@@ -49,16 +49,9 @@ import urllib
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter("synapse_http_client_responses", "",
["method", "code"])
class SimpleHttpClient(object):
@@ -95,33 +88,34 @@ class SimpleHttpClient(object):
def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
outgoing_requests_counter.labels(method).inc()
logger.info("Sending request %s %s", method, uri)
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)
add_timeout_to_deferred(
request_deferred,
60, cancelled_to_request_timed_out_error,
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
incoming_responses_counter.inc(method, response.code)
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.inc(method, "ERR")
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, type(e).__name__, e.message
method, redact_uri(uri), type(e).__name__, e.message
)
raise e
raise
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):
+10 -8
View File
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
@@ -74,21 +74,22 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
))
), reactor)
else:
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
))
), reactor)
class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac):
def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
self.reactor = reactor
@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn)
conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)
@@ -98,9 +99,10 @@ class _WrappedConnection(object):
"""
__slots__ = ["conn", "last_request"]
def __init__(self, conn):
def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
self._reactor = reactor
def __getattr__(self, name):
return getattr(self.conn, name)
@@ -131,14 +133,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request)
def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res
d.addCallback(update_request_time)
+32 -14
View File
@@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util.async import add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
@@ -45,19 +45,15 @@ from six.moves.urllib import parse as urlparse
from six import string_types
from prometheus_client import Counter
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
"", ["method", "code"])
MAX_LONG_RETRIES = 10
@@ -197,6 +193,7 @@ class MatrixFederationHttpClient(object):
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
@@ -238,7 +235,7 @@ class MatrixFederationHttpClient(object):
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
yield sleep(delay)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
@@ -264,14 +261,35 @@ class MatrixFederationHttpClient(object):
defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
content=None, destination_is=None):
"""
Signs a request by adding an Authorization header to headers_dict
Args:
destination (bytes|None): The desination home server of the request.
May be None if the destination is an identity server, in which case
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
headers_dict (dict): Dictionary of request headers to append to
content (bytes): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
Returns:
None
"""
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
"destination": destination,
}
if destination is not None:
request["destination"] = destination
if destination_is is not None:
request["destination_is"] = destination_is
if content is not None:
request["content"] = content
+97 -127
View File
@@ -16,158 +16,135 @@
import logging
import synapse.metrics
from prometheus_client.core import Counter, Histogram
from synapse.metrics import LaterGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for("synapse.http.server")
# total number of responses served, split by method/servlet/tag
response_count = metrics.register_counter(
"response_count",
labels=["method", "servlet", "tag"],
alternative_names=(
# the following are all deprecated aliases for the same metric
metrics.name_prefix + x for x in (
"_requests",
"_response_time:count",
"_response_ru_utime:count",
"_response_ru_stime:count",
"_response_db_txn_count:count",
"_response_db_txn_duration:count",
)
)
response_count = Counter(
"synapse_http_server_response_count", "", ["method", "servlet", "tag"]
)
requests_counter = metrics.register_counter(
"requests_received",
labels=["method", "servlet", ],
requests_counter = Counter(
"synapse_http_server_requests_received", "", ["method", "servlet"]
)
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
outgoing_responses_counter = Counter(
"synapse_http_server_responses", "", ["method", "code"]
)
response_timer = metrics.register_counter(
"response_time_seconds",
labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_time:total",
),
response_timer = Histogram(
"synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
)
response_ru_utime = metrics.register_counter(
"response_ru_utime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_utime:total",
),
response_ru_utime = Counter(
"synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"]
)
response_ru_stime = metrics.register_counter(
"response_ru_stime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_stime:total",
),
response_ru_stime = Counter(
"synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"]
)
response_db_txn_count = metrics.register_counter(
"response_db_txn_count", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_count:total",
),
response_db_txn_count = Counter(
"synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
response_db_txn_duration = metrics.register_counter(
"response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_duration:total",
),
response_db_txn_duration = Counter(
"synapse_http_server_response_db_txn_duration_seconds",
"",
["method", "servlet", "tag"],
)
# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
response_db_sched_duration = Counter(
"synapse_http_server_response_db_sched_duration_seconds",
"",
["method", "servlet", "tag"],
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
response_size = Counter(
"synapse_http_server_response_size", "", ["method", "servlet", "tag"]
)
# In flight metrics are incremented while the requests are in flight, rather
# than when the response was written.
in_flight_requests_ru_utime = metrics.register_counter(
"in_flight_requests_ru_utime_seconds", labels=["method", "servlet"],
in_flight_requests_ru_utime = Counter(
"synapse_http_server_in_flight_requests_ru_utime_seconds",
"",
["method", "servlet"],
)
in_flight_requests_ru_stime = metrics.register_counter(
"in_flight_requests_ru_stime_seconds", labels=["method", "servlet"],
in_flight_requests_ru_stime = Counter(
"synapse_http_server_in_flight_requests_ru_stime_seconds",
"",
["method", "servlet"],
)
in_flight_requests_db_txn_count = metrics.register_counter(
"in_flight_requests_db_txn_count", labels=["method", "servlet"],
in_flight_requests_db_txn_count = Counter(
"synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"]
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
in_flight_requests_db_txn_duration = metrics.register_counter(
"in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"],
in_flight_requests_db_txn_duration = Counter(
"synapse_http_server_in_flight_requests_db_txn_duration_seconds",
"",
["method", "servlet"],
)
# seconds spent waiting for a db connection, when processing this request
in_flight_requests_db_sched_duration = metrics.register_counter(
"in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"]
in_flight_requests_db_sched_duration = Counter(
"synapse_http_server_in_flight_requests_db_sched_duration_seconds",
"",
["method", "servlet"],
)
# The set of all in flight requests, set[RequestMetrics]
_in_flight_requests = set()
def _collect_in_flight():
"""Called just before metrics are collected, so we use it to update all
the in flight request metrics
"""
for rm in _in_flight_requests:
rm.update_metrics()
metrics.register_collector(_collect_in_flight)
def _get_in_flight_counts():
"""Returns a count of all in flight requests by (method, server_name)
Returns:
dict[tuple[str, str], int]
"""
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
reqs = list(_in_flight_requests)
for rm in reqs:
rm.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type
counts = {}
for rm in _in_flight_requests:
for rm in reqs:
key = (rm.method, rm.name,)
counts[key] = counts.get(key, 0) + 1
return counts
metrics.register_callback(
"in_flight_requests_count",
LaterGauge(
"synapse_http_server_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,
labels=["method", "servlet"]
)
class RequestMetrics(object):
def start(self, time_msec, name, method):
self.start = time_msec
def start(self, time_sec, name, method):
self.start = time_sec
self.start_context = LoggingContext.current_context()
self.name = name
self.method = method
@@ -176,7 +153,7 @@ class RequestMetrics(object):
_in_flight_requests.add(self)
def stop(self, time_msec, request):
def stop(self, time_sec, request):
_in_flight_requests.discard(self)
context = LoggingContext.current_context()
@@ -192,34 +169,29 @@ class RequestMetrics(object):
)
return
outgoing_responses_counter.inc(request.method, str(request.code))
outgoing_responses_counter.labels(request.method, str(request.code)).inc()
response_count.inc(request.method, self.name, tag)
response_count.labels(request.method, self.name, tag).inc()
response_timer.inc_by(
time_msec - self.start, request.method,
self.name, tag
response_timer.labels(request.method, self.name, tag).observe(
time_sec - self.start
)
ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by(
ru_utime, request.method, self.name, tag
response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime)
response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime)
response_db_txn_count.labels(request.method, self.name, tag).inc(
context.db_txn_count
)
response_ru_stime.inc_by(
ru_stime, request.method, self.name, tag
response_db_txn_duration.labels(request.method, self.name, tag).inc(
context.db_txn_duration_sec
)
response_db_txn_count.inc_by(
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
context.db_txn_duration_ms / 1000., request.method, self.name, tag
)
response_db_sched_duration.inc_by(
context.db_sched_duration_ms / 1000., request.method, self.name, tag
response_db_sched_duration.labels(request.method, self.name, tag).inc(
context.db_sched_duration_sec
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
response_size.labels(request.method, self.name, tag).inc(request.sentLength)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
@@ -229,27 +201,21 @@ class RequestMetrics(object):
def update_metrics(self):
"""Updates the in flight metrics with values from this request.
"""
diff = self._request_stats.update(self.start_context)
in_flight_requests_ru_utime.inc_by(
diff.ru_utime, self.method, self.name,
in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count
)
in_flight_requests_ru_stime.inc_by(
diff.ru_stime, self.method, self.name,
in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(
diff.db_txn_duration_sec
)
in_flight_requests_db_txn_count.inc_by(
diff.db_txn_count, self.method, self.name,
)
in_flight_requests_db_txn_duration.inc_by(
diff.db_txn_duration_ms / 1000., self.method, self.name,
)
in_flight_requests_db_sched_duration.inc_by(
diff.db_sched_duration_ms / 1000., self.method, self.name,
in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(
diff.db_sched_duration_sec
)
@@ -258,17 +224,21 @@ class _RequestStats(object):
"""
__slots__ = [
"ru_utime", "ru_stime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"ru_utime",
"ru_stime",
"db_txn_count",
"db_txn_duration_sec",
"db_sched_duration_sec",
]
def __init__(self, ru_utime, ru_stime, db_txn_count,
db_txn_duration_ms, db_sched_duration_ms):
def __init__(
self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec
):
self.ru_utime = ru_utime
self.ru_stime = ru_stime
self.db_txn_count = db_txn_count
self.db_txn_duration_ms = db_txn_duration_ms
self.db_sched_duration_ms = db_sched_duration_ms
self.db_txn_duration_sec = db_txn_duration_sec
self.db_sched_duration_sec = db_sched_duration_sec
@staticmethod
def from_context(context):
@@ -277,8 +247,8 @@ class _RequestStats(object):
return _RequestStats(
ru_utime, ru_stime,
context.db_txn_count,
context.db_txn_duration_ms,
context.db_sched_duration_ms,
context.db_txn_duration_sec,
context.db_sched_duration_sec,
)
def update(self, context):
@@ -294,14 +264,14 @@ class _RequestStats(object):
new.ru_utime - self.ru_utime,
new.ru_stime - self.ru_stime,
new.db_txn_count - self.db_txn_count,
new.db_txn_duration_ms - self.db_txn_duration_ms,
new.db_sched_duration_ms - self.db_sched_duration_ms,
new.db_txn_duration_sec - self.db_txn_duration_sec,
new.db_sched_duration_sec - self.db_sched_duration_sec,
)
self.ru_utime = new.ru_utime
self.ru_stime = new.ru_stime
self.db_txn_count = new.db_txn_count
self.db_txn_duration_ms = new.db_txn_duration_ms
self.db_sched_duration_ms = new.db_sched_duration_ms
self.db_txn_duration_sec = new.db_txn_duration_sec
self.db_sched_duration_sec = new.db_sched_duration_sec
return diff
+2 -2
View File
@@ -210,8 +210,8 @@ def wrap_request_handler_with_logging(h):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.inc(request.method,
request.request_metrics.name)
requests_counter.labels(request.method,
request.request_metrics.name).inc()
yield d
return wrapped_request_handler
+16 -18
View File
@@ -14,18 +14,16 @@
import contextlib
import logging
import re
import time
from twisted.web.server import Site, Request
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
_next_request_seq = 0
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
self.uri
)
return redact_uri(self.uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -83,7 +78,7 @@ class SynapseRequest(Request):
return Request.render(self, resrc)
def _started_processing(self, servlet_name):
self.start_time = int(time.time() * 1000)
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
self.start_time, name=servlet_name, method=self.method,
@@ -102,26 +97,28 @@ class SynapseRequest(Request):
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
db_txn_count = context.db_txn_count
db_txn_duration_ms = context.db_txn_duration_ms
db_sched_duration_ms = context.db_sched_duration_ms
db_txn_duration_sec = context.db_txn_duration_sec
db_sched_duration_sec = context.db_sched_duration_sec
evt_db_fetch_count = context.evt_db_fetch_count
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_ms = (0, 0)
db_txn_count, db_txn_duration_sec = (0, 0)
evt_db_fetch_count = 0
end_time = int(time.time() * 1000)
end_time = time.time()
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
end_time - self.start_time,
int(ru_utime * 1000),
int(ru_stime * 1000),
db_sched_duration_ms,
db_txn_duration_ms,
ru_utime,
ru_stime,
db_sched_duration_sec,
db_txn_duration_sec,
int(db_txn_count),
self.sentLength,
self.code,
@@ -129,6 +126,7 @@ class SynapseRequest(Request):
self.get_redacted_uri(),
self.clientproto,
self.get_user_agent(),
evt_db_fetch_count,
)
try:
+159 -125
View File
@@ -17,165 +17,194 @@ import logging
import functools
import time
import gc
import os
import platform
import attr
from prometheus_client import Gauge, Histogram, Counter
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
running_on_pypy = platform.python_implementation() == "PyPy"
all_metrics = []
all_collectors = []
all_gauges = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class Metrics(object):
""" A single Metrics object gives a (mutable) slice view of the all_metrics
dict, allowing callers to easily register new metrics that are namespaced
nicely."""
class RegistryProxy(object):
def __init__(self, name):
self.name_prefix = name
def make_subspace(self, name):
return Metrics("%s_%s" % (self.name_prefix, name))
def register_collector(self, func):
all_collectors.append(func)
def _register(self, metric_class, name, *args, **kwargs):
full_name = "%s_%s" % (self.name_prefix, name)
metric = metric_class(full_name, *args, **kwargs)
all_metrics.append(metric)
return metric
def register_counter(self, *args, **kwargs):
"""
Returns:
CounterMetric
"""
return self._register(CounterMetric, *args, **kwargs)
def register_gauge(self, *args, **kwargs):
"""
Returns:
GaugeMetric
"""
return self._register(GaugeMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
"""
Returns:
CallbackMetric
"""
return self._register(CallbackMetric, *args, **kwargs)
def register_distribution(self, *args, **kwargs):
"""
Returns:
DistributionMetric
"""
return self._register(DistributionMetric, *args, **kwargs)
def register_cache(self, *args, **kwargs):
"""
Returns:
CacheMetric
"""
return self._register(CacheMetric, *args, **kwargs)
@staticmethod
def collect():
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
def register_memory_metrics(hs):
try:
import psutil
process = psutil.Process()
process.memory_info().rss
except (ImportError, AttributeError):
logger.warn(
"psutil is not installed or incorrect version."
" Disabling memory metrics."
)
return
metric = MemoryUsageMetric(hs, psutil)
all_metrics.append(metric)
@attr.s(hash=True)
class LaterGauge(object):
name = attr.ib()
desc = attr.ib()
labels = attr.ib(hash=False)
caller = attr.ib()
def get_metrics_for(pkg_name):
""" Returns a Metrics instance for conveniently creating metrics
namespaced with the given name prefix. """
def collect(self):
# Convert a "package.name" to "package_name" because Prometheus doesn't
# let us use . in metric names
return Metrics(pkg_name.replace(".", "_"))
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
def render_all():
strs = []
for collector in all_collectors:
collector()
for metric in all_metrics:
try:
strs += metric.render()
calls = self.caller()
except Exception:
strs += ["# FAILED to render"]
logger.exception("Failed to render metric")
logger.exception(
"Exception running callback for LaterGauge(%s)",
self.name,
)
yield g
return
strs.append("") # to generate a final CRLF
if isinstance(calls, dict):
for k, v in calls.items():
g.add_metric(k, v)
else:
g.add_metric([], calls)
return "\n".join(strs)
yield g
def __attrs_post_init__(self):
self._register()
def _register(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
register_process_collector(get_metrics_for("process"))
#
# Detailed CPU metrics
#
class CPUMetrics(object):
def __init__(self):
ticks_per_sec = 100
try:
# Try and get the system config
ticks_per_sec = os.sysconf('SC_CLK_TCK')
except (ValueError, TypeError, AttributeError):
pass
self.ticks_per_sec = ticks_per_sec
def collect(self):
if not HAVE_PROC_SELF_STAT:
return
with open("/proc/self/stat") as s:
line = s.read()
raw_stats = line.split(") ", 1)[1].split(" ")
user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
yield user
sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
yield sys
python_metrics = get_metrics_for("python")
REGISTRY.register(CPUMetrics())
gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
python_metrics.register_callback(
"gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
#
# Python GC metrics
#
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
)
reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
class GCCounts(object):
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
yield cm
REGISTRY.register(GCCounts())
#
# Twisted reactor metrics
#
tick_time = Histogram(
"python_twisted_reactor_tick_time",
"Tick time of the Twisted reactor (sec)",
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
pending_calls_metric = Histogram(
"python_twisted_reactor_pending_calls",
"Pending calls",
buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
)
#
# Federation Metrics
#
sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
event_persisted_position = Gauge("synapse_event_persisted_position", "")
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
last_ticked = time.time()
class ReactorLastSeenMetric(object):
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
)
cm.add_metric([], time.time() - last_ticked)
yield cm
REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
@@ -197,17 +226,22 @@ def runUntilCurrentTimer(func):
num_pending += 1
num_pending += len(reactor.threadCallQueue)
start = time.time() * 1000
start = time.time()
ret = func(*args, **kwargs)
end = time.time() * 1000
end = time.time()
# record the amount of wallclock time spent running pending calls.
# This is a proxy for the actual amount of time between reactor polls,
# since about 25% of time is actually spent running things triggered by
# I/O events, but that is harder to capture without rewriting half the
# reactor.
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)
# Update the time we last ticked, for the metric to test whether
# Synapse's reactor has frozen
global last_ticked
last_ticked = end
if running_on_pypy:
return ret
@@ -220,12 +254,12 @@ def runUntilCurrentTimer(func):
if threshold[i] < counts[i]:
logger.info("Collecting gc %d", i)
start = time.time() * 1000
start = time.time()
unreachable = gc.collect(i)
end = time.time() * 1000
end = time.time()
gc_time.inc_by(end - start, i)
gc_unreachable.inc_by(unreachable, i)
gc_time.labels(i).observe(end - start)
gc_unreachable.labels(i).set(unreachable)
return ret
-328
View File
@@ -1,328 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
import logging
import re
logger = logging.getLogger(__name__)
def flatten(items):
"""Flatten a list of lists
Args:
items: iterable[iterable[X]]
Returns:
list[X]: flattened list
"""
return list(chain.from_iterable(items))
class BaseMetric(object):
"""Base class for metrics which report a single value per label set
"""
def __init__(self, name, labels=[], alternative_names=[]):
"""
Args:
name (str): principal name for this metric
labels (list(str)): names of the labels which will be reported
for this metric
alternative_names (iterable(str)): list of alternative names for
this metric. This can be useful to provide a migration path
when renaming metrics.
"""
self._names = [name] + list(alternative_names)
self.labels = labels # OK not to clone as we never write it
def dimension(self):
return len(self.labels)
def is_scalar(self):
return not len(self.labels)
def _render_labelvalue(self, value):
return '"%s"' % (_escape_label_value(value),)
def _render_key(self, values):
if self.is_scalar():
return ""
return "{%s}" % (
",".join(["%s=%s" % (k, self._render_labelvalue(v))
for k, v in zip(self.labels, values)])
)
def _render_for_labels(self, label_values, value):
"""Render this metric for a single set of labels
Args:
label_values (list[object]): values for each of the labels,
(which get stringified).
value: value of the metric at with these labels
Returns:
iterable[str]: rendered metric
"""
rendered_labels = self._render_key(label_values)
return (
"%s%s %.12g" % (name, rendered_labels, value)
for name in self._names
)
def render(self):
"""Render this metric
Each metric is rendered as:
name{label1="val1",label2="val2"} value
https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details
Returns:
iterable[str]: rendered metrics
"""
raise NotImplementedError()
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
value that counts events or running totals.
Example use cases for Counters:
- Number of requests processed
- Number of items that were inserted into a queue
- Total amount of data that a system has processed
Counters can only go up (and be reset when the process restarts).
"""
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0.
def inc_by(self, incr, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
if values not in self.counts:
self.counts[values] = incr
else:
self.counts[values] += incr
def inc(self, *values):
self.inc_by(1, *values)
def render(self):
return flatten(
self._render_for_labels(k, self.counts[k])
for k in sorted(self.counts.keys())
)
class GaugeMetric(BaseMetric):
"""A metric that can go up or down
"""
def __init__(self, *args, **kwargs):
super(GaugeMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.guages = {}
def set(self, v, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
self.guages[values] = v
def render(self):
return flatten(
self._render_for_labels(k, self.guages[k])
for k in sorted(self.guages.keys())
)
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
size or other state of some in-memory object by actively querying it."""
def __init__(self, name, callback, labels=[]):
super(CallbackMetric, self).__init__(name, labels=labels)
self.callback = callback
def render(self):
try:
value = self.callback()
except Exception:
logger.exception("Failed to render %s", self.name)
return ["# FAILED to render " + self.name]
if self.is_scalar():
return list(self._render_for_labels([], value))
return flatten(
self._render_for_labels(k, value[k])
for k in sorted(value.keys())
)
class DistributionMetric(object):
"""A combination of an event counter and an accumulator, which counts
both the number of events and accumulates the total value. Typically this
could be used to keep track of method-running times, or other distributions
of values that occur in discrete occurances.
TODO(paul): Try to export some heatmap-style stats?
"""
def __init__(self, name, *args, **kwargs):
self.counts = CounterMetric(name + ":count", **kwargs)
self.totals = CounterMetric(name + ":total", **kwargs)
def inc_by(self, inc, *values):
self.counts.inc(*values)
self.totals.inc_by(inc, *values)
def render(self):
return self.counts.render() + self.totals.render()
class CacheMetric(object):
__slots__ = (
"name", "cache_name", "hits", "misses", "evicted_size", "size_callback",
)
def __init__(self, name, size_callback, cache_name):
self.name = name
self.cache_name = cache_name
self.hits = 0
self.misses = 0
self.evicted_size = 0
self.size_callback = size_callback
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def render(self):
size = self.size_callback()
hits = self.hits
total = self.misses + self.hits
return [
"""%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
"""%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
"""%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
"""%s:evicted_size{name="%s"} %d""" % (
self.name, self.cache_name, self.evicted_size
),
]
class MemoryUsageMetric(object):
"""Keeps track of the current memory usage, using psutil.
The class will keep the current min/max/sum/counts of rss over the last
WINDOW_SIZE_SEC, by polling UPDATE_HZ times per second
"""
UPDATE_HZ = 2 # number of times to get memory per second
WINDOW_SIZE_SEC = 30 # the size of the window in seconds
def __init__(self, hs, psutil):
clock = hs.get_clock()
self.memory_snapshots = []
self.process = psutil.Process()
clock.looping_call(self._update_curr_values, 1000 / self.UPDATE_HZ)
def _update_curr_values(self):
max_size = self.UPDATE_HZ * self.WINDOW_SIZE_SEC
self.memory_snapshots.append(self.process.memory_info().rss)
self.memory_snapshots[:] = self.memory_snapshots[-max_size:]
def render(self):
if not self.memory_snapshots:
return []
max_rss = max(self.memory_snapshots)
min_rss = min(self.memory_snapshots)
sum_rss = sum(self.memory_snapshots)
len_rss = len(self.memory_snapshots)
return [
"process_psutil_rss:max %d" % max_rss,
"process_psutil_rss:min %d" % min_rss,
"process_psutil_rss:total %d" % sum_rss,
"process_psutil_rss:count %d" % len_rss,
]
def _escape_character(m):
"""Replaces a single character with its escape sequence.
Args:
m (re.MatchObject): A match object whose first group is the single
character to replace
Returns:
str
"""
c = m.group(1)
if c == "\\":
return "\\\\"
elif c == "\"":
return "\\\""
elif c == "\n":
return "\\n"
return c
def _escape_label_value(value):
"""Takes a label value and escapes quotes, newlines and backslashes
"""
return re.sub(r"([\n\"\\])", _escape_character, str(value))
-123
View File
@@ -1,123 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from six import iteritems
TICKS_PER_SEC = 100
BYTES_PER_PAGE = 4096
HAVE_PROC_STAT = os.path.exists("/proc/stat")
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits")
HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd")
# Field indexes from /proc/self/stat, taken from the proc(5) manpage
STAT_FIELDS = {
"utime": 14,
"stime": 15,
"starttime": 22,
"vsize": 23,
"rss": 24,
}
stats = {}
# In order to report process_start_time_seconds we need to know the
# machine's boot time, because the value in /proc/self/stat is relative to
# this
boot_time = None
if HAVE_PROC_STAT:
with open("/proc/stat") as _procstat:
for line in _procstat:
if line.startswith("btime "):
boot_time = int(line.split()[1])
def update_resource_metrics():
if HAVE_PROC_SELF_STAT:
global stats
with open("/proc/self/stat") as s:
line = s.read()
# line is PID (command) more stats go here ...
raw_stats = line.split(") ", 1)[1].split(" ")
for (name, index) in iteritems(STAT_FIELDS):
# subtract 3 from the index, because proc(5) is 1-based, and
# we've lost the first two fields in PID and COMMAND above
stats[name] = int(raw_stats[index - 3])
def _count_fds():
# Not every OS will have a /proc/self/fd directory
if not HAVE_PROC_SELF_FD:
return 0
return len(os.listdir("/proc/self/fd"))
def register_process_collector(process_metrics):
process_metrics.register_collector(update_resource_metrics)
if HAVE_PROC_SELF_STAT:
process_metrics.register_callback(
"cpu_user_seconds_total",
lambda: float(stats["utime"]) / TICKS_PER_SEC
)
process_metrics.register_callback(
"cpu_system_seconds_total",
lambda: float(stats["stime"]) / TICKS_PER_SEC
)
process_metrics.register_callback(
"cpu_seconds_total",
lambda: (float(stats["utime"] + stats["stime"])) / TICKS_PER_SEC
)
process_metrics.register_callback(
"virtual_memory_bytes",
lambda: int(stats["vsize"])
)
process_metrics.register_callback(
"resident_memory_bytes",
lambda: int(stats["rss"]) * BYTES_PER_PAGE
)
process_metrics.register_callback(
"start_time_seconds",
lambda: boot_time + int(stats["starttime"]) / TICKS_PER_SEC
)
if HAVE_PROC_SELF_FD:
process_metrics.register_callback(
"open_fds",
lambda: _count_fds()
)
if HAVE_PROC_SELF_LIMITS:
def _get_max_fds():
with open("/proc/self/limits") as limits:
for line in limits:
if not line.startswith("Max open files "):
continue
# Line is Max open files $SOFT $HARD
return int(line.split()[3])
return None
process_metrics.register_callback(
"max_fds",
lambda: _get_max_fds()
)
+2 -21
View File
@@ -13,27 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.web.resource import Resource
import synapse.metrics
from prometheus_client.twisted import MetricsResource
METRICS_PREFIX = "/_synapse/metrics"
class MetricsResource(Resource):
isLeaf = True
def __init__(self, hs):
Resource.__init__(self) # Resource is old-style, so no super()
self.hs = hs
def render_GET(self, request):
response = synapse.metrics.render_all()
request.setHeader("Content-Type", "text/plain")
request.setHeader("Content-Length", str(len(response)))
# Encode as UTF-8 (default)
return response.encode()
__all__ = ["MetricsResource", "METRICS_PREFIX"]
+14 -13
View File
@@ -28,22 +28,20 @@ from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
import synapse.metrics
from synapse.metrics import LaterGauge
from collections import namedtuple
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = Counter("synapse_notifier_notified_events", "")
notified_events_counter = metrics.register_counter("notified_events")
users_woken_by_stream_counter = metrics.register_counter(
"users_woken_by_stream", labels=["stream"]
)
users_woken_by_stream_counter = Counter(
"synapse_notifier_users_woken_by_stream", "", ["stream"])
# TODO(paul): Should be shared somewhere
@@ -108,7 +106,7 @@ class _NotifierUserStream(object):
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
users_woken_by_stream_counter.inc(stream_key)
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
@@ -163,6 +161,7 @@ class Notifier(object):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@@ -197,14 +196,14 @@ class Notifier(object):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
metrics.register_callback(
"rooms",
LaterGauge(
"synapse_notifier_rooms", "", [],
lambda: count(bool, self.room_to_user_streams.values()),
)
metrics.register_callback(
"users",
LaterGauge(
"synapse_notifier_users", "", [],
lambda: len(self.user_to_user_stream),
)
@@ -342,6 +341,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
with PreserveLoggingContext():
yield listener.deferred
@@ -563,6 +563,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
+1 -1
View File
@@ -39,7 +39,7 @@ def list_with_base_rules(rawrules):
rawrules = [r for r in rawrules if r['priority_class'] >= 0]
# shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
ruleslist.extend(make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
+13 -17
View File
@@ -22,14 +22,13 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.event_auth import get_user_power_level
from synapse.api.constants import EventTypes, Membership
from synapse.metrics import get_metrics_for
from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches import register_cache
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
from synapse.state import POWER_KEY
from collections import namedtuple
from prometheus_client import Counter
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@@ -37,21 +36,18 @@ logger = logging.getLogger(__name__)
rules_by_room = {}
push_metrics = get_metrics_for(__name__)
push_rules_invalidation_counter = push_metrics.register_counter(
"push_rules_invalidation_counter"
)
push_rules_state_size_counter = push_metrics.register_counter(
"push_rules_state_size_counter"
)
push_rules_invalidation_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
push_rules_state_size_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
# Measures whether we use the fast path of using state deltas, or if we have to
# recalculate from scratch
push_rules_delta_state_cache_metric = cache_metrics.register_cache(
push_rules_delta_state_cache_metric = register_cache(
"cache",
size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values
cache_name="push_rules_delta_state_cache_metric",
"push_rules_delta_state_cache_metric",
cache=[], # Meaningless size, as this isn't a cache that stores values
)
@@ -65,10 +61,10 @@ class BulkPushRuleEvaluator(object):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.room_push_rule_cache_metrics = cache_metrics.register_cache(
self.room_push_rule_cache_metrics = register_cache(
"cache",
size_callback=lambda: 0, # There's not good value for this
cache_name="room_push_rule_cache",
"room_push_rule_cache",
cache=[], # Meaningless size, as this isn't a cache that stores values
)
@defer.inlineCallbacks
@@ -310,7 +306,7 @@ class RulesForRoom(object):
current_state_ids = context.current_state_ids
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc_by(len(current_state_ids))
push_rules_state_size_counter.inc(len(current_state_ids))
logger.debug(
"Looking for member changes in %r %r", state_group, current_state_ids
+2 -2
View File
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging
@@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
self.timed_call = reactor.callLater(
self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)
+8 -11
View File
@@ -15,27 +15,22 @@
# limitations under the License.
import logging
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
from . import push_tools
import synapse
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
http_push_processed_counter = metrics.register_counter(
"http_pushes_processed",
)
http_push_failed_counter = metrics.register_counter(
"http_pushes_failed",
)
http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
class HttpPusher(object):
@@ -225,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
self.timed_call = self.hs.get_reactor().callLater(
self.backoff_delay, self.on_timer
)
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
+2 -1
View File
@@ -229,7 +229,8 @@ class Mailer(object):
if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
prev_messages = room_vars['notifs'][-1]['messages']
for message in notifvars['messages']:
pm = filter(lambda pm: pm['id'] == message['id'], prev_messages)
pm = list(filter(lambda pm: pm['id'] == message['id'],
prev_messages))
if pm:
if not message["is_historical"]:
pm[0]["is_historical"] = False
+1 -1
View File
@@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
# so find out who is in the room that isn't the user.
if "m.room.member" in room_state_bytype_ids:
member_events = yield store.get_events(
room_state_bytype_ids["m.room.member"].values()
list(room_state_bytype_ids["m.room.member"].values())
)
all_members = [
ev for ev in member_events.values()
+1 -1
View File
@@ -152,7 +152,7 @@ class PushRuleEvaluatorForEvent(object):
# Caches (glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
register_cache("regex_push_cache", regex_cache)
register_cache("cache", "regex_push_cache", regex_cache)
def _glob_matches(glob, value, word_boundary=False):
-3
View File
@@ -19,7 +19,6 @@ import logging
from twisted.internet import defer
from synapse.push.pusher import PusherFactory
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -125,7 +124,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
yield run_on_reactor()
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@@ -151,7 +149,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
yield run_on_reactor()
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
+4 -1
View File
@@ -50,13 +50,16 @@ REQUIREMENTS = {
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
"blist": ["blist"],
"sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
"attr": ["attr"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
+3 -3
View File
@@ -21,7 +21,6 @@ from synapse.api.errors import (
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -33,11 +32,12 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context,
def send_event_to_master(clock, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield sleep(1)
yield clock.sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
@@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.storage.user_erasure_store import UserErasureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):
+3 -3
View File
@@ -15,7 +15,7 @@
"""A replication client for use by synapse workers.
"""
from twisted.internet import reactor, defer
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
@@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class
reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
reactor.connectTCP(host, port, factory)
hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
+48 -49
View File
@@ -60,22 +60,21 @@ from .commands import (
)
from .streams import STREAMS_MAP
from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string
from synapse.metrics.metric import CounterMetric
import logging
import synapse.metrics
import struct
import fcntl
from prometheus_client import Counter
from collections import defaultdict
from six import iterkeys, iteritems
metrics = synapse.metrics.get_metrics_for(__name__)
connection_close_counter = metrics.register_counter(
"close_reason", labels=["reason_type"],
)
import logging
import struct
import fcntl
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
# A list of all connected protocols. This allows us to send metrics about the
# connections.
@@ -137,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
self.inbound_commands_counter = CounterMetric(
"inbound_commands", labels=["command"],
)
self.outbound_commands_counter = CounterMetric(
"outbound_commands", labels=["command"],
)
self.inbound_commands_counter = defaultdict(int)
self.outbound_commands_counter = defaultdict(int)
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -202,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
self.inbound_commands_counter.inc(cmd_name)
self.inbound_commands_counter[cmd_name] = (
self.inbound_commands_counter[cmd_name] + 1)
cmd_cls = COMMAND_MAP[cmd_name]
try:
@@ -252,8 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
self.outbound_commands_counter.inc(cmd.NAME)
self.outbound_commands_counter[cmd.NAME] = (
self.outbound_commands_counter[cmd.NAME] + 1)
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
@@ -318,9 +314,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
if isinstance(reason, Failure):
connection_close_counter.inc(reason.type.__name__)
connection_close_counter.labels(reason.type.__name__).inc()
else:
connection_close_counter.inc(reason.__class__.__name__)
connection_close_counter.labels(reason.__class__.__name__).inc()
try:
# Remove us from list of connections to be monitored
@@ -519,7 +515,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_RDATA(self, cmd):
stream_name = cmd.stream_name
inbound_rdata_count.inc(stream_name)
inbound_rdata_count.labels(stream_name).inc()
try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
@@ -567,13 +563,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
metrics.register_callback(
"pending_commands",
pending_commands = LaterGauge(
"synapse_replication_tcp_protocol_pending_commands",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
(p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
},
labels=["name", "conn_id"],
)
@@ -584,13 +580,13 @@ def transport_buffer_size(protocol):
return 0
metrics.register_callback(
"transport_send_buffer",
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_send_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
(p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
},
labels=["name", "conn_id"],
)
@@ -609,48 +605,51 @@ def transport_kernel_read_buffer_size(protocol, read=True):
return 0
metrics.register_callback(
"transport_kernel_send_buffer",
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
labels=["name", "conn_id"],
)
metrics.register_callback(
"transport_kernel_read_buffer",
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
labels=["name", "conn_id"],
)
metrics.register_callback(
"inbound_commands",
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_inbound_commands",
"",
["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter.counts)
for k, count in iteritems(p.inbound_commands_counter)
},
labels=["command", "name", "conn_id"],
)
metrics.register_callback(
"outbound_commands",
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_outbound_commands",
"",
["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter.counts)
for k, count in iteritems(p.outbound_commands_counter)
},
labels=["command", "name", "conn_id"],
)
# number of updates received for each RDATA stream
inbound_rdata_count = metrics.register_counter(
"inbound_rdata_count",
labels=["stream_name"],
inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
+19 -19
View File
@@ -15,28 +15,28 @@
"""The server side of the replication stream.
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
from .protocol import ServerReplicationStreamProtocol
from synapse.util.metrics import Measure, measure_func
from synapse.metrics import LaterGauge
import logging
import synapse.metrics
from prometheus_client import Counter
from six import itervalues
metrics = synapse.metrics.get_metrics_for(__name__)
stream_updates_counter = metrics.register_counter(
"stream_updates", labels=["stream_name"]
)
user_sync_counter = metrics.register_counter("user_sync")
federation_ack_counter = metrics.register_counter("federation_ack")
remove_pusher_counter = metrics.register_counter("remove_pusher")
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
user_ip_cache_counter = metrics.register_counter("user_ip_cache")
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache",
"")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
logger = logging.getLogger(__name__)
@@ -75,7 +75,8 @@ class ReplicationStreamer(object):
# Current connections.
self.connections = []
metrics.register_callback("total_connections", lambda: len(self.connections))
LaterGauge("synapse_replication_tcp_resource_total_connections", "", [],
lambda: len(self.connections))
# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
@@ -87,17 +88,16 @@ class ReplicationStreamer(object):
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
metrics.register_callback(
"connections_per_stream",
LaterGauge(
"synapse_replication_tcp_resource_connections_per_stream", "",
["stream_name"],
lambda: {
(stream_name,): len([
conn for conn in self.connections
if stream_name in conn.replication_streams
])
for stream_name in self.streams_by_name
},
labels=["stream_name"],
)
})
self.federation_sender = None
if not hs.config.send_federation:
@@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self):
# close all connections on shutdown
@@ -177,7 +177,7 @@ class ReplicationStreamer(object):
logger.info(
"Streaming: %s -> %s", stream.NAME, updates[-1][0]
)
stream_updates_counter.inc_by(len(updates), stream.NAME)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
+1 -1
View File
@@ -104,7 +104,7 @@ class HttpTransactionCache(object):
def _cleanup(self):
now = self.clock.time_msec()
for key in self.transactions.keys():
for key in list(self.transactions):
ts = self.transactions[key][1]
if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period
del self.transactions[key]

Some files were not shown because too many files have changed in this diff Show More