1
0

Compare commits

...

120 Commits

Author SHA1 Message Date
Michael Kaye 11b3fcd91c Merge branch 'michaelkaye/rearrange_docker' of github.com:matrix-org/synapse into michaelkaye/rearrange_docker 2018-06-08 13:39:09 +01:00
Michael Kaye 6685a9e971 Address SPAG issues 2018-06-08 13:38:10 +01:00
Michael Kaye ada79de46f Do not include docker files in python build 2018-06-08 13:38:10 +01:00
Michael Kaye 9beb86768c Refactor docker locations and README.
This addresses #3224
2018-06-08 13:38:10 +01:00
David Baker 0e505b1913 Merge pull request #3372 from matrix-org/rav/better_verification_logging
Try to log more helpful info when a sig verification fails
2018-06-08 13:32:02 +01:00
Michael Kaye c68be90b6f Merge branch 'michaelkaye/rearrange_docker' of github.com:matrix-org/synapse into michaelkaye/rearrange_docker 2018-06-08 13:31:05 +01:00
Michael Kaye 57554be12d Refactor docker locations and README.
This addresses #3224
2018-06-08 13:30:19 +01:00
David Baker ad9edd1d96 Merge pull request #3371 from matrix-org/rav/fix_get_missing_events
Fix event filtering in get_missing_events handler
2018-06-08 13:19:15 +01:00
Richard van der Hoff e82db24a0e Try to log more helpful info when a sig verification fails
Firstly, don't swallow the reason for the failure

Secondly, don't assume all exceptions are verification failures

Thirdly, log a bit of info about the key being used if debug is enabled
2018-06-08 12:13:08 +01:00
Richard van der Hoff 0834b49c6a Fix event filtering in get_missing_events handler 2018-06-08 11:34:46 +01:00
Matthew Hodgson 36446ffedb fix various changelog bugs and typos 2018-06-07 23:54:16 +03:00
Will Hunt 13d211edc1 Merge pull request #3344 from Half-Shot/hs/as-metrics
Add metrics to track appservice transactions
2018-06-07 11:37:12 +01:00
Richard van der Hoff 1152f495a0 Merge pull request #3363 from matrix-org/rav/fix_purge
Fix event-purge-by-ts admin API
2018-06-07 11:34:46 +01:00
Richard van der Hoff e2acf536d4 Merge pull request #3355 from matrix-org/rav/fix_federation_backfill
Fix federation backfill from sqlite servers
2018-06-07 10:18:01 +01:00
Amber Brown 0160f6601f Merge pull request #3356 from matrix-org/rav/add_missing_attr_dep
Make sure that attr is installed
2018-06-07 16:33:30 +10:00
Richard van der Hoff f4caf3f83d fix log 2018-06-07 00:26:38 +01:00
Richard van der Hoff 0546715c18 Fix event-purge-by-ts admin API
This got completely broken in 0.30.

Fixes #3300.
2018-06-07 00:15:49 +01:00
Richard van der Hoff 57e3f923d2 Add missing dependency on attr
We've rcently added a dep on `attr`. I don't know why the CI didn't pick this
up, but we should make it explicit anyway.
2018-06-06 17:12:41 +01:00
Richard van der Hoff d3a8c9c55e Fix sql error in _get_state_groups_from_groups
If this was called with a `(type, None)` entry in types (which is supposed to
return all state of type `type`), it would explode with a sql error.
2018-06-06 14:19:01 +01:00
Neil Johnson fef6c2cdcc Merge branch 'master' into develop 2018-06-06 12:28:15 +01:00
Neil Johnson 752b7b32ed Merge tag 'v0.31.0'
Changes in synapse v0.31.0 (2018-06-06)
======================================

Most notable change from v0.30.0 is to switch to python prometheus library to improve system
stats reporting. WARNING this changes a number of prometheus metrics in a
backwards-incompatible manner. For more details, see
`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.

Bug Fixes:

* Fix metric documentation tables (PR #3341)
* Fix LaterGuage error handling (694968f)
* Fix replication metrics (b7e7fd2)

Changes in synapse v0.31.0-rc1 (2018-06-04)
==========================================

Features:

* Switch to the Python Prometheus library (PR #3256, #3274)
* Let users leave the server notice room after joining (PR #3287)

Changes:

* daily user type phone home stats (PR #3264)
* Use iter* methods for _filter_events_for_server (PR #3267)
* Docs on consent bits (PR #3268)
* Remove users from user directory on deactivate (PR #3277)
* Avoid sending consent notice to guest users (PR #3288)
* disable CPUMetrics if no /proc/self/stat (PR #3299)
* Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat!
* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
* Put python's logs into Trial when running unit tests (PR #3319)

Changes, python 3 migration:

* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
* replace some iteritems with six (PR #3244) Thanks to @NotAFile!
* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
* use repr, not str (PR #3246) Thanks to @NotAFile!
* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
* more six iteritems (PR #3279) Thanks to @NotAFile!
* More Misc. py3 fixes (PR #3280) Thanks to @NotAFile!
* remaining isintance fixes (PR #3281) Thanks to @NotAFile!
* py3-ize state.py (PR #3283) Thanks to @NotAFile!
* extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
* use memoryview in py3 (PR #3303) Thanks to @NotAFile!

Bugs:

* Fix federation backfill bugs (PR #3261)
* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
2018-06-06 12:27:33 +01:00
Neil Johnson 3f589f9097 7 char sha in changelog 2018-06-06 11:39:42 +01:00
Neil Johnson 176f1206d1 Update CHANGES.rst 2018-06-06 11:28:30 +01:00
Neil Johnson 61134debdc bump version and changelog 2018-06-06 11:26:21 +01:00
Richard van der Hoff ad459a106c Merge pull request #3349 from t3chguy/redact_as_request_token
Redact AS tokens in log (fixes to #3327)
2018-06-06 10:58:07 +01:00
Michael Telatynski 592c162516 also redact __str__ of ApplicationService used for logging 2018-06-06 10:35:29 +01:00
Michael Telatynski 330432031b redact_uri in two missed log paths 2018-06-06 10:25:48 +01:00
Amber Brown 48e2b48888 Merge pull request #3347 from krombel/py3_extend_tox_2
update tox.ini to cover 292 succeeding tests
2018-06-06 16:37:19 +10:00
Amber Brown d8db6d9267 Merge pull request #3348 from intelfx/fix-sortedcontainers
federation/send_queue.py: fix usage of sortedcontainers.SortedDict
2018-06-06 16:36:53 +10:00
Amber Brown 23c785992f Fix metric documentation tables (#3341) 2018-06-06 07:12:16 +01:00
Richard van der Hoff b3b16490f7 Add note to changelog on prometheus metrics 2018-06-06 07:08:36 +01:00
Richard van der Hoff 592ee217a3 Merge commit 'b7e7fd2' into release-v0.31.0 2018-06-06 07:02:02 +01:00
Amber Brown 304bb22c1d Fix metric documentation tables (#3341) 2018-06-06 15:52:37 +10:00
Ivan Shapovalov c88d50aa8f federation/send_queue.py: fix usage of sortedcontainers.SortedDict 2018-06-06 02:45:18 +03:00
Krombel 0c87eed294 update tox.ini to cover 292 succeeding tests
Signed-Off-By: Matthias Kesler <krombel@krombel.de>
2018-06-05 23:10:15 +02:00
Richard van der Hoff e316407b5d Merge pull request #3327 from t3chguy/redact_as_request_token
Strip `access_token` from outgoing requests
2018-06-05 19:08:46 +01:00
Michael Telatynski e6cbf47773 factor out uri redaction into a method on http 2018-06-05 18:31:40 +01:00
Richard van der Hoff 617afee069 Merge pull request #3340 from ArchangeGabriel/patch-1
doc/postgres.rst: fix display of the last command block
2018-06-05 17:45:17 +01:00
Richard van der Hoff 522bd3c8a3 Merge remote-tracking branch 'origin/master' into develop 2018-06-05 17:42:49 +01:00
Will Hunt d6e3c2c79b Let's try labels instead of label, that might work 2018-06-05 17:30:45 +01:00
Amber Brown f7869f8f8b Port to sortedcontainers (with tests!) (#3332) 2018-06-06 00:13:57 +10:00
Will Hunt 604cff1a06 Add metrics to track appservice transactions 2018-06-05 13:21:30 +01:00
Bruno Pagani b50f18171d doc/postgres.rest: fix displaying of the last command block
Also indent all of them with 4 spaces.
2018-06-04 22:41:52 +00:00
Richard van der Hoff f29b41fde9 Merge pull request #3324 from matrix-org/rav/remove_dead_method
Remove was_forgotten_at
2018-06-04 18:18:08 +01:00
Richard van der Hoff 28b0490dfd Merge pull request #3334 from matrix-org/rav/cache_factor_override
Cache factor override system for specific caches
2018-06-04 17:19:33 +01:00
Richard van der Hoff b7e7fd2d0e Fix replication metrics
fix bug introduced in #3256
2018-06-04 16:23:05 +01:00
Neil Johnson 244ab974e7 bump version and changelog 2018-06-04 16:09:58 +01:00
Richard van der Hoff 694968fa81 Hopefully, fix LaterGuage error handling 2018-06-04 15:59:14 +01:00
Erik Johnston 042eedfa2b Add hacky cache factor override system 2018-06-04 15:39:28 +01:00
Amber Brown 5dbf305444 Put python's logs into Trial when running unit tests (#3319) 2018-06-04 16:06:06 +10:00
Amber Brown 86accac5d5 Merge pull request #3328 from intelfx/fix-metrics-LaterGauge-usage
federation: fix LaterGauge usage
2018-06-04 15:35:32 +10:00
Ivan Shapovalov 7d9d75e4e8 federation/send_queue.py: fix usage of LaterGauge
Fixes a startup crash due to commit df9f72d9e5
"replacing portions".
2018-06-03 14:16:17 +03:00
Michael Telatynski 09503126df Strip access_token from outgoing requests using existing regex 2018-06-02 23:25:13 +01:00
Richard van der Hoff c1f4118bb6 Remove was_forgotten_at
This is unused. IT MUST DIE!!!1
̧̪͈̱̹̳͖͙H̵̰̤̰͕̖e̛ ͚͉̗̼̞w̶̩̥͉̮h̩̺̪̩͘ͅọ͎͉̟ ̜̩͔̦̘ͅW̪̫̩̣̲͔̳a͏͔̳͖i͖͜t͓̤̠͓͙s̘̰̩̥̙̝ͅ ̲̠̬̥Be̡̙̫̦h̰̩i̛̫͙͔̭̤̗̲n̳͞d̸ ͎̻͘T̛͇̝̲̹̠̗ͅh̫̦̝ͅe̩̫͟ ͓͖̼W͕̳͎͚̙̥ą̙l̘͚̺͔͞ͅl̳͍̙̤̤̮̳.̢
̟̺̜̙͉Z̤̲̙̙͎̥̝A͎̣͔̙͘L̥̻̗̳̻̳̳͢G͉̖̯͓̞̩̦O̹̹̺!̙͈͎̞̬ *
2018-06-01 18:21:49 +01:00
Richard van der Hoff a9e97dcd65 Merge pull request #3317 from thegcat/feature/3312-add_ipv6_to_blacklist_example_config
Add private IPv6 addresses to example config for url preview blacklist
2018-06-01 14:45:14 +01:00
Neil Johnson 71477f3317 Merge pull request #3264 from matrix-org/neil/sign-up-stats
daily user type phone home stats
2018-06-01 13:42:01 +00:00
Richard van der Hoff 41006d9c28 Merge pull request #3318 from matrix-org/rav/ignore_depth_on_rrs
Ignore depth when updating read-receipts
2018-06-01 14:14:45 +01:00
Richard van der Hoff 9f797a24a4 Handle RRs which arrive before their events 2018-06-01 14:01:43 +01:00
Richard van der Hoff 857e6fd8b6 Ignore depth when updating read-receipts
Order read receipts by stream ordering instead of depth
2018-06-01 12:18:11 +01:00
Felix Schäfer 4ef76f3ac4 Add private IPv6 addresses to preview blacklist #3312
The added addresses are expected to be local or loopback addresses and
shouldn't be spidered for previews.

Signed-off-by: Felix Schäfer <felix@thegcat.net>
2018-06-01 12:18:35 +02:00
Neil Johnson 4986b084f8 remove unnecessary INSERT 2018-06-01 10:50:40 +01:00
Richard van der Hoff c2c3092cce code_style.rst: formatting 2018-05-31 16:11:34 +01:00
Amber Brown febe0ec8fd Run Prometheus on a different port, optionally. (#3274) 2018-05-31 19:04:50 +10:00
Amber Brown c936a52a9e Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (#3307) 2018-05-31 19:03:47 +10:00
Richard van der Hoff e73635191f Merge pull request #3290 from rubo77/patch-7
Add link to thorough instruction how to configure consent
2018-05-30 19:52:01 +01:00
Richard van der Hoff 219c2a322b remove trailing whitespace 2018-05-30 19:42:19 +01:00
Richard van der Hoff 2e4be8bfd9 fix english and wrap comment 2018-05-30 19:24:12 +01:00
Amber Brown 872cf43516 Merge pull request #3303 from NotAFile/py3-memoryview
use memoryview in py3
2018-05-30 12:51:42 +10:00
Amber Brown debff7ae09 Merge pull request #3281 from NotAFile/py3-six-isinstance
remaining isintance fixes
2018-05-30 12:44:46 +10:00
Richard van der Hoff 34b85df7f5 Update some comments and docstrings in SyncHandler 2018-05-29 22:31:18 +01:00
Richard van der Hoff 711f61a31d Merge pull request #3304 from matrix-org/rav/exempt_as_users_from_gdpr
Exempt AS-registered users from doing gdpr
2018-05-29 20:25:12 +01:00
Richard van der Hoff a995fdae39 fix tests 2018-05-29 20:19:29 +01:00
Richard van der Hoff 4a9cbdbc15 Exempt AS-registered users from doing gdpr 2018-05-29 19:54:32 +01:00
Neil Johnson ab0ef31dc7 create users index on creation_ts 2018-05-29 17:51:08 +01:00
Neil Johnson 558f3d376a create index in background 2018-05-29 17:47:55 +01:00
Neil Johnson c379acd4fd bump version 2018-05-29 17:47:28 +01:00
Richard van der Hoff db2e4608ab Merge pull request #3302 from krombel/py3_extend_tox_testing
extend tox testing for py3 to avoid regressions
2018-05-29 17:37:52 +01:00
Adrian Tschira 4b9d0cde97 add remaining memoryview changes 2018-05-29 17:42:43 +02:00
Adrian Tschira 7873cde526 pep8 2018-05-29 17:35:55 +02:00
Adrian Tschira 1afafb3497 use memoryview in py3
Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-05-29 17:14:34 +02:00
Krombel d6cd55532b extend tox testing for py3 to avoid regressions 2018-05-29 16:02:41 +02:00
Amber Brown 235b53263a Merge pull request #3299 from matrix-org/matthew/macos-fixes
disable CPUMetrics if no /proc/self/stat
2018-05-29 11:45:45 +10:00
Matthew Hodgson ff1bc0a279 pep8 2018-05-29 02:32:15 +01:00
Matthew Hodgson adb6bac4d5 fix another dumb typo 2018-05-29 02:29:22 +01:00
Matthew Hodgson 0a240ad36e disable CPUMetrics if no /proc/self/stat
fixes build on macOS again
2018-05-29 02:23:30 +01:00
Matthew Hodgson 284e36ad04 fix dumb typo 2018-05-29 02:23:11 +01:00
Amber Brown 81717e8515 Merge pull request #3256 from matrix-org/3218-official-prom
Switch to the Python Prometheus library
2018-05-28 23:30:09 +10:00
Amber Brown 57ad76fa4a fix up tests 2018-05-28 19:51:53 +10:00
Amber Brown 3ef5cd74a6 update to more consistently use seconds in any metrics or logging 2018-05-28 19:39:27 +10:00
Amber Brown 5c40ce3777 invalid syntax :( 2018-05-28 19:16:09 +10:00
Amber Brown 357c74a50f add comment about why unreg 2018-05-28 19:14:41 +10:00
Amber Brown a2eb5db4a0 update metrics to be in seconds 2018-05-28 19:10:27 +10:00
Amber Brown 754826a830 Merge remote-tracking branch 'origin/develop' into 3218-official-prom 2018-05-28 18:57:23 +10:00
Ruben Barkow 08ea5fe635 add link to thorough instruction how to configure consent 2018-05-25 23:19:55 +02:00
Richard van der Hoff 60f09b1e11 Merge pull request #3288 from matrix-org/rav/no_spam_guests
Avoid sending consent notice to guest users
2018-05-25 11:44:45 +01:00
Richard van der Hoff 66bdae986f Fix default for send_server_notice_to_guests
bool("False") == True...
2018-05-25 11:42:05 +01:00
Richard van der Hoff ba1b163590 Avoid sending consent notice to guest users
we think it makes sense not to send the notices to guest users.
2018-05-25 11:36:43 +01:00
Richard van der Hoff 41921ac01b Merge pull request #3287 from matrix-org/rav/allow_leaving_server_notices_room
Let users leave the server notice room after joining
2018-05-25 11:15:45 +01:00
Richard van der Hoff 757ed27258 Let users leave the server notice room after joining
They still can't reject invites, but we let them leave it.
2018-05-25 11:07:21 +01:00
Adrian Tschira 4ee4450d66 fix recursion error 2018-05-24 21:44:10 +02:00
Adrian Tschira dd068ca979 remaining isintance fixes
Signed-off-by: Adrian Tschira <nota@notafile.com>
2018-05-24 20:55:08 +02:00
Amber Brown 389dac2c15 pepeightttt 2018-05-23 13:08:59 -05:00
Amber Brown 472a5ec4e2 add back CPU metrics 2018-05-23 13:03:56 -05:00
Amber Brown e987079037 fixes 2018-05-23 13:03:51 -05:00
Amber Brown b6063631c3 more cleanup 2018-05-22 17:36:20 -05:00
Amber Brown 53cc2cde1f cleanup 2018-05-22 17:32:57 -05:00
Amber Brown 071206304d cleanup pep8 errors 2018-05-22 16:54:22 -05:00
Amber Brown 4abeaedcf3 tests/metrics is gone now 2018-05-22 16:45:11 -05:00
Amber Brown 85ba83eb51 fixes 2018-05-22 16:28:23 -05:00
Amber Brown 228f1f584e fix the test failures 2018-05-22 15:02:38 -05:00
Neil Johnson d8cb7225d2 daily user type phone home stats 2018-05-22 18:09:09 +01:00
Amber Brown 07bb9bdae8 update gitignore to remove some files that got put on my machine 2018-05-22 10:57:28 -05:00
Amber Brown 8f5a688d42 cleanups, self-registration 2018-05-22 10:56:03 -05:00
Amber Brown a8990fa2ec Merge remote-tracking branch 'origin/develop' into 3218-official-prom 2018-05-22 10:50:26 -05:00
Amber Brown fcc525b0b7 rest of the changes 2018-05-21 19:48:57 -05:00
Amber Brown df9f72d9e5 replacing portions 2018-05-21 19:47:37 -05:00
Amber Brown c60e0d5e02 don't need the resource portion 2018-05-21 17:03:20 -05:00
Amber Brown 02c1d29133 look at the Prometheus metrics instead 2018-05-21 17:02:20 -05:00
Amber Brown f258deffcb remove old metrics libs 2018-05-21 17:01:15 -05:00
Michael Kaye a1517f1369 Refactor docker locations and README.
This addresses #3224
2018-05-21 16:34:21 +01:00
113 changed files with 1720 additions and 1766 deletions
+4
View File
@@ -1,5 +1,6 @@
*.pyc
.*.swp
*~
.DS_Store
_trial_temp/
@@ -13,6 +14,7 @@ docs/build/
cmdclient_config.json
homeserver*.db
homeserver*.log
homeserver*.log.*
homeserver*.pid
homeserver*.yaml
@@ -40,6 +42,8 @@ media_store/
*.tac
build/
venv/
venv*/
localhost-800*/
static/client/register/register_config.js
+57
View File
@@ -1,3 +1,60 @@
Changes in synapse v0.31.0 (2018-06-06)
=======================================
Most notable change from v0.30.0 is to switch to the python prometheus library to improve system
stats reporting. WARNING: this changes a number of prometheus metrics in a
backwards-incompatible manner. For more details, see
`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
Bug Fixes:
* Fix metric documentation tables (PR #3341)
* Fix LaterGauge error handling (694968f)
* Fix replication metrics (b7e7fd2)
Changes in synapse v0.31.0-rc1 (2018-06-04)
==========================================
Features:
* Switch to the Python Prometheus library (PR #3256, #3274)
* Let users leave the server notice room after joining (PR #3287)
Changes:
* daily user type phone home stats (PR #3264)
* Use iter* methods for _filter_events_for_server (PR #3267)
* Docs on consent bits (PR #3268)
* Remove users from user directory on deactivate (PR #3277)
* Avoid sending consent notice to guest users (PR #3288)
* disable CPUMetrics if no /proc/self/stat (PR #3299)
* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
* Put python's logs into Trial when running unit tests (PR #3319)
Changes, python 3 migration:
* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
* replace some iteritems with six (PR #3244) Thanks to @NotAFile!
* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
* use repr, not str (PR #3246) Thanks to @NotAFile!
* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
* more six iteritems (PR #3279) Thanks to @NotAFile!
* More Misc. py3 fixes (PR #3280) Thanks to @NotAFile!
* remaining isintance fixes (PR #3281) Thanks to @NotAFile!
* py3-ize state.py (PR #3283) Thanks to @NotAFile!
* extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
* use memoryview in py3 (PR #3303) Thanks to @NotAFile!
Bugs:
* Fix federation backfill bugs (PR #3261)
* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
Changes in synapse v0.30.0 (2018-05-24)
==========================================
+1
View File
@@ -31,3 +31,4 @@ recursive-exclude jenkins *.sh
prune .github
prune demo/etc
prune docker
+1 -1
View File
@@ -157,7 +157,7 @@ if you prefer.
In case of problems, please see the _`Troubleshooting` section below.
There is an offical synapse image available at https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with the docker-compose file available at `contrib/docker`. Further information on this including configuration options is available in `contrib/docker/README.md`.
There is an offical synapse image available at https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with the docker-compose file available at `contrib/docker`. Further information on this including configuration options is available in the README on hub.docker.com.
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate a synapse server in a single Docker image, at https://hub.docker.com/r/avhost/docker-matrix/tags/
+2 -115
View File
@@ -1,29 +1,4 @@
# Synapse Docker
The `matrixdotorg/synapse` Docker image will run Synapse as a single process. It does not provide a
database server or a TURN server, you should run these separately.
If you run a Postgres server, you should simply include it in the same Compose
project or set the proper environment variables and the image will automatically
use that server.
## Build
Build the docker image with the `docker build` command from the root of the synapse repository.
```
docker build -t docker.io/matrixdotorg/synapse .
```
The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
You may have a local Python wheel cache available, in which case copy the relevant packages in the ``cache/`` directory at the root of the project.
## Run
This image is designed to run either with an automatically generated configuration
file or with a custom configuration that requires manual edition.
### Automated configuration
It is recommended that you use Docker Compose to run your containers, including
@@ -60,94 +35,6 @@ Then, customize your configuration and run the server:
docker-compose up -d
```
### Without Compose
### More information
If you do not wish to use Compose, you may still run this image using plain
Docker commands. Note that the following is just a guideline and you may need
to add parameters to the docker run command to account for the network situation
with your postgres database.
```
docker run \
-d \
--name synapse \
-v ${DATA_PATH}:/data \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
docker.io/matrixdotorg/synapse:latest
```
## Volumes
The image expects a single volume, located at ``/data``, that will hold:
* temporary files during uploads;
* uploaded media and thumbnails;
* the SQLite database if you do not configure postgres;
* the appservices configuration.
You are free to use separate volumes depending on storage endpoints at your
disposal. For instance, ``/data/media`` coud be stored on a large but low
performance hdd storage while other files could be stored on high performance
endpoints.
In order to setup an application service, simply create an ``appservices``
directory in the data volume and write the application service Yaml
configuration file there. Multiple application services are supported.
## Environment
Unless you specify a custom path for the configuration file, a very generic
file will be generated, based on the following environment settings.
These are a good starting point for setting up your own deployment.
Global settings:
* ``UID``, the user id Synapse will run as [default 991]
* ``GID``, the group id Synapse will run as [default 991]
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
then customize it manually. No other environment variable is required.
Otherwise, a dynamic configuration file will be used. The following environment
variables are available for configuration:
* ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
statistics reporting back to the Matrix project which helps us to get funding.
* ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
you run your own TLS-capable reverse proxy).
* ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
the Synapse instance.
* ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server.
* ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
* ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
* ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
key in order to enable recaptcha upon registration.
* ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
key in order to enable recaptcha upon registration.
* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
uris to enable TURN for this homeserver.
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
Shared secrets, that will be initialized to random values if not set:
* ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
registration is disable.
* ``SYNAPSE_MACAROON_SECRET_KEY`` secret for signing access tokens
to the server.
Database specific values (will use SQLite if not set):
* `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
* `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
* `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
* `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
Mail server specific values (will not send emails if not set):
* ``SYNAPSE_SMTP_HOST``, hostname to the mail server.
* ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
* ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any.
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any.
For more information on required environment variables and mounts, see the main docker documentation at `docker/README.md`
+1 -1
View File
@@ -9,7 +9,7 @@ RUN cd /synapse \
&& pip install --upgrade pip setuptools psycopg2 lxml \
&& mkdir -p /synapse/cache \
&& pip install -f /synapse/cache --upgrade --process-dependency-links . \
&& mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \
&& mv /synapse/docker/start.py /synapse/docker/conf / \
&& rm -rf setup.py setup.cfg synapse
VOLUME ["/data"]
+124
View File
@@ -0,0 +1,124 @@
# Synapse Docker
This Docker image will run Synapse as a single process. It does not provide a database
server or a TURN server, you should run these separately.
## Run
We do not currently offer a `latest` image, as this has somewhat undefined semantics.
We instead release only tagged versions so upgrading between releases is entirely
within your control.
### Using docker-compose (easier)
This image is designed to run either with an automatically generated configuration
file or with a custom configuration that requires manual editing.
An easy way to make use of this image is via docker-compose. See the
(https://github.com/matrix-org/synapse/tree/develop/contrib/docker)[contrib]
section of the synapse project for examples.
### Without Compose (harder)
If you do not wish to use Compose, you may still run this image using plain
Docker commands. Note that the following is just a guideline and you may need
to add parameters to the docker run command to account for the network situation
with your postgres database.
```
docker run \
-d \
--name synapse \
-v ${DATA_PATH}:/data \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
docker.io/matrixdotorg/synapse:latest
```
## Volumes
The image expects a single volume, located at ``/data``, that will hold:
* temporary files during uploads;
* uploaded media and thumbnails;
* the SQLite database if you do not configure postgres;
* the appservices configuration.
You are free to use separate volumes depending on storage endpoints at your
disposal. For instance, ``/data/media`` coud be stored on a large but low
performance hdd storage while other files could be stored on high performance
endpoints.
In order to setup an application service, simply create an ``appservices``
directory in the data volume and write the application service Yaml
configuration file there. Multiple application services are supported.
## Environment
Unless you specify a custom path for the configuration file, a very generic
file will be generated, based on the following environment settings.
These are a good starting point for setting up your own deployment.
Global settings:
* ``UID``, the user id Synapse will run as [default 991]
* ``GID``, the group id Synapse will run as [default 991]
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
then customize it manually. No other environment variable is required.
Otherwise, a dynamic configuration file will be used. The following environment
variables are available for configuration:
* ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
statistics reporting back to the Matrix project which helps us to get funding.
* ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
you run your own TLS-capable reverse proxy).
* ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
the Synapse instance.
* ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server.
* ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
* ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
* ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
key in order to enable recaptcha upon registration.
* ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
key in order to enable recaptcha upon registration.
* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
uris to enable TURN for this homeserver.
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
Shared secrets, that will be initialized to random values if not set:
* ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
registration is disable.
* ``SYNAPSE_MACAROON_SECRET_KEY`` secret for signing access tokens
to the server.
Database specific values (will use SQLite if not set):
* `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
* `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
* `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
* `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
Mail server specific values (will not send emails if not set):
* ``SYNAPSE_SMTP_HOST``, hostname to the mail server.
* ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
* ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any.
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any.
## Build
Build the docker image with the `docker build` command from the root of the synapse repository.
```
docker build -t docker.io/matrixdotorg/synapse . -f docker/Dockerfile
```
The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
You may have a local Python wheel cache available, in which case copy the relevant
packages in the ``cache/`` directory at the root of the project.
+1 -1
View File
@@ -16,7 +16,7 @@
print("I am a fish %s" %
"moo")
and this::
and this::
print(
"I am a fish %s" %
+76 -11
View File
@@ -1,25 +1,47 @@
How to monitor Synapse metrics using Prometheus
===============================================
1. Install prometheus:
1. Install Prometheus:
Follow instructions at http://prometheus.io/docs/introduction/install/
2. Enable synapse metrics:
2. Enable Synapse metrics:
Simply setting a (local) port number will enable it. Pick a port.
prometheus itself defaults to 9090, so starting just above that for
locally monitored services seems reasonable. E.g. 9092:
There are two methods of enabling metrics in Synapse.
Add to homeserver.yaml::
The first serves the metrics as a part of the usual web server and can be
enabled by adding the "metrics" resource to the existing listener as such::
metrics_port: 9092
resources:
- names:
- client
- metrics
Also ensure that ``enable_metrics`` is set to ``True``.
This provides a simple way of adding metrics to your Synapse installation,
and serves under ``/_synapse/metrics``. If you do not wish your metrics be
publicly exposed, you will need to either filter it out at your load
balancer, or use the second method.
Restart synapse.
The second method runs the metrics server on a different port, in a
different thread to Synapse. This can make it more resilient to heavy load
meaning metrics cannot be retrieved, and can be exposed to just internal
networks easier. The served metrics are available over HTTP only, and will
be available at ``/``.
3. Add a prometheus target for synapse.
Add a new listener to homeserver.yaml::
listeners:
- type: metrics
port: 9000
bind_addresses:
- '0.0.0.0'
For both options, you will need to ensure that ``enable_metrics`` is set to
``True``.
Restart Synapse.
3. Add a Prometheus target for Synapse.
It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``)::
@@ -31,7 +53,50 @@ How to monitor Synapse metrics using Prometheus
If your prometheus is older than 1.5.2, you will need to replace
``static_configs`` in the above with ``target_groups``.
Restart prometheus.
Restart Prometheus.
Removal of deprecated metrics & time based counters becoming histograms in 0.31.0
---------------------------------------------------------------------------------
The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
All time duration-based metrics have been changed to be seconds. This affects:
+----------------------------------+
| msec -> sec metrics |
+==================================+
| python_gc_time |
+----------------------------------+
| python_twisted_reactor_tick_time |
+----------------------------------+
| synapse_storage_query_time |
+----------------------------------+
| synapse_storage_schedule_time |
+----------------------------------+
| synapse_storage_transaction_time |
+----------------------------------+
Several metrics have been changed to be histograms, which sort entries into
buckets and allow better analysis. The following metrics are now histograms:
+-------------------------------------------+
| Altered metrics |
+===========================================+
| python_gc_time |
+-------------------------------------------+
| python_twisted_reactor_pending_calls |
+-------------------------------------------+
| python_twisted_reactor_tick_time |
+-------------------------------------------+
| synapse_http_server_response_time_seconds |
+-------------------------------------------+
| synapse_storage_query_time |
+-------------------------------------------+
| synapse_storage_schedule_time |
+-------------------------------------------+
| synapse_storage_transaction_time |
+-------------------------------------------+
Block and response metrics renamed for 0.27.0
+9 -9
View File
@@ -9,19 +9,19 @@ Set up database
Assuming your PostgreSQL database user is called ``postgres``, create a user
``synapse_user`` with::
su - postgres
createuser --pwprompt synapse_user
su - postgres
createuser --pwprompt synapse_user
The PostgreSQL database used *must* have the correct encoding set, otherwise it
would not be able to store UTF8 strings. To create a database with the correct
encoding use, e.g.::
CREATE DATABASE synapse
ENCODING 'UTF8'
LC_COLLATE='C'
LC_CTYPE='C'
template=template0
OWNER synapse_user;
CREATE DATABASE synapse
ENCODING 'UTF8'
LC_COLLATE='C'
LC_CTYPE='C'
template=template0
OWNER synapse_user;
This would create an appropriate database named ``synapse`` owned by the
``synapse_user`` user (which must already exist).
@@ -126,7 +126,7 @@ run::
--postgres-config homeserver-postgres.yaml
Once that has completed, change the synapse config to point at the PostgreSQL
database configuration file ``homeserver-postgres.yaml``:
database configuration file ``homeserver-postgres.yaml``::
./synctl stop
mv homeserver.yaml homeserver-old-sqlite.yaml
+5 -2
View File
@@ -5,7 +5,7 @@ Server Notices
channel whereby server administrators can send messages to users on the server.
They are used as part of communication of the server polices(see
[consent_tracking.md](consent_tracking.md)), however the intention is that
[consent_tracking.md](consent_tracking.md)), however the intention is that
they may also find a use for features such as "Message of the day".
This is a feature specific to Synapse, but it uses standard Matrix
@@ -24,7 +24,10 @@ history; it will appear to have come from the 'server notices user' (see
below).
The user is prevented from sending any messages in this room by the power
levels. They also cannot leave it.
levels.
Having joined the room, the user can leave the room if they want. Subsequent
server notices will then cause a new room to be created.
Synapse configuration
---------------------
+2 -1
View File
@@ -17,4 +17,5 @@ ignore =
[flake8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
ignore = W503
# E203 is contrary to PEP8.
ignore = W503,E203
+1 -1
View File
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.30.0"
__version__ = "0.31.0"
+4 -2
View File
@@ -15,6 +15,8 @@
import logging
from six import itervalues
import pymacaroons
from twisted.internet import defer
@@ -57,7 +59,7 @@ class Auth(object):
self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
register_cache("token_cache", self.token_cache)
register_cache("cache", "token_cache", self.token_cache)
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
@@ -66,7 +68,7 @@ class Auth(object):
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
(e.type, e.state_key): e for e in auth_events.values()
(e.type, e.state_key): e for e in itervalues(auth_events)
}
self.check(event, auth_events=auth_events, do_sig_check=do_sig_check)
+1 -1
View File
@@ -411,7 +411,7 @@ class Filter(object):
return room_ids
def filter(self, events):
return filter(self.check, events)
return list(filter(self.check, events))
def limit(self):
return self.filter_json.get("limit", 10)
+13
View File
@@ -124,6 +124,19 @@ def quit_with_error(error_string):
sys.exit(1)
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
"""
from synapse.metrics import RegistryProxy
from prometheus_client import start_http_server
for host in bind_addresses:
reactor.callInThread(start_http_server, int(port),
addr=host, registry=RegistryProxy)
logger.info("Metrics now reporting on %s:%d", host, port)
def listen_tcp(bind_addresses, port, factory, backlog=50):
"""
Create a TCP socket for a port and several addresses
+7
View File
@@ -94,6 +94,13 @@ class AppserviceServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -2
View File
@@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -77,7 +78,7 @@ class ClientReaderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
PublicRoomListRestServlet(self).register(resource)
@@ -118,7 +119,13 @@ class ClientReaderServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -90,7 +91,7 @@ class EventCreatorServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
RoomSendEventRestServlet(self).register(resource)
@@ -134,6 +135,13 @@ class EventCreatorServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -71,7 +72,7 @@ class FederationReaderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "federation":
resources.update({
FEDERATION_PREFIX: TransportLayerServer(self),
@@ -107,6 +108,13 @@ class FederationReaderServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -89,7 +90,7 @@ class FederationSenderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -121,6 +122,13 @@ class FederationSenderServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -29,6 +29,7 @@ from synapse.http.servlet import (
RestServlet, parse_json_object_from_request,
)
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -131,7 +132,7 @@ class FrontendProxyServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
@@ -172,6 +173,13 @@ class FrontendProxyServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+13 -4
View File
@@ -34,7 +34,7 @@ from synapse.module_api import ModuleApi
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.metrics import register_memory_metrics
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements
@@ -230,7 +230,7 @@ class SynapseHomeServer(HomeServer):
resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
@@ -263,6 +263,13 @@ class SynapseHomeServer(HomeServer):
reactor.addSystemEventTrigger(
"before", "shutdown", server_listener.stopListening,
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -362,8 +369,6 @@ def setup(config_options):
hs.get_datastore().start_doing_background_updates()
hs.get_federation_client().start_get_pdu_cache()
register_memory_metrics(hs)
reactor.callWhenRunning(start)
return hs
@@ -434,6 +439,10 @@ def run(hs):
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.iteritems():
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
+9 -1
View File
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -73,7 +74,7 @@ class MediaRepositoryServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media":
media_repo = self.get_media_repository_resource()
resources.update({
@@ -114,6 +115,13 @@ class MediaRepositoryServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
@@ -92,7 +93,7 @@ class PusherServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -124,6 +125,13 @@ class PusherServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+9 -1
View File
@@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -257,7 +258,7 @@ class SynchrotronServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource)
@@ -301,6 +302,13 @@ class SynchrotronServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+4
View File
@@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
+9 -1
View File
@@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -105,7 +106,7 @@ class UserDirectoryServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
user_directory.register_servlets(self, resource)
@@ -146,6 +147,13 @@ class UserDirectoryServer(HomeServer):
globals={"hs": self},
)
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
+5 -1
View File
@@ -292,4 +292,8 @@ class ApplicationService(object):
return self.rate_limited
def __str__(self):
return "ApplicationService: %s" % (self.__dict__,)
# copy dictionary and redact token fields so they don't get logged
dict_copy = self.__dict__.copy()
dict_copy["token"] = "<redacted>"
dict_copy["hs_token"] = "<redacted>"
return "ApplicationService: %s" % (dict_copy,)
+22
View File
@@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
import logging
import urllib
from prometheus_client import Counter
logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"]
)
failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"]
)
sent_events_counter = Counter(
"synapse_appservice_api_sent_events",
"Number of events sent to the AS",
["service"]
)
HOUR_IN_MS = 60 * 60 * 1000
@@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
failed_transactions_counter.labels(service.id).inc()
defer.returnValue(False)
def _serialize(self, events):
+10 -1
View File
@@ -18,6 +18,9 @@ from ._base import Config
DEFAULT_CONFIG = """\
# User Consent configuration
#
# for detailed instructions, see
# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
#
# Parts of this section are required if enabling the 'consent' resource under
# 'listeners', in particular 'template_dir' and 'version'.
#
@@ -32,7 +35,8 @@ DEFAULT_CONFIG = """\
#
# 'server_notice_content', if enabled, will send a user a "Server Notice"
# asking them to consent to the privacy policy. The 'server_notices' section
# must also be configured for this to work.
# must also be configured for this to work. Notices will *not* be sent to
# guest users unless 'send_server_notice_to_guests' is set to true.
#
# 'block_events_error', if set, will block any attempts to send events
# until the user consents to the privacy policy. The value of the setting is
@@ -46,6 +50,7 @@ DEFAULT_CONFIG = """\
# body: >-
# To continue using this homeserver you must review and agree to the
# terms and conditions at %(consent_uri)s
# send_server_notice_to_guests: True
# block_events_error: >-
# To continue using this homeserver you must review and agree to the
# terms and conditions at %(consent_uri)s
@@ -60,6 +65,7 @@ class ConsentConfig(Config):
self.user_consent_version = None
self.user_consent_template_dir = None
self.user_consent_server_notice_content = None
self.user_consent_server_notice_to_guests = False
self.block_events_without_consent_error = None
def read_config(self, config):
@@ -74,6 +80,9 @@ class ConsentConfig(Config):
self.block_events_without_consent_error = consent_config.get(
"block_events_error",
)
self.user_consent_server_notice_to_guests = bool(consent_config.get(
"send_server_notice_to_guests", False,
))
def default_config(self, **kwargs):
return DEFAULT_CONFIG
+3
View File
@@ -250,6 +250,9 @@ class ContentRepositoryConfig(Config):
# - '192.168.0.0/16'
# - '100.64.0.0/10'
# - '169.254.0.0/16'
# - '::1/128'
# - 'fe80::/64'
# - 'fc00::/7'
#
# List of IP address CIDR ranges that the URL preview spider is allowed
# to access even if they are specified in url_preview_ip_range_blacklist.
+10
View File
@@ -14,8 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from ._base import Config, ConfigError
logger = logging.Logger(__name__)
class ServerConfig(Config):
@@ -138,6 +142,12 @@ class ServerConfig(Config):
metrics_port = config.get("metrics_port")
if metrics_port:
logger.warn(
("The metrics_port configuration option is deprecated in Synapse 0.31 "
"in favour of a listener. Please see "
"http://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst"
" on how to configure the new listener."))
self.listeners.append({
"port": metrics_port,
"bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
+25 -6
View File
@@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
from twisted.internet import defer
from signedjson.sign import (
verify_signed_json, signature_ids, sign_json, encode_canonical_json
verify_signed_json, signature_ids, sign_json, encode_canonical_json,
SignatureVerifyException,
)
from signedjson.key import (
is_signing_algorithm_supported, decode_verify_key_bytes
is_signing_algorithm_supported, decode_verify_key_bytes,
encode_verify_key_base64,
)
from unpaddedbase64 import decode_base64, encode_base64
@@ -56,7 +58,7 @@ Attributes:
key_ids(set(str)): The set of key_ids to that could be used to verify the
JSON object
json_object(dict): The JSON object to verify.
deferred(twisted.internet.defer.Deferred):
deferred(Deferred[str, str, nacl.signing.VerifyKey]):
A deferred (server_name, key_id, verify_key) tuple that resolves when
a verify key has been fetched. The deferreds' callbacks are run with no
logcontext.
@@ -736,6 +738,17 @@ class Keyring(object):
@defer.inlineCallbacks
def _handle_key_deferred(verify_request):
"""Waits for the key to become available, and then performs a verification
Args:
verify_request (VerifyKeyRequest):
Returns:
Deferred[None]
Raises:
SynapseError if there was a problem performing the verification
"""
server_name = verify_request.server_name
try:
with PreserveLoggingContext():
@@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
))
try:
verify_signed_json(json_object, server_name, verify_key)
except Exception:
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
server_name, verify_key.alg, verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s" % (
server_name, verify_key.alg, verify_key.version
"Invalid signature for server %s with key %s:%s: %s" % (
server_name, verify_key.alg, verify_key.version, str(e),
),
Codes.UNAUTHORIZED,
)
+2 -2
View File
@@ -471,14 +471,14 @@ def _check_power_levels(event, auth_events):
]
old_list = current_state.content.get("users", {})
for user in set(old_list.keys() + user_list.keys()):
for user in set(list(old_list) + list(user_list)):
levels_to_check.append(
(user, "users")
)
old_list = current_state.content.get("events", {})
new_list = event.content.get("events", {})
for ev_id in set(old_list.keys() + new_list.keys()):
for ev_id in set(list(old_list) + list(new_list)):
levels_to_check.append(
(ev_id, "events")
)
+1 -1
View File
@@ -146,7 +146,7 @@ class EventBase(object):
return field in self._event_dict
def items(self):
return self._event_dict.items()
return list(self._event_dict.items())
class FrozenEvent(EventBase):
+9 -12
View File
@@ -32,20 +32,17 @@ from synapse.federation.federation_base import (
FederationBase,
event_from_pdu_json,
)
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from prometheus_client import Counter
logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -108,7 +105,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc(query_type)
sent_queries_counter.labels(query_type).inc()
return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
@@ -127,7 +124,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_device_keys")
sent_queries_counter.labels("client_device_keys").inc()
return self.transport_layer.query_client_keys(
destination, content, timeout
)
@@ -137,7 +134,7 @@ class FederationClient(FederationBase):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.inc("user_devices")
sent_queries_counter.labels("user_devices").inc()
return self.transport_layer.query_user_devices(
destination, user_id, timeout
)
@@ -154,7 +151,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_one_time_keys")
sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(
destination, content, timeout
)
@@ -394,7 +391,7 @@ class FederationClient(FederationBase):
"""
if return_local:
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
signed_events = list(seen_events.values())
else:
seen_events = yield self.store.have_seen_events(event_ids)
signed_events = []
@@ -592,7 +589,7 @@ class FederationClient(FederationBase):
}
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, pdus.values(),
destination, list(pdus.values()),
outlier=True,
)
+10 -9
View File
@@ -27,12 +27,13 @@ from synapse.federation.federation_base import (
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
from prometheus_client import Counter
from six import iteritems
# when processing incoming transactions, we try to handle multiple rooms in
@@ -41,17 +42,17 @@ TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
# synapse.federation.federation_server is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
received_pdus_counter = metrics.register_counter("received_pdus")
received_edus_counter = Counter("synapse_federation_server_received_edus", "")
received_edus_counter = metrics.register_counter("received_edus")
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
class FederationServer(FederationBase):
def __init__(self, hs):
super(FederationServer, self).__init__(hs)
@@ -131,7 +132,7 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id)
received_pdus_counter.inc_by(len(transaction.pdus))
received_pdus_counter.inc(len(transaction.pdus))
pdus_by_room = {}
@@ -292,7 +293,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))
+33 -43
View File
@@ -33,9 +33,9 @@ from .units import Edu
from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
import synapse.metrics
from synapse.metrics import LaterGauge
from blist import sorteddict
from sortedcontainers import SortedDict
from collections import namedtuple
import logging
@@ -45,9 +45,6 @@ from six import itervalues, iteritems
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
class FederationRemoteSendQueue(object):
"""A drop in replacement for TransactionQueue"""
@@ -58,29 +55,27 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id
self.presence_changed = SortedDict() # Stream position -> user_id
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
self.edus = sorteddict() # stream position -> Edu
self.edus = SortedDict() # stream position -> Edu
self.failures = sorteddict() # stream position -> (destination, Failure)
self.failures = SortedDict() # stream position -> (destination, Failure)
self.device_messages = sorteddict() # stream position -> destination
self.device_messages = SortedDict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
self.pos_time = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
metrics.register_callback(
queue_name + "_size",
lambda: len(queue),
)
LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
"", [], lambda: len(queue))
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@@ -103,7 +98,7 @@ class FederationRemoteSendQueue(object):
now = self.clock.time_msec()
keys = self.pos_time.keys()
time = keys.bisect_left(now - FIVE_MINUTES_AGO)
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
@@ -118,7 +113,7 @@ class FederationRemoteSendQueue(object):
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = keys.bisect_left(position_to_delete)
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
@@ -136,7 +131,7 @@ class FederationRemoteSendQueue(object):
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_left(position_to_delete)
i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
@@ -150,19 +145,19 @@ class FederationRemoteSendQueue(object):
# Delete things out of edu map
keys = self.edus.keys()
i = keys.bisect_left(position_to_delete)
i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
i = keys.bisect_left(position_to_delete)
i = self.failures.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map
keys = self.device_messages.keys()
i = keys.bisect_left(position_to_delete)
i = self.device_messages.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
@@ -202,7 +197,7 @@ class FederationRemoteSendQueue(object):
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
@@ -255,13 +250,12 @@ class FederationRemoteSendQueue(object):
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for user_id in self.presence_changed[pos]
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
@@ -270,13 +264,12 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
@@ -285,19 +278,17 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed edus
keys = self.edus.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
edus = ((k, self.edus[k]) for k in keys[i:j])
i = self.edus.bisect_right(from_token)
j = self.edus.bisect_right(to_token) + 1
edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Fetch changed failures
keys = self.failures.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
failures = ((k, self.failures[k]) for k in keys[i:j])
i = self.failures.bisect_right(from_token)
j = self.failures.bisect_right(to_token) + 1
failures = self.failures.items()[i:j]
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
@@ -306,10 +297,9 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed device messages
keys = self.device_messages.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
i = self.device_messages.bisect_right(from_token)
j = self.device_messages.bisect_right(to_token) + 1
device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
+34 -29
View File
@@ -26,23 +26,25 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
from synapse.metrics import LaterGauge
from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
events_processed_counter,
)
from prometheus_client import Counter
from six import itervalues
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = client_metrics.register_distribution(
"sent_pdu_destinations"
sent_pdus_destination_dist = Counter(
"synapse_federation_transaction_queue_sent_pdu_destinations", ""
)
sent_edus_counter = client_metrics.register_counter("sent_edus")
sent_transactions_counter = client_metrics.register_counter("sent_transactions")
events_processed_counter = client_metrics.register_counter("events_processed")
class TransactionQueue(object):
@@ -69,8 +71,10 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
metrics.register_callback(
"pending_destinations",
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: len(self.pending_transactions),
)
@@ -94,12 +98,16 @@ class TransactionQueue(object):
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
"pending_pdus",
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
@@ -228,7 +236,7 @@ class TransactionQueue(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
for evs in itervalues(events_by_room)
],
consumeErrors=True
))
@@ -241,18 +249,15 @@ class TransactionQueue(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc_by(len(events))
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
@@ -275,7 +280,7 @@ class TransactionQueue(object):
if not destinations:
return
sent_pdus_destination_dist.inc_by(len(destinations))
sent_pdus_destination_dist.inc(len(destinations))
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -322,7 +327,7 @@ class TransactionQueue(object):
if not states_map:
break
yield self._process_presence_inner(states_map.values())
yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
+2 -2
View File
@@ -114,14 +114,14 @@ class BaseHandler(object):
if guest_access != "can_join":
if context:
current_state = yield self.store.get_events(
context.current_state_ids.values()
list(context.current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
event.room_id
)
current_state = current_state.values()
current_state = list(current_state.values())
logger.info("maybe_kick_guest_users %r", current_state)
yield self.kick_guest_users(current_state)
+12 -14
View File
@@ -15,20 +15,21 @@
from twisted.internet import defer
from six import itervalues
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import (
make_deferred_yieldable, run_in_background,
)
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
events_processed_counter = metrics.register_counter("events_processed")
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
def log_failure(failure):
@@ -120,7 +121,7 @@ class ApplicationServicesHandler(object):
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
for evs in itervalues(events_by_room)
], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
@@ -128,18 +129,15 @@ class ApplicationServicesHandler(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
synapse.metrics.event_processing_positions.labels(
"appservice_sender").set(upper_bound)
events_processed_counter.inc_by(len(events))
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"appservice_sender").set(ts)
finally:
self.is_processing = False
+3 -3
View File
@@ -249,7 +249,7 @@ class AuthHandler(BaseHandler):
errordict = e.error_dict()
for f in flows:
if len(set(f) - set(creds.keys())) == 0:
if len(set(f) - set(creds)) == 0:
# it's very useful to know what args are stored, but this can
# include the password in the case of registering, so only log
# the keys (confusingly, clientdict may contain a password
@@ -257,12 +257,12 @@ class AuthHandler(BaseHandler):
# and is not sensitive).
logger.info(
"Auth completed with creds: %r. Client dict has keys: %r",
creds, clientdict.keys()
creds, list(clientdict)
)
defer.returnValue((creds, clientdict, session['id']))
ret = self._auth_dict_for_flows(flows, session)
ret['completed'] = creds.keys()
ret['completed'] = list(creds)
ret.update(errordict)
raise InteractiveAuthIncompleteError(
ret,
+2 -2
View File
@@ -114,7 +114,7 @@ class DeviceHandler(BaseHandler):
user_id, device_id=None
)
devices = device_map.values()
devices = list(device_map.values())
for device in devices:
_update_device_from_client_ips(device, ips)
@@ -187,7 +187,7 @@ class DeviceHandler(BaseHandler):
defer.Deferred:
"""
device_map = yield self.store.get_devices_by_user(user_id)
device_ids = device_map.keys()
device_ids = list(device_map)
if except_device_id is not None:
device_ids = [d for d in device_ids if d != except_device_id]
yield self.delete_devices(user_id, device_ids)
+12 -9
View File
@@ -52,7 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
logger = logging.getLogger(__name__)
@@ -480,8 +479,8 @@ class FederationHandler(BaseHandler):
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
for key_to_eid in event_to_state_ids.itervalues()
for key, e_id in key_to_eid.iteritems()
for key_to_eid in list(event_to_state_ids.values())
for key, e_id in key_to_eid.items()
if key[0] != EventTypes.Member or check_match(key[1])
])
@@ -1149,13 +1148,13 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
state_ids = context.prev_state_ids.values()
state_ids = list(context.prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(context.prev_state_ids.values())
state = yield self.store.get_events(list(context.prev_state_ids.values()))
defer.returnValue({
"state": state.values(),
"state": list(state.values()),
"auth_chain": auth_chain,
})
@@ -1405,7 +1404,7 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
res = results.values()
res = list(results.values())
for event in res:
# We sign these again because there was a bug where we
# incorrectly signed things the first time round
@@ -1446,7 +1445,7 @@ class FederationHandler(BaseHandler):
else:
results.pop((event.type, event.state_key), None)
defer.returnValue(results.values())
defer.returnValue(list(results.values()))
else:
defer.returnValue([])
@@ -1795,6 +1794,10 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
)
defer.returnValue(missing_events)
@defer.inlineCallbacks
@@ -1915,7 +1918,7 @@ class FederationHandler(BaseHandler):
})
new_state = self.state_handler.resolve_events(
[local_view.values(), remote_view.values()],
[list(local_view.values()), list(remote_view.values())],
event
)
+3
View File
@@ -572,6 +572,9 @@ class EventCreationHandler(object):
u = yield self.store.get_user_by_id(user_id)
assert u is not None
if u["appservice_id"] is not None:
# users registered by an appservice are exempt
return
if u["consent_version"] == self.config.user_consent_version:
return
+39 -34
View File
@@ -38,26 +38,29 @@ from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
import synapse.metrics
from synapse.metrics import LaterGauge
import logging
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_presence_counter = metrics.register_counter("notified_presence")
federation_presence_out_counter = metrics.register_counter("federation_presence_out")
presence_updates_counter = metrics.register_counter("presence_updates")
timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out", "")
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
state_transition_counter = metrics.register_counter(
"state_transition", labels=["from", "to"]
notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason", "", ["reason"])
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["from", "to"]
)
@@ -142,8 +145,9 @@ class PresenceHandler(object):
for state in active_presence
}
metrics.register_callback(
"user_to_current_state_size", lambda: len(self.user_to_current_state)
LaterGauge(
"synapse_handlers_presence_user_to_current_state_size", "", [],
lambda: len(self.user_to_current_state)
)
now = self.clock.time_msec()
@@ -213,7 +217,8 @@ class PresenceHandler(object):
60 * 1000,
)
metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
@defer.inlineCallbacks
def _on_shutdown(self):
@@ -316,11 +321,11 @@ class PresenceHandler(object):
# TODO: We should probably ensure there are no races hereafter
presence_updates_counter.inc_by(len(new_states))
presence_updates_counter.inc(len(new_states))
if to_notify:
notified_presence_counter.inc_by(len(to_notify))
yield self._persist_and_notify(to_notify.values())
notified_presence_counter.inc(len(to_notify))
yield self._persist_and_notify(list(to_notify.values()))
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
self.unpersisted_users_changes -= set(to_notify.keys())
@@ -330,7 +335,7 @@ class PresenceHandler(object):
if user_id not in to_notify
}
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
federation_presence_out_counter.inc(len(to_federation_ping))
self._push_to_remotes(to_federation_ping.values())
@@ -368,7 +373,7 @@ class PresenceHandler(object):
for user_id in users_to_check
]
timers_fired_counter.inc_by(len(states))
timers_fired_counter.inc(len(states))
changes = handle_timeouts(
states,
@@ -657,7 +662,7 @@ class PresenceHandler(object):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
federation_presence_counter.inc_by(len(updates))
federation_presence_counter.inc(len(updates))
yield self._update_states(updates)
@defer.inlineCallbacks
@@ -682,7 +687,7 @@ class PresenceHandler(object):
"""
updates = yield self.current_state_for_users(target_user_ids)
updates = updates.values()
updates = list(updates.values())
for user_id in set(target_user_ids) - set(u.user_id for u in updates):
updates.append(UserPresenceState.default(user_id))
@@ -748,11 +753,11 @@ class PresenceHandler(object):
self._push_to_remotes([state])
else:
user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.is_mine_id, user_ids)
user_ids = list(filter(self.is_mine_id, user_ids))
states = yield self.current_state_for_users(user_ids)
self._push_to_remotes(states.values())
self._push_to_remotes(list(states.values()))
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -932,28 +937,28 @@ def should_notify(old_state, new_state):
return False
if old_state.status_msg != new_state.status_msg:
notify_reason_counter.inc("status_msg_change")
notify_reason_counter.labels("status_msg_change").inc()
return True
if old_state.state != new_state.state:
notify_reason_counter.inc("state_change")
state_transition_counter.inc(old_state.state, new_state.state)
notify_reason_counter.labels("state_change").inc()
state_transition_counter.labels(old_state.state, new_state.state).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
notify_reason_counter.inc("current_active_change")
notify_reason_counter.labels("current_active_change").inc()
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
notify_reason_counter.inc("last_active_change_online")
notify_reason_counter.labels("last_active_change_online").inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
notify_reason_counter.inc("last_active_change_not_online")
notify_reason_counter.labels("last_active_change_not_online").inc()
return True
return False
@@ -1027,14 +1032,14 @@ class PresenceEventSource(object):
if changed is not None and len(changed) < 500:
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
get_updates_counter.inc("stream")
get_updates_counter.labels("stream").inc()
for other_user_id in changed:
if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.inc("full")
get_updates_counter.labels("full").inc()
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
@@ -1046,7 +1051,7 @@ class PresenceEventSource(object):
updates = yield presence.current_state_for_users(user_ids_changed)
if include_offline:
defer.returnValue((updates.values(), max_token))
defer.returnValue((list(updates.values()), max_token))
else:
defer.returnValue(([
s for s in itervalues(updates)
@@ -1107,7 +1112,7 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
if new_state:
changes[state.user_id] = new_state
return changes.values()
return list(changes.values())
def handle_timeout(state, is_mine, syncing_user_ids, now):
+1 -1
View File
@@ -455,7 +455,7 @@ class RoomContextHandler(BaseHandler):
state = yield self.store.get_state_for_events(
[last_event_id], None
)
results["state"] = state[last_event_id].values()
results["state"] = list(state[last_event_id].values())
results["start"] = now_token.copy_and_replace(
"room_key", results["start"]
+2 -1
View File
@@ -15,6 +15,7 @@
from twisted.internet import defer
from six import iteritems
from six.moves import range
from ._base import BaseHandler
@@ -307,7 +308,7 @@ class RoomListHandler(BaseHandler):
)
event_map = yield self.store.get_events([
event_id for key, event_id in current_state_ids.iteritems()
event_id for key, event_id in iteritems(current_state_ids)
if key[0] in (
EventTypes.JoinRules,
EventTypes.Name,
+14 -10
View File
@@ -298,16 +298,6 @@ class RoomMemberHandler(object):
is_blocked = yield self.store.is_room_blocked(room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
else:
# we don't allow people to reject invites to, or leave, the
# server notice room.
is_blocked = yield self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
http_client.FORBIDDEN,
"You cannot leave this room",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
if effective_membership_state == Membership.INVITE:
# block any attempts to invite the server notices mxid
@@ -383,6 +373,20 @@ class RoomMemberHandler(object):
if same_sender and same_membership and same_content:
defer.returnValue(old_state)
# we don't allow people to reject invites to the server notice
# room, but they can leave it once they are joined.
if (
old_membership == Membership.INVITE and
effective_membership_state == Membership.LEAVE
):
is_blocked = yield self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
http_client.FORBIDDEN,
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
is_host_in_room = yield self._is_host_in_room(current_state_ids)
if effective_membership_state == Membership.JOIN:
+1 -1
View File
@@ -348,7 +348,7 @@ class SearchHandler(BaseHandler):
rooms = set(e.room_id for e in allowed_events)
for room_id in rooms:
state = yield self.state_handler.get_current_state(room_id)
state_results[room_id] = state.values()
state_results[room_id] = list(state.values())
state_results.values()
+14 -4
View File
@@ -443,6 +443,10 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1,
)
@@ -537,11 +541,11 @@ class SyncHandler(object):
state = {}
if state_ids:
state = yield self.store.get_events(state_ids.values())
state = yield self.store.get_events(list(state_ids.values()))
defer.returnValue({
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(state.values())
for e in sync_config.filter_collection.filter_room_state(list(state.values()))
})
@defer.inlineCallbacks
@@ -890,7 +894,7 @@ class SyncHandler(object):
presence.extend(states)
# Deduplicate the presence entries so that there's at most one per user
presence = {p.user_id: p for p in presence}.values()
presence = list({p.user_id: p for p in presence}.values())
presence = sync_config.filter_collection.filter_presence(
presence
@@ -1042,7 +1046,13 @@ class SyncHandler(object):
Returns:
Deferred(tuple): Returns a tuple of the form:
`([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
`(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
where:
room_entries is a list [RoomSyncResultBuilder]
invited_rooms is a list [InvitedSyncResult]
newly_joined rooms is a list[str] of room ids
newly_left_rooms is a list[str] of room ids
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
+13
View File
@@ -13,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from twisted.internet.defer import CancelledError
from twisted.python import failure
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
uri
)
+12 -18
View File
@@ -19,11 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
import synapse.metrics
from synapse.http.endpoint import SpiderEndpoint
from canonicaljson import encode_canonical_json
@@ -42,6 +41,7 @@ from twisted.web._newclient import ResponseDone
from six import StringIO
from prometheus_client import Counter
import simplejson as json
import logging
import urllib
@@ -49,16 +49,9 @@ import urllib
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter("synapse_http_client_responses", "",
["method", "code"])
class SimpleHttpClient(object):
@@ -95,9 +88,10 @@ class SimpleHttpClient(object):
def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
outgoing_requests_counter.labels(method).inc()
logger.info("Sending request %s %s", method, uri)
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = self.agent.request(
@@ -109,17 +103,17 @@ class SimpleHttpClient(object):
)
response = yield make_deferred_yieldable(request_deferred)
incoming_responses_counter.inc(method, response.code)
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.inc(method, "ERR")
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, type(e).__name__, e.message
method, redact_uri(uri), type(e).__name__, e.message
)
raise e
+6 -10
View File
@@ -45,19 +45,15 @@ from six.moves.urllib import parse as urlparse
from six import string_types
from prometheus_client import Counter
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
"", ["method", "code"])
MAX_LONG_RETRIES = 10
+92 -126
View File
@@ -16,137 +16,109 @@
import logging
import synapse.metrics
from prometheus_client.core import Counter, Histogram
from synapse.metrics import LaterGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for("synapse.http.server")
# total number of responses served, split by method/servlet/tag
response_count = metrics.register_counter(
"response_count",
labels=["method", "servlet", "tag"],
alternative_names=(
# the following are all deprecated aliases for the same metric
metrics.name_prefix + x for x in (
"_requests",
"_response_time:count",
"_response_ru_utime:count",
"_response_ru_stime:count",
"_response_db_txn_count:count",
"_response_db_txn_duration:count",
)
)
response_count = Counter(
"synapse_http_server_response_count", "", ["method", "servlet", "tag"]
)
requests_counter = metrics.register_counter(
"requests_received",
labels=["method", "servlet", ],
requests_counter = Counter(
"synapse_http_server_requests_received", "", ["method", "servlet"]
)
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
outgoing_responses_counter = Counter(
"synapse_http_server_responses", "", ["method", "code"]
)
response_timer = metrics.register_counter(
"response_time_seconds",
labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_time:total",
),
response_timer = Histogram(
"synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
)
response_ru_utime = metrics.register_counter(
"response_ru_utime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_utime:total",
),
response_ru_utime = Counter(
"synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"]
)
response_ru_stime = metrics.register_counter(
"response_ru_stime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_stime:total",
),
response_ru_stime = Counter(
"synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"]
)
response_db_txn_count = metrics.register_counter(
"response_db_txn_count", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_count:total",
),
response_db_txn_count = Counter(
"synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
response_db_txn_duration = metrics.register_counter(
"response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_duration:total",
),
response_db_txn_duration = Counter(
"synapse_http_server_response_db_txn_duration_seconds",
"",
["method", "servlet", "tag"],
)
# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
response_db_sched_duration = Counter(
"synapse_http_server_response_db_sched_duration_seconds",
"",
["method", "servlet", "tag"],
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
response_size = Counter(
"synapse_http_server_response_size", "", ["method", "servlet", "tag"]
)
# In flight metrics are incremented while the requests are in flight, rather
# than when the response was written.
in_flight_requests_ru_utime = metrics.register_counter(
"in_flight_requests_ru_utime_seconds", labels=["method", "servlet"],
in_flight_requests_ru_utime = Counter(
"synapse_http_server_in_flight_requests_ru_utime_seconds",
"",
["method", "servlet"],
)
in_flight_requests_ru_stime = metrics.register_counter(
"in_flight_requests_ru_stime_seconds", labels=["method", "servlet"],
in_flight_requests_ru_stime = Counter(
"synapse_http_server_in_flight_requests_ru_stime_seconds",
"",
["method", "servlet"],
)
in_flight_requests_db_txn_count = metrics.register_counter(
"in_flight_requests_db_txn_count", labels=["method", "servlet"],
in_flight_requests_db_txn_count = Counter(
"synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"]
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
in_flight_requests_db_txn_duration = metrics.register_counter(
"in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"],
in_flight_requests_db_txn_duration = Counter(
"synapse_http_server_in_flight_requests_db_txn_duration_seconds",
"",
["method", "servlet"],
)
# seconds spent waiting for a db connection, when processing this request
in_flight_requests_db_sched_duration = metrics.register_counter(
"in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"]
in_flight_requests_db_sched_duration = Counter(
"synapse_http_server_in_flight_requests_db_sched_duration_seconds",
"",
["method", "servlet"],
)
# The set of all in flight requests, set[RequestMetrics]
_in_flight_requests = set()
def _collect_in_flight():
"""Called just before metrics are collected, so we use it to update all
the in flight request metrics
"""
for rm in _in_flight_requests:
rm.update_metrics()
metrics.register_collector(_collect_in_flight)
def _get_in_flight_counts():
"""Returns a count of all in flight requests by (method, server_name)
Returns:
dict[tuple[str, str], int]
"""
for rm in _in_flight_requests:
rm.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type
@@ -158,16 +130,17 @@ def _get_in_flight_counts():
return counts
metrics.register_callback(
"in_flight_requests_count",
LaterGauge(
"synapse_http_request_metrics_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,
labels=["method", "servlet"]
)
class RequestMetrics(object):
def start(self, time_msec, name, method):
self.start = time_msec
def start(self, time_sec, name, method):
self.start = time_sec
self.start_context = LoggingContext.current_context()
self.name = name
self.method = method
@@ -176,7 +149,7 @@ class RequestMetrics(object):
_in_flight_requests.add(self)
def stop(self, time_msec, request):
def stop(self, time_sec, request):
_in_flight_requests.discard(self)
context = LoggingContext.current_context()
@@ -192,34 +165,29 @@ class RequestMetrics(object):
)
return
outgoing_responses_counter.inc(request.method, str(request.code))
outgoing_responses_counter.labels(request.method, str(request.code)).inc()
response_count.inc(request.method, self.name, tag)
response_count.labels(request.method, self.name, tag).inc()
response_timer.inc_by(
time_msec - self.start, request.method,
self.name, tag
response_timer.labels(request.method, self.name, tag).observe(
time_sec - self.start
)
ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by(
ru_utime, request.method, self.name, tag
response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime)
response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime)
response_db_txn_count.labels(request.method, self.name, tag).inc(
context.db_txn_count
)
response_ru_stime.inc_by(
ru_stime, request.method, self.name, tag
response_db_txn_duration.labels(request.method, self.name, tag).inc(
context.db_txn_duration_sec
)
response_db_txn_count.inc_by(
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
context.db_txn_duration_ms / 1000., request.method, self.name, tag
)
response_db_sched_duration.inc_by(
context.db_sched_duration_ms / 1000., request.method, self.name, tag
response_db_sched_duration.labels(request.method, self.name, tag).inc(
context.db_sched_duration_sec
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
response_size.labels(request.method, self.name, tag).inc(request.sentLength)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
@@ -229,27 +197,21 @@ class RequestMetrics(object):
def update_metrics(self):
"""Updates the in flight metrics with values from this request.
"""
diff = self._request_stats.update(self.start_context)
in_flight_requests_ru_utime.inc_by(
diff.ru_utime, self.method, self.name,
in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count
)
in_flight_requests_ru_stime.inc_by(
diff.ru_stime, self.method, self.name,
in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(
diff.db_txn_duration_sec
)
in_flight_requests_db_txn_count.inc_by(
diff.db_txn_count, self.method, self.name,
)
in_flight_requests_db_txn_duration.inc_by(
diff.db_txn_duration_ms / 1000., self.method, self.name,
)
in_flight_requests_db_sched_duration.inc_by(
diff.db_sched_duration_ms / 1000., self.method, self.name,
in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(
diff.db_sched_duration_sec
)
@@ -258,17 +220,21 @@ class _RequestStats(object):
"""
__slots__ = [
"ru_utime", "ru_stime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"ru_utime",
"ru_stime",
"db_txn_count",
"db_txn_duration_sec",
"db_sched_duration_sec",
]
def __init__(self, ru_utime, ru_stime, db_txn_count,
db_txn_duration_ms, db_sched_duration_ms):
def __init__(
self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec
):
self.ru_utime = ru_utime
self.ru_stime = ru_stime
self.db_txn_count = db_txn_count
self.db_txn_duration_ms = db_txn_duration_ms
self.db_sched_duration_ms = db_sched_duration_ms
self.db_txn_duration_sec = db_txn_duration_sec
self.db_sched_duration_sec = db_sched_duration_sec
@staticmethod
def from_context(context):
@@ -277,8 +243,8 @@ class _RequestStats(object):
return _RequestStats(
ru_utime, ru_stime,
context.db_txn_count,
context.db_txn_duration_ms,
context.db_sched_duration_ms,
context.db_txn_duration_sec,
context.db_sched_duration_sec,
)
def update(self, context):
@@ -294,14 +260,14 @@ class _RequestStats(object):
new.ru_utime - self.ru_utime,
new.ru_stime - self.ru_stime,
new.db_txn_count - self.db_txn_count,
new.db_txn_duration_ms - self.db_txn_duration_ms,
new.db_sched_duration_ms - self.db_sched_duration_ms,
new.db_txn_duration_sec - self.db_txn_duration_sec,
new.db_sched_duration_sec - self.db_sched_duration_sec,
)
self.ru_utime = new.ru_utime
self.ru_stime = new.ru_stime
self.db_txn_count = new.db_txn_count
self.db_txn_duration_ms = new.db_txn_duration_ms
self.db_sched_duration_ms = new.db_sched_duration_ms
self.db_txn_duration_sec = new.db_txn_duration_sec
self.db_sched_duration_sec = new.db_sched_duration_sec
return diff
+2 -2
View File
@@ -210,8 +210,8 @@ def wrap_request_handler_with_logging(h):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.inc(request.method,
request.request_metrics.name)
requests_counter.labels(request.method,
request.request_metrics.name).inc()
yield d
return wrapped_request_handler
+12 -17
View File
@@ -14,18 +14,16 @@
import contextlib
import logging
import re
import time
from twisted.web.server import Site, Request
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
_next_request_seq = 0
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
self.uri
)
return redact_uri(self.uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -83,7 +78,7 @@ class SynapseRequest(Request):
return Request.render(self, resrc)
def _started_processing(self, servlet_name):
self.start_time = int(time.time() * 1000)
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
self.start_time, name=servlet_name, method=self.method,
@@ -102,26 +97,26 @@ class SynapseRequest(Request):
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
db_txn_count = context.db_txn_count
db_txn_duration_ms = context.db_txn_duration_ms
db_sched_duration_ms = context.db_sched_duration_ms
db_txn_duration_sec = context.db_txn_duration_sec
db_sched_duration_sec = context.db_sched_duration_sec
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_ms = (0, 0)
db_txn_count, db_txn_duration_sec = (0, 0)
end_time = int(time.time() * 1000)
end_time = time.time()
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
end_time - self.start_time,
int(ru_utime * 1000),
int(ru_stime * 1000),
db_sched_duration_ms,
db_txn_duration_ms,
ru_utime,
ru_stime,
db_sched_duration_sec,
db_txn_duration_sec,
int(db_txn_count),
self.sentLength,
self.code,
+138 -125
View File
@@ -17,165 +17,178 @@ import logging
import functools
import time
import gc
import os
import platform
import attr
from prometheus_client import Gauge, Histogram, Counter
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
running_on_pypy = platform.python_implementation() == "PyPy"
all_metrics = []
all_collectors = []
all_gauges = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class Metrics(object):
""" A single Metrics object gives a (mutable) slice view of the all_metrics
dict, allowing callers to easily register new metrics that are namespaced
nicely."""
class RegistryProxy(object):
def __init__(self, name):
self.name_prefix = name
def make_subspace(self, name):
return Metrics("%s_%s" % (self.name_prefix, name))
def register_collector(self, func):
all_collectors.append(func)
def _register(self, metric_class, name, *args, **kwargs):
full_name = "%s_%s" % (self.name_prefix, name)
metric = metric_class(full_name, *args, **kwargs)
all_metrics.append(metric)
return metric
def register_counter(self, *args, **kwargs):
"""
Returns:
CounterMetric
"""
return self._register(CounterMetric, *args, **kwargs)
def register_gauge(self, *args, **kwargs):
"""
Returns:
GaugeMetric
"""
return self._register(GaugeMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
"""
Returns:
CallbackMetric
"""
return self._register(CallbackMetric, *args, **kwargs)
def register_distribution(self, *args, **kwargs):
"""
Returns:
DistributionMetric
"""
return self._register(DistributionMetric, *args, **kwargs)
def register_cache(self, *args, **kwargs):
"""
Returns:
CacheMetric
"""
return self._register(CacheMetric, *args, **kwargs)
@staticmethod
def collect():
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
def register_memory_metrics(hs):
try:
import psutil
process = psutil.Process()
process.memory_info().rss
except (ImportError, AttributeError):
logger.warn(
"psutil is not installed or incorrect version."
" Disabling memory metrics."
)
return
metric = MemoryUsageMetric(hs, psutil)
all_metrics.append(metric)
@attr.s(hash=True)
class LaterGauge(object):
name = attr.ib()
desc = attr.ib()
labels = attr.ib(hash=False)
caller = attr.ib()
def get_metrics_for(pkg_name):
""" Returns a Metrics instance for conveniently creating metrics
namespaced with the given name prefix. """
def collect(self):
# Convert a "package.name" to "package_name" because Prometheus doesn't
# let us use . in metric names
return Metrics(pkg_name.replace(".", "_"))
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
def render_all():
strs = []
for collector in all_collectors:
collector()
for metric in all_metrics:
try:
strs += metric.render()
calls = self.caller()
except Exception:
strs += ["# FAILED to render"]
logger.exception("Failed to render metric")
logger.exception(
"Exception running callback for LaterGuage(%s)",
self.name,
)
yield g
return
strs.append("") # to generate a final CRLF
if isinstance(calls, dict):
for k, v in calls.items():
g.add_metric(k, v)
else:
g.add_metric([], calls)
return "\n".join(strs)
yield g
def __attrs_post_init__(self):
self._register()
def _register(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
register_process_collector(get_metrics_for("process"))
#
# Detailed CPU metrics
#
class CPUMetrics(object):
def __init__(self):
ticks_per_sec = 100
try:
# Try and get the system config
ticks_per_sec = os.sysconf('SC_CLK_TCK')
except (ValueError, TypeError, AttributeError):
pass
self.ticks_per_sec = ticks_per_sec
def collect(self):
if not HAVE_PROC_SELF_STAT:
return
with open("/proc/self/stat") as s:
line = s.read()
raw_stats = line.split(") ", 1)[1].split(" ")
user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
yield user
sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
yield sys
python_metrics = get_metrics_for("python")
REGISTRY.register(CPUMetrics())
gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
python_metrics.register_callback(
"gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
#
# Python GC metrics
#
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
)
reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
class GCCounts(object):
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
yield cm
REGISTRY.register(GCCounts())
#
# Twisted reactor metrics
#
tick_time = Histogram(
"python_twisted_reactor_tick_time",
"Tick time of the Twisted reactor (sec)",
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
pending_calls_metric = Histogram(
"python_twisted_reactor_pending_calls",
"Pending calls",
buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
)
#
# Federation Metrics
#
sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
event_persisted_position = Gauge("synapse_event_persisted_position", "")
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
def runUntilCurrentTimer(func):
@@ -197,17 +210,17 @@ def runUntilCurrentTimer(func):
num_pending += 1
num_pending += len(reactor.threadCallQueue)
start = time.time() * 1000
start = time.time()
ret = func(*args, **kwargs)
end = time.time() * 1000
end = time.time()
# record the amount of wallclock time spent running pending calls.
# This is a proxy for the actual amount of time between reactor polls,
# since about 25% of time is actually spent running things triggered by
# I/O events, but that is harder to capture without rewriting half the
# reactor.
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)
if running_on_pypy:
return ret
@@ -220,12 +233,12 @@ def runUntilCurrentTimer(func):
if threshold[i] < counts[i]:
logger.info("Collecting gc %d", i)
start = time.time() * 1000
start = time.time()
unreachable = gc.collect(i)
end = time.time() * 1000
end = time.time()
gc_time.inc_by(end - start, i)
gc_unreachable.inc_by(unreachable, i)
gc_time.labels(i).observe(end - start)
gc_unreachable.labels(i).set(unreachable)
return ret
-328
View File
@@ -1,328 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
import logging
import re
logger = logging.getLogger(__name__)
def flatten(items):
"""Flatten a list of lists
Args:
items: iterable[iterable[X]]
Returns:
list[X]: flattened list
"""
return list(chain.from_iterable(items))
class BaseMetric(object):
"""Base class for metrics which report a single value per label set
"""
def __init__(self, name, labels=[], alternative_names=[]):
"""
Args:
name (str): principal name for this metric
labels (list(str)): names of the labels which will be reported
for this metric
alternative_names (iterable(str)): list of alternative names for
this metric. This can be useful to provide a migration path
when renaming metrics.
"""
self._names = [name] + list(alternative_names)
self.labels = labels # OK not to clone as we never write it
def dimension(self):
return len(self.labels)
def is_scalar(self):
return not len(self.labels)
def _render_labelvalue(self, value):
return '"%s"' % (_escape_label_value(value),)
def _render_key(self, values):
if self.is_scalar():
return ""
return "{%s}" % (
",".join(["%s=%s" % (k, self._render_labelvalue(v))
for k, v in zip(self.labels, values)])
)
def _render_for_labels(self, label_values, value):
"""Render this metric for a single set of labels
Args:
label_values (list[object]): values for each of the labels,
(which get stringified).
value: value of the metric at with these labels
Returns:
iterable[str]: rendered metric
"""
rendered_labels = self._render_key(label_values)
return (
"%s%s %.12g" % (name, rendered_labels, value)
for name in self._names
)
def render(self):
"""Render this metric
Each metric is rendered as:
name{label1="val1",label2="val2"} value
https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details
Returns:
iterable[str]: rendered metrics
"""
raise NotImplementedError()
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
value that counts events or running totals.
Example use cases for Counters:
- Number of requests processed
- Number of items that were inserted into a queue
- Total amount of data that a system has processed
Counters can only go up (and be reset when the process restarts).
"""
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0.
def inc_by(self, incr, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
if values not in self.counts:
self.counts[values] = incr
else:
self.counts[values] += incr
def inc(self, *values):
self.inc_by(1, *values)
def render(self):
return flatten(
self._render_for_labels(k, self.counts[k])
for k in sorted(self.counts.keys())
)
class GaugeMetric(BaseMetric):
"""A metric that can go up or down
"""
def __init__(self, *args, **kwargs):
super(GaugeMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.guages = {}
def set(self, v, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
self.guages[values] = v
def render(self):
return flatten(
self._render_for_labels(k, self.guages[k])
for k in sorted(self.guages.keys())
)
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
size or other state of some in-memory object by actively querying it."""
def __init__(self, name, callback, labels=[]):
super(CallbackMetric, self).__init__(name, labels=labels)
self.callback = callback
def render(self):
try:
value = self.callback()
except Exception:
logger.exception("Failed to render %s", self.name)
return ["# FAILED to render " + self.name]
if self.is_scalar():
return list(self._render_for_labels([], value))
return flatten(
self._render_for_labels(k, value[k])
for k in sorted(value.keys())
)
class DistributionMetric(object):
"""A combination of an event counter and an accumulator, which counts
both the number of events and accumulates the total value. Typically this
could be used to keep track of method-running times, or other distributions
of values that occur in discrete occurances.
TODO(paul): Try to export some heatmap-style stats?
"""
def __init__(self, name, *args, **kwargs):
self.counts = CounterMetric(name + ":count", **kwargs)
self.totals = CounterMetric(name + ":total", **kwargs)
def inc_by(self, inc, *values):
self.counts.inc(*values)
self.totals.inc_by(inc, *values)
def render(self):
return self.counts.render() + self.totals.render()
class CacheMetric(object):
__slots__ = (
"name", "cache_name", "hits", "misses", "evicted_size", "size_callback",
)
def __init__(self, name, size_callback, cache_name):
self.name = name
self.cache_name = cache_name
self.hits = 0
self.misses = 0
self.evicted_size = 0
self.size_callback = size_callback
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def render(self):
size = self.size_callback()
hits = self.hits
total = self.misses + self.hits
return [
"""%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
"""%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
"""%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
"""%s:evicted_size{name="%s"} %d""" % (
self.name, self.cache_name, self.evicted_size
),
]
class MemoryUsageMetric(object):
"""Keeps track of the current memory usage, using psutil.
The class will keep the current min/max/sum/counts of rss over the last
WINDOW_SIZE_SEC, by polling UPDATE_HZ times per second
"""
UPDATE_HZ = 2 # number of times to get memory per second
WINDOW_SIZE_SEC = 30 # the size of the window in seconds
def __init__(self, hs, psutil):
clock = hs.get_clock()
self.memory_snapshots = []
self.process = psutil.Process()
clock.looping_call(self._update_curr_values, 1000 / self.UPDATE_HZ)
def _update_curr_values(self):
max_size = self.UPDATE_HZ * self.WINDOW_SIZE_SEC
self.memory_snapshots.append(self.process.memory_info().rss)
self.memory_snapshots[:] = self.memory_snapshots[-max_size:]
def render(self):
if not self.memory_snapshots:
return []
max_rss = max(self.memory_snapshots)
min_rss = min(self.memory_snapshots)
sum_rss = sum(self.memory_snapshots)
len_rss = len(self.memory_snapshots)
return [
"process_psutil_rss:max %d" % max_rss,
"process_psutil_rss:min %d" % min_rss,
"process_psutil_rss:total %d" % sum_rss,
"process_psutil_rss:count %d" % len_rss,
]
def _escape_character(m):
"""Replaces a single character with its escape sequence.
Args:
m (re.MatchObject): A match object whose first group is the single
character to replace
Returns:
str
"""
c = m.group(1)
if c == "\\":
return "\\\\"
elif c == "\"":
return "\\\""
elif c == "\n":
return "\\n"
return c
def _escape_label_value(value):
"""Takes a label value and escapes quotes, newlines and backslashes
"""
return re.sub(r"([\n\"\\])", _escape_character, str(value))
-123
View File
@@ -1,123 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from six import iteritems
TICKS_PER_SEC = 100
BYTES_PER_PAGE = 4096
HAVE_PROC_STAT = os.path.exists("/proc/stat")
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits")
HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd")
# Field indexes from /proc/self/stat, taken from the proc(5) manpage
STAT_FIELDS = {
"utime": 14,
"stime": 15,
"starttime": 22,
"vsize": 23,
"rss": 24,
}
stats = {}
# In order to report process_start_time_seconds we need to know the
# machine's boot time, because the value in /proc/self/stat is relative to
# this
boot_time = None
if HAVE_PROC_STAT:
with open("/proc/stat") as _procstat:
for line in _procstat:
if line.startswith("btime "):
boot_time = int(line.split()[1])
def update_resource_metrics():
if HAVE_PROC_SELF_STAT:
global stats
with open("/proc/self/stat") as s:
line = s.read()
# line is PID (command) more stats go here ...
raw_stats = line.split(") ", 1)[1].split(" ")
for (name, index) in iteritems(STAT_FIELDS):
# subtract 3 from the index, because proc(5) is 1-based, and
# we've lost the first two fields in PID and COMMAND above
stats[name] = int(raw_stats[index - 3])
def _count_fds():
# Not every OS will have a /proc/self/fd directory
if not HAVE_PROC_SELF_FD:
return 0
return len(os.listdir("/proc/self/fd"))
def register_process_collector(process_metrics):
process_metrics.register_collector(update_resource_metrics)
if HAVE_PROC_SELF_STAT:
process_metrics.register_callback(
"cpu_user_seconds_total",
lambda: float(stats["utime"]) / TICKS_PER_SEC
)
process_metrics.register_callback(
"cpu_system_seconds_total",
lambda: float(stats["stime"]) / TICKS_PER_SEC
)
process_metrics.register_callback(
"cpu_seconds_total",
lambda: (float(stats["utime"] + stats["stime"])) / TICKS_PER_SEC
)
process_metrics.register_callback(
"virtual_memory_bytes",
lambda: int(stats["vsize"])
)
process_metrics.register_callback(
"resident_memory_bytes",
lambda: int(stats["rss"]) * BYTES_PER_PAGE
)
process_metrics.register_callback(
"start_time_seconds",
lambda: boot_time + int(stats["starttime"]) / TICKS_PER_SEC
)
if HAVE_PROC_SELF_FD:
process_metrics.register_callback(
"open_fds",
lambda: _count_fds()
)
if HAVE_PROC_SELF_LIMITS:
def _get_max_fds():
with open("/proc/self/limits") as limits:
for line in limits:
if not line.startswith("Max open files "):
continue
# Line is Max open files $SOFT $HARD
return int(line.split()[3])
return None
process_metrics.register_callback(
"max_fds",
lambda: _get_max_fds()
)
+2 -21
View File
@@ -13,27 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.web.resource import Resource
import synapse.metrics
from prometheus_client.twisted import MetricsResource
METRICS_PREFIX = "/_synapse/metrics"
class MetricsResource(Resource):
isLeaf = True
def __init__(self, hs):
Resource.__init__(self) # Resource is old-style, so no super()
self.hs = hs
def render_GET(self, request):
response = synapse.metrics.render_all()
request.setHeader("Content-Type", "text/plain")
request.setHeader("Content-Length", str(len(response)))
# Encode as UTF-8 (default)
return response.encode()
__all__ = ["MetricsResource", "METRICS_PREFIX"]
+11 -13
View File
@@ -28,22 +28,20 @@ from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
import synapse.metrics
from synapse.metrics import LaterGauge
from collections import namedtuple
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = Counter("synapse_notifier_notified_events", "")
notified_events_counter = metrics.register_counter("notified_events")
users_woken_by_stream_counter = metrics.register_counter(
"users_woken_by_stream", labels=["stream"]
)
users_woken_by_stream_counter = Counter(
"synapse_notifier_users_woken_by_stream", "", ["stream"])
# TODO(paul): Should be shared somewhere
@@ -108,7 +106,7 @@ class _NotifierUserStream(object):
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
users_woken_by_stream_counter.inc(stream_key)
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
@@ -197,14 +195,14 @@ class Notifier(object):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
metrics.register_callback(
"rooms",
LaterGauge(
"synapse_notifier_rooms", "", [],
lambda: count(bool, self.room_to_user_streams.values()),
)
metrics.register_callback(
"users",
LaterGauge(
"synapse_notifier_users", "", [],
lambda: len(self.user_to_user_stream),
)
+1 -1
View File
@@ -39,7 +39,7 @@ def list_with_base_rules(rawrules):
rawrules = [r for r in rawrules if r['priority_class'] >= 0]
# shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
ruleslist.extend(make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
+13 -17
View File
@@ -22,14 +22,13 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.event_auth import get_user_power_level
from synapse.api.constants import EventTypes, Membership
from synapse.metrics import get_metrics_for
from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches import register_cache
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
from synapse.state import POWER_KEY
from collections import namedtuple
from prometheus_client import Counter
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@@ -37,21 +36,18 @@ logger = logging.getLogger(__name__)
rules_by_room = {}
push_metrics = get_metrics_for(__name__)
push_rules_invalidation_counter = push_metrics.register_counter(
"push_rules_invalidation_counter"
)
push_rules_state_size_counter = push_metrics.register_counter(
"push_rules_state_size_counter"
)
push_rules_invalidation_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
push_rules_state_size_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
# Measures whether we use the fast path of using state deltas, or if we have to
# recalculate from scratch
push_rules_delta_state_cache_metric = cache_metrics.register_cache(
push_rules_delta_state_cache_metric = register_cache(
"cache",
size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values
cache_name="push_rules_delta_state_cache_metric",
"push_rules_delta_state_cache_metric",
cache=[], # Meaningless size, as this isn't a cache that stores values
)
@@ -65,10 +61,10 @@ class BulkPushRuleEvaluator(object):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.room_push_rule_cache_metrics = cache_metrics.register_cache(
self.room_push_rule_cache_metrics = register_cache(
"cache",
size_callback=lambda: 0, # There's not good value for this
cache_name="room_push_rule_cache",
"room_push_rule_cache",
cache=[], # Meaningless size, as this isn't a cache that stores values
)
@defer.inlineCallbacks
@@ -310,7 +306,7 @@ class RulesForRoom(object):
current_state_ids = context.current_state_ids
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc_by(len(current_state_ids))
push_rules_state_size_counter.inc(len(current_state_ids))
logger.debug(
"Looking for member changes in %r %r", state_group, current_state_ids
+4 -9
View File
@@ -20,22 +20,17 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
from . import push_tools
import synapse
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
http_push_processed_counter = metrics.register_counter(
"http_pushes_processed",
)
http_push_failed_counter = metrics.register_counter(
"http_pushes_failed",
)
http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
class HttpPusher(object):
+2 -1
View File
@@ -229,7 +229,8 @@ class Mailer(object):
if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
prev_messages = room_vars['notifs'][-1]['messages']
for message in notifvars['messages']:
pm = filter(lambda pm: pm['id'] == message['id'], prev_messages)
pm = list(filter(lambda pm: pm['id'] == message['id'],
prev_messages))
if pm:
if not message["is_historical"]:
pm[0]["is_historical"] = False
+1 -1
View File
@@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
# so find out who is in the room that isn't the user.
if "m.room.member" in room_state_bytype_ids:
member_events = yield store.get_events(
room_state_bytype_ids["m.room.member"].values()
list(room_state_bytype_ids["m.room.member"].values())
)
all_members = [
ev for ev in member_events.values()
+1 -1
View File
@@ -152,7 +152,7 @@ class PushRuleEvaluatorForEvent(object):
# Caches (glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
register_cache("regex_push_cache", regex_cache)
register_cache("cache", "regex_push_cache", regex_cache)
def _glob_matches(glob, value, word_boundary=False):
+4 -1
View File
@@ -50,13 +50,16 @@ REQUIREMENTS = {
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
"blist": ["blist"],
"sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
"attr": ["attr"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
+40 -58
View File
@@ -60,22 +60,21 @@ from .commands import (
)
from .streams import STREAMS_MAP
from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string
from synapse.metrics.metric import CounterMetric
import logging
import synapse.metrics
import struct
import fcntl
from prometheus_client import Counter
from collections import defaultdict
from six import iterkeys, iteritems
metrics = synapse.metrics.get_metrics_for(__name__)
connection_close_counter = metrics.register_counter(
"close_reason", labels=["reason_type"],
)
import logging
import struct
import fcntl
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
# A list of all connected protocols. This allows us to send metrics about the
# connections.
@@ -137,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
self.inbound_commands_counter = CounterMetric(
"inbound_commands", labels=["command"],
)
self.outbound_commands_counter = CounterMetric(
"outbound_commands", labels=["command"],
)
self.inbound_commands_counter = defaultdict(int)
self.outbound_commands_counter = defaultdict(int)
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -202,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
self.inbound_commands_counter.inc(cmd_name)
self.inbound_commands_counter[cmd_name] = (
self.inbound_commands_counter[cmd_name] + 1)
cmd_cls = COMMAND_MAP[cmd_name]
try:
@@ -252,8 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
self.outbound_commands_counter.inc(cmd.NAME)
self.outbound_commands_counter[cmd.NAME] = (
self.outbound_commands_counter[cmd.NAME] + 1)
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
@@ -318,9 +314,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
if isinstance(reason, Failure):
connection_close_counter.inc(reason.type.__name__)
connection_close_counter.labels(reason.type.__name__).inc()
else:
connection_close_counter.inc(reason.__class__.__name__)
connection_close_counter.labels(reason.__class__.__name__).inc()
try:
# Remove us from list of connections to be monitored
@@ -519,7 +515,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_RDATA(self, cmd):
stream_name = cmd.stream_name
inbound_rdata_count.inc(stream_name)
inbound_rdata_count.labels(stream_name).inc()
try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
@@ -567,14 +563,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
metrics.register_callback(
"pending_commands",
pending_commands = LaterGauge(
"pending_commands", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
def transport_buffer_size(protocol):
@@ -584,14 +578,12 @@ def transport_buffer_size(protocol):
return 0
metrics.register_callback(
"transport_send_buffer",
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
def transport_kernel_read_buffer_size(protocol, read=True):
@@ -609,48 +601,38 @@ def transport_kernel_read_buffer_size(protocol, read=True):
return 0
metrics.register_callback(
"transport_kernel_send_buffer",
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
metrics.register_callback(
"transport_kernel_read_buffer",
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
metrics.register_callback(
"inbound_commands",
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter.counts)
},
labels=["command", "name", "conn_id"],
)
for k, count in iteritems(p.inbound_commands_counter)
})
metrics.register_callback(
"outbound_commands",
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter.counts)
},
labels=["command", "name", "conn_id"],
)
for k, count in iteritems(p.outbound_commands_counter)
})
# number of updates received for each RDATA stream
inbound_rdata_count = metrics.register_counter(
"inbound_rdata_count",
labels=["stream_name"],
)
inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
["stream_name"])
+17 -17
View File
@@ -22,21 +22,21 @@ from .streams import STREAMS_MAP, FederationStream
from .protocol import ServerReplicationStreamProtocol
from synapse.util.metrics import Measure, measure_func
from synapse.metrics import LaterGauge
import logging
import synapse.metrics
from prometheus_client import Counter
from six import itervalues
metrics = synapse.metrics.get_metrics_for(__name__)
stream_updates_counter = metrics.register_counter(
"stream_updates", labels=["stream_name"]
)
user_sync_counter = metrics.register_counter("user_sync")
federation_ack_counter = metrics.register_counter("federation_ack")
remove_pusher_counter = metrics.register_counter("remove_pusher")
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
user_ip_cache_counter = metrics.register_counter("user_ip_cache")
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache",
"")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
logger = logging.getLogger(__name__)
@@ -75,7 +75,8 @@ class ReplicationStreamer(object):
# Current connections.
self.connections = []
metrics.register_callback("total_connections", lambda: len(self.connections))
LaterGauge("synapse_replication_tcp_resource_total_connections", "", [],
lambda: len(self.connections))
# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
@@ -87,17 +88,16 @@ class ReplicationStreamer(object):
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
metrics.register_callback(
"connections_per_stream",
LaterGauge(
"synapse_replication_tcp_resource_connections_per_stream", "",
["stream_name"],
lambda: {
(stream_name,): len([
conn for conn in self.connections
if stream_name in conn.replication_streams
])
for stream_name in self.streams_by_name
},
labels=["stream_name"],
)
})
self.federation_sender = None
if not hs.config.send_federation:
@@ -177,7 +177,7 @@ class ReplicationStreamer(object):
logger.info(
"Streaming: %s -> %s", stream.NAME, updates[-1][0]
)
stream_updates_counter.inc_by(len(updates), stream.NAME)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
+1 -1
View File
@@ -104,7 +104,7 @@ class HttpTransactionCache(object):
def _cleanup(self):
now = self.clock.time_msec()
for key in self.transactions.keys():
for key in list(self.transactions):
ts = self.transactions[key][1]
if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period
del self.transactions[key]
+5 -7
View File
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
yield self.store.find_first_stream_ordering_after_ts(ts)
)
room_event_after_stream_ordering = (
r = (
yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
)
if room_event_after_stream_ordering:
token = yield self.store.get_topological_token_for_event(
room_event_after_stream_ordering,
)
else:
if not r:
logger.warn(
"[purge] purging events not possible: No event found "
"(received_ts %i => stream_ordering %i)",
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
"there is no event to be purged",
errcode=Codes.NOT_FOUND,
)
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
logger.info(
"[purge] purging up to token %d (received_ts %i => "
"[purge] purging up to token %s (received_ts %i => "
"stream_ordering %i)",
token, ts, stream_ordering,
)
@@ -42,6 +42,7 @@ class ConsentServerNotices(object):
self._current_consent_version = hs.config.user_consent_version
self._server_notice_content = hs.config.user_consent_server_notice_content
self._send_to_guests = hs.config.user_consent_server_notice_to_guests
if self._server_notice_content is not None:
if not self._server_notices_manager.is_enabled():
@@ -78,6 +79,10 @@ class ConsentServerNotices(object):
try:
u = yield self._store.get_user_by_id(user_id)
if u["is_guest"] and not self._send_to_guests:
# don't send to guests
return
if u["consent_version"] == self._current_consent_version:
# user has already consented
return
+2 -1
View File
@@ -132,7 +132,8 @@ class StateHandler(object):
defer.returnValue(event)
return
state_map = yield self.store.get_events(state.values(), get_prev_content=False)
state_map = yield self.store.get_events(list(state.values()),
get_prev_content=False)
state = {
key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
}
+17 -20
View File
@@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
import synapse.metrics
from prometheus_client import Histogram
from twisted.internet import defer
@@ -42,13 +42,10 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
perf_logger = logging.getLogger("synapse.storage.TIME")
sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
metrics = synapse.metrics.get_metrics_for("synapse.storage")
sql_scheduling_timer = metrics.register_distribution("schedule_time")
sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
class LoggingTransaction(object):
@@ -113,7 +110,7 @@ class LoggingTransaction(object):
# Don't let logging failures stop SQL from working
pass
start = time.time() * 1000
start = time.time()
try:
return func(
@@ -123,9 +120,9 @@ class LoggingTransaction(object):
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
finally:
msecs = (time.time() * 1000) - start
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
sql_query_timer.inc_by(msecs, sql.split()[0])
secs = time.time() - start
sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
sql_query_timer.labels(sql.split()[0]).observe(secs)
class PerformanceCounters(object):
@@ -135,7 +132,7 @@ class PerformanceCounters(object):
def update(self, key, start_time, end_time=None):
if end_time is None:
end_time = time.time() * 1000
end_time = time.time()
duration = end_time - start_time
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
@@ -225,7 +222,7 @@ class SQLBaseStore(object):
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
logging_context, func, *args, **kwargs):
start = time.time() * 1000
start = time.time()
txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
@@ -285,17 +282,17 @@ class SQLBaseStore(object):
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
end = time.time() * 1000
end = time.time()
duration = end - start
if logging_context is not None:
logging_context.add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f", name, duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end)
sql_txn_timer.inc_by(duration, desc)
sql_txn_timer.labels(desc).observe(duration)
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
@@ -352,13 +349,13 @@ class SQLBaseStore(object):
"""
current_context = LoggingContext.current_context()
start_time = time.time() * 1000
start_time = time.time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection") as context:
sched_duration_ms = time.time() * 1000 - start_time
sql_scheduling_timer.inc_by(sched_duration_ms)
current_context.add_database_scheduled(sched_duration_ms)
sched_duration_sec = time.time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
current_context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
+39 -39
View File
@@ -40,30 +40,30 @@ import synapse.metrics
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from six.moves import range
from six import itervalues, iteritems
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
persist_event_counter = metrics.register_counter("persisted_events")
event_counter = metrics.register_counter(
"persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
)
persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
["type", "origin_type", "origin_entity"])
# The number of times we are recalculating the current state
state_delta_counter = metrics.register_counter(
"state_delta",
)
state_delta_counter = Counter("synapse_storage_events_state_delta", "")
# The number of times we are recalculating state when there is only a
# single forward extremity
state_delta_single_event_counter = metrics.register_counter(
"state_delta_single_event",
)
state_delta_single_event_counter = Counter(
"synapse_storage_events_state_delta_single_event", "")
# The number of times we are reculating state when we could have resonably
# calculated the delta when we calculated the state for an event we were
# persisting.
state_delta_reuse_delta_counter = metrics.register_counter(
"state_delta_reuse_delta",
)
state_delta_reuse_delta_counter = Counter(
"synapse_storage_events_state_delta_reuse_delta", "")
def encode_json(json_object):
@@ -248,7 +248,7 @@ class EventsStore(EventsWorkerStore):
partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = []
for room_id, evs_ctxs in partitioned.iteritems():
for room_id, evs_ctxs in iteritems(partitioned):
d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs,
backfilled=backfilled,
@@ -333,7 +333,7 @@ class EventsStore(EventsWorkerStore):
chunks = [
events_and_contexts[x:x + 100]
for x in xrange(0, len(events_and_contexts), 100)
for x in range(0, len(events_and_contexts), 100)
]
for chunk in chunks:
@@ -367,7 +367,7 @@ class EventsStore(EventsWorkerStore):
(event, context)
)
for room_id, ev_ctx_rm in events_by_room.iteritems():
for room_id, ev_ctx_rm in iteritems(events_by_room):
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -445,7 +445,7 @@ class EventsStore(EventsWorkerStore):
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
persist_event_counter.inc(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
@@ -460,14 +460,14 @@ class EventsStore(EventsWorkerStore):
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
event_counter.inc(event.type, origin_type, origin_entity)
event_counter.labels(event.type, origin_type, origin_entity).inc()
for room_id, new_state in current_state_for_room.iteritems():
for room_id, new_state in iteritems(current_state_for_room):
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
for room_id, latest_event_ids in new_forward_extremeties.iteritems():
for room_id, latest_event_ids in iteritems(new_forward_extremeties):
self.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -644,20 +644,20 @@ class EventsStore(EventsWorkerStore):
"""
existing_state = yield self.get_current_state_ids(room_id)
existing_events = set(existing_state.itervalues())
new_events = set(ev_id for ev_id in current_state.itervalues())
existing_events = set(itervalues(existing_state))
new_events = set(ev_id for ev_id in itervalues(current_state))
changed_events = existing_events ^ new_events
if not changed_events:
return
to_delete = {
key: ev_id for key, ev_id in existing_state.iteritems()
key: ev_id for key, ev_id in iteritems(existing_state)
if ev_id in changed_events
}
events_to_insert = (new_events - existing_events)
to_insert = {
key: ev_id for key, ev_id in current_state.iteritems()
key: ev_id for key, ev_id in iteritems(current_state)
if ev_id in events_to_insert
}
@@ -760,11 +760,11 @@ class EventsStore(EventsWorkerStore):
)
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
[(ev_id,) for ev_id in itervalues(to_delete)],
)
self._simple_insert_many_txn(
@@ -777,7 +777,7 @@ class EventsStore(EventsWorkerStore):
"type": key[0],
"state_key": key[1],
}
for key, ev_id in to_insert.iteritems()
for key, ev_id in iteritems(to_insert)
],
)
@@ -796,7 +796,7 @@ class EventsStore(EventsWorkerStore):
"event_id": ev_id,
"prev_event_id": to_delete.get(key, None),
}
for key, ev_id in state_deltas.iteritems()
for key, ev_id in iteritems(state_deltas)
]
)
@@ -839,7 +839,7 @@ class EventsStore(EventsWorkerStore):
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
for room_id, new_extrem in new_forward_extremities.iteritems():
for room_id, new_extrem in iteritems(new_forward_extremities):
self._simple_delete_txn(
txn,
table="event_forward_extremities",
@@ -857,7 +857,7 @@ class EventsStore(EventsWorkerStore):
"event_id": ev_id,
"room_id": room_id,
}
for room_id, new_extrem in new_forward_extremities.iteritems()
for room_id, new_extrem in iteritems(new_forward_extremities)
for ev_id in new_extrem
],
)
@@ -874,7 +874,7 @@ class EventsStore(EventsWorkerStore):
"event_id": event_id,
"stream_ordering": max_stream_order,
}
for room_id, new_extrem in new_forward_extremities.iteritems()
for room_id, new_extrem in iteritems(new_forward_extremities)
for event_id in new_extrem
]
)
@@ -902,7 +902,7 @@ class EventsStore(EventsWorkerStore):
new_events_and_contexts[event.event_id] = (event, context)
else:
new_events_and_contexts[event.event_id] = (event, context)
return new_events_and_contexts.values()
return list(new_events_and_contexts.values())
def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
"""Update min_depth for each room
@@ -928,7 +928,7 @@ class EventsStore(EventsWorkerStore):
event.depth, depth_updates.get(event.room_id, event.depth)
)
for room_id, depth in depth_updates.iteritems():
for room_id, depth in iteritems(depth_updates):
self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts):
@@ -1312,7 +1312,7 @@ class EventsStore(EventsWorkerStore):
" WHERE e.event_id IN (%s)"
) % (",".join(["?"] * len(ev_map)),)
txn.execute(sql, ev_map.keys())
txn.execute(sql, list(ev_map))
rows = self.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
@@ -1575,7 +1575,7 @@ class EventsStore(EventsWorkerStore):
chunks = [
event_ids[i:i + 100]
for i in xrange(0, len(event_ids), 100)
for i in range(0, len(event_ids), 100)
]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
@@ -1989,7 +1989,7 @@ class EventsStore(EventsWorkerStore):
logger.info("[purge] finding state groups which depend on redundant"
" state groups")
remaining_state_groups = []
for i in xrange(0, len(state_rows), 100):
for i in range(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
# look for state groups whose prev_state_group is one we are about
# to delete
@@ -2045,7 +2045,7 @@ class EventsStore(EventsWorkerStore):
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in curr_state.iteritems()
for key, state_id in iteritems(curr_state)
],
)
+11 -3
View File
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer
import six
import OpenSSL
from signedjson.key import decode_verify_key_bytes
@@ -26,6 +27,13 @@ import logging
logger = logging.getLogger(__name__)
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
class KeyStore(SQLBaseStore):
"""Persistence for signature verification keys and tls X.509 certificates
@@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore):
values={
"from_server": from_server,
"ts_added_ms": time_now_ms,
"tls_certificate": buffer(tls_certificate_bytes),
"tls_certificate": db_binary_type(tls_certificate_bytes),
},
desc="store_server_certificate",
)
@@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore):
values={
"from_server": from_server,
"ts_added_ms": time_now_ms,
"verify_key": buffer(verify_key.encode()),
"verify_key": db_binary_type(verify_key.encode()),
},
)
txn.call_after(
@@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore):
"from_server": from_server,
"ts_added_ms": ts_now_ms,
"ts_valid_until_ms": ts_expires_ms,
"key_json": buffer(key_json_bytes),
"key_json": db_binary_type(key_json_bytes),
},
desc="store_server_keys_json",
)
+1 -1
View File
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 49
SCHEMA_VERSION = 50
dir_path = os.path.abspath(os.path.dirname(__file__))
+2 -5
View File
@@ -16,6 +16,7 @@
from ._base import SQLBaseStore
from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util import batch_iter
from collections import namedtuple
from twisted.internet import defer
@@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore):
" AND user_id IN (%s)"
)
batches = (
presence_states[i:i + 50]
for i in xrange(0, len(presence_states), 50)
)
for states in batches:
for states in batch_iter(presence_states, 50):
args = [stream_id]
args.extend(s.user_id for s in states)
txn.execute(
+30 -29
View File
@@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)
stream_ordering = int(res["stream_ordering"]) if res else None
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
for so, eid in txn:
if int(so) >= stream_ordering:
logger.debug(
"Ignoring new receipt for %s in favour of existing "
"one for later event %s",
event_id, eid,
)
return False
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
@@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
(user_id, room_id, receipt_type)
)
res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)
topological_ordering = int(res["topological_ordering"]) if res else None
stream_ordering = int(res["stream_ordering"]) if res else None
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
if topological_ordering:
for to, so, _ in txn:
if int(to) > topological_ordering:
return False
elif int(to) == topological_ordering and int(so) >= stream_ordering:
return False
self._simple_delete_txn(
txn,
table="receipts_linearized",
@@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
}
)
if receipt_type == "m.read" and topological_ordering:
if receipt_type == "m.read" and stream_ordering is not None:
self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
+37
View File
@@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore):
retcols=[
"name", "password_hash", "is_guest",
"consent_version", "consent_server_notice_sent",
"appservice_id",
],
allow_none=True,
desc="get_user_by_id",
@@ -101,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore,
columns=["user_id", "device_id"],
)
self.register_background_index_update(
"users_creation_ts",
index_name="users_creation_ts",
table="users",
columns=["creation_ts"],
)
# we no longer use refresh tokens, but it's possible that some people
# might have a background update queued to build this index. Just
# clear the background update.
@@ -485,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore,
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
def count_daily_user_type(self):
"""
Counts 1) native non guest users
2) native guests users
3) bridged users
who registered on the homeserver in the past 24 hours
"""
def _count_daily_user_type(txn):
yesterday = int(self._clock.time()) - (60 * 60 * 24)
sql = """
SELECT user_type, COALESCE(count(*), 0) AS count FROM (
SELECT
CASE
WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
END AS user_type
FROM users
WHERE creation_ts > ?
) AS t GROUP BY user_type
"""
results = {'native': 0, 'guest': 0, 'bridged': 0}
txn.execute(sql, (yesterday,))
for row in txn:
results[row[0]] = row[1]
return results
return self.runInteraction("count_daily_user_type", _count_daily_user_type)
@defer.inlineCallbacks
def count_nonbridged_users(self):
def _count_users(txn):
-26
View File
@@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
txn.call_after(self.was_forgotten_at.invalidate_all)
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
txn, self.who_forgot_in_room, (room_id,)
@@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)
@cachedInlineCallbacks(num_args=3)
def was_forgotten_at(self, user_id, room_id, event_id):
"""Returns whether user_id has elected to discard history for room_id at
event_id.
event_id must be a membership event."""
def f(txn):
sql = (
"SELECT"
" forgotten"
" FROM"
" room_memberships"
" WHERE"
" user_id = ?"
" AND"
" room_id = ?"
" AND"
" event_id = ?"
)
txn.execute(sql, (user_id, room_id, event_id))
rows = txn.fetchall()
return rows[0][0]
forgot = yield self.runInteraction("did_forget_membership_at", f)
defer.returnValue(forgot == 1)
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
@@ -0,0 +1,19 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('users_creation_ts', '{}');
+5 -4
View File
@@ -18,13 +18,14 @@ import logging
import re
import simplejson as json
from six import string_types
from twisted.internet import defer
from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [
@@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore):
# skip over it.
continue
if not isinstance(value, basestring):
if not isinstance(value, string_types):
# If the event body, name or topic isn't a string
# then skip over it
continue
@@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore):
"search_msgs", self.cursor_to_dict, sql, *args
)
results = filter(lambda row: row["room_id"] in room_ids, results)
results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results])
@@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore):
"search_rooms", self.cursor_to_dict, sql, *args
)
results = filter(lambda row: row["room_id"] in room_ids, results)
results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results])
+10 -2
View File
@@ -14,6 +14,7 @@
# limitations under the License.
from twisted.internet import defer
import six
from ._base import SQLBaseStore
@@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
class SignatureWorkerStore(SQLBaseStore):
@cached()
@@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore):
for e_id, h in hashes.items()
}
defer.returnValue(hashes.items())
defer.returnValue(list(hashes.items()))
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.
@@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore):
vals.append({
"event_id": event.event_id,
"algorithm": ref_alg,
"hash": buffer(ref_hash_bytes),
"hash": db_binary_type(ref_hash_bytes),
})
self._simple_insert_many_txn(
+28 -25
View File
@@ -16,11 +16,14 @@
from collections import namedtuple
import logging
from six import iteritems, itervalues
from six.moves import range
from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -54,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids,
)
groups = set(event_to_groups.itervalues())
groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups)
defer.returnValue(group_to_state)
@@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore):
state_event_map = yield self.get_events(
[
ev_id for group_ids in group_to_ids.itervalues()
for ev_id in group_ids.itervalues()
ev_id for group_ids in itervalues(group_to_ids)
for ev_id in itervalues(group_ids)
],
get_prev_content=False
)
defer.returnValue({
group: [
state_event_map[v] for v in event_id_map.itervalues()
state_event_map[v] for v in itervalues(event_id_map)
if v in state_event_map
]
for group, event_id_map in group_to_ids.iteritems()
for group, event_id_map in iteritems(group_to_ids)
})
@defer.inlineCallbacks
@@ -186,7 +189,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"""
results = {}
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)]
for chunk in chunks:
res = yield self.runInteraction(
"_get_state_groups_from_groups",
@@ -269,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
for typ in types:
if typ[1] is None:
where_clauses.append("(type = ?)")
where_args.extend(typ[0])
where_args.append(typ[0])
wildcard_types = True
else:
where_clauses.append("(type = ? AND state_key = ?)")
@@ -347,21 +350,21 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids,
)
groups = set(event_to_groups.itervalues())
groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups, types)
state_event_map = yield self.get_events(
[ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
[ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
get_prev_content=False
)
event_to_state = {
event_id: {
k: state_event_map[v]
for k, v in group_to_state[group].iteritems()
for k, v in iteritems(group_to_state[group])
if v in state_event_map
}
for event_id, group in event_to_groups.iteritems()
for event_id, group in iteritems(event_to_groups)
}
defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -384,12 +387,12 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids,
)
groups = set(event_to_groups.itervalues())
groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups, types)
event_to_state = {
event_id: group_to_state[group]
for event_id, group in event_to_groups.iteritems()
for event_id, group in iteritems(event_to_groups)
}
defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -503,7 +506,7 @@ class StateGroupWorkerStore(SQLBaseStore):
got_all = is_all or not missing_types
return {
k: v for k, v in state_dict_ids.iteritems()
k: v for k, v in iteritems(state_dict_ids)
if include(k[0], k[1])
}, missing_types, got_all
@@ -562,12 +565,12 @@ class StateGroupWorkerStore(SQLBaseStore):
# Now we want to update the cache with all the things we fetched
# from the database.
for group, group_state_dict in group_to_state_dict.iteritems():
for group, group_state_dict in iteritems(group_to_state_dict):
state_dict = results[group]
state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
for k, v in group_state_dict.iteritems()
for k, v in iteritems(group_state_dict)
)
self._state_group_cache.update(
@@ -654,7 +657,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in delta_ids.iteritems()
for key, state_id in iteritems(delta_ids)
],
)
else:
@@ -669,7 +672,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in current_state_ids.iteritems()
for key, state_id in iteritems(current_state_ids)
],
)
@@ -794,11 +797,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
"state_group": state_group_id,
"event_id": event_id,
}
for event_id, state_group_id in state_groups.iteritems()
for event_id, state_group_id in iteritems(state_groups)
],
)
for event_id, state_group_id in state_groups.iteritems():
for event_id, state_group_id in iteritems(state_groups):
txn.call_after(
self._get_state_group_for_event.prefill,
(event_id,), state_group_id
@@ -826,7 +829,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
def reindex_txn(txn):
new_last_state_group = last_state_group
for count in xrange(batch_size):
for count in range(batch_size):
txn.execute(
"SELECT id, room_id FROM state_groups"
" WHERE ? < id AND id <= ?"
@@ -884,7 +887,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
# of keys
delta_state = {
key: value for key, value in curr_state.iteritems()
key: value for key, value in iteritems(curr_state)
if prev_state.get(key, None) != value
}
@@ -924,7 +927,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in delta_state.iteritems()
for key, state_id in iteritems(delta_state)
],
)
+9 -1
View File
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
import six
from canonicaljson import encode_canonical_json
@@ -25,6 +26,13 @@ from collections import namedtuple
import logging
import simplejson as json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
logger = logging.getLogger(__name__)
@@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore):
"transaction_id": transaction_id,
"origin": origin,
"response_code": code,
"response_json": buffer(encode_canonical_json(response_dict)),
"response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(),
},
or_ignore=True,
+5 -3
View File
@@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
from six import iteritems
import re
import logging
@@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore):
user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
profile.display_name,
)
for user_id, profile in users_with_profile.iteritems()
for user_id, profile in iteritems(users_with_profile)
)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = """
@@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore):
user_id,
"%s %s" % (user_id, p.display_name,) if p.display_name else user_id
)
for user_id, p in users_with_profile.iteritems()
for user_id, p in iteritems(users_with_profile)
)
else:
# This should be unreachable.
@@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore):
"display_name": profile.display_name,
"avatar_url": profile.avatar_url,
}
for user_id, profile in users_with_profile.iteritems()
for user_id, profile in iteritems(users_with_profile)
]
)
for user_id in users_with_profile:
+70 -14
View File
@@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.metrics
from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
import os
from six.moves import intern
@@ -21,23 +22,78 @@ import six
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
def get_cache_factor_for(cache_name):
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
factor = os.environ.get(env_var)
if factor:
return float(factor)
return CACHE_SIZE_FACTOR
caches_by_name = {}
# cache_counter = metrics.register_cache(
# "cache",
# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
# labels=["name"],
# )
collectors_by_name = {}
cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
response_cache_evicted = Gauge(
"synapse_util_caches_response_cache:evicted_size", "", ["name"]
)
response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
def register_cache(name, cache):
caches_by_name[name] = cache
return metrics.register_cache(
"cache",
lambda: len(cache),
name,
)
def register_cache(cache_type, cache_name, cache):
# Check if the metric is already registered. Unregister it, if so.
# This usually happens during tests, as at runtime these caches are
# effectively singletons.
metric_name = "cache_%s_%s" % (cache_type, cache_name)
if metric_name in collectors_by_name.keys():
REGISTRY.unregister(collectors_by_name[metric_name])
class CacheMetric(object):
hits = 0
misses = 0
evicted_size = 0
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def describe(self):
return []
def collect(self):
if cache_type == "response_cache":
response_cache_size.labels(cache_name).set(len(cache))
response_cache_hits.labels(cache_name).set(self.hits)
response_cache_evicted.labels(cache_name).set(self.evicted_size)
response_cache_total.labels(cache_name).set(self.hits + self.misses)
else:
cache_size.labels(cache_name).set(len(cache))
cache_hits.labels(cache_name).set(self.hits)
cache_evicted.labels(cache_name).set(self.evicted_size)
cache_total.labels(cache_name).set(self.hits + self.misses)
yield GaugeMetricFamily("__unused", "")
metric = CacheMetric()
REGISTRY.register(metric)
caches_by_name[cache_name] = cache
collectors_by_name[metric_name] = metric
return metric
KNOWN_KEYS = {
+12 -8
View File
@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -31,6 +31,9 @@ import functools
import inspect
import threading
from six import string_types, itervalues
import six
logger = logging.getLogger(__name__)
@@ -80,7 +83,7 @@ class Cache(object):
self.name = name
self.keylen = keylen
self.thread = None
self.metrics = register_cache(name, self.cache)
self.metrics = register_cache("cache", name, self.cache)
def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)
@@ -205,7 +208,7 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
self.cache.clear()
for entry in self._pending_deferred_cache.itervalues():
for entry in itervalues(self._pending_deferred_cache):
entry.invalidate()
self._pending_deferred_cache.clear()
@@ -310,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree
@@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase):
ret.addErrback(onErr)
# If our cache_key is a string, try to convert to ascii to save
# a bit of space in large caches
if isinstance(cache_key, basestring):
# If our cache_key is a string on py2, try to convert to ascii
# to save a bit of space in large caches. Py3 does this
# internally automatically.
if six.PY2 and isinstance(cache_key, string_types):
cache_key = to_ascii(cache_key)
result_d = ObservableDeferred(ret, consumeErrors=True)
@@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
return results
return logcontext.make_deferred_yieldable(defer.gatherResults(
cached_defers.values(),
list(cached_defers.values()),
consumeErrors=True,
).addCallback(update_results_dict).addErrback(
unwrapFirstError
+1 -1
View File
@@ -55,7 +55,7 @@ class DictionaryCache(object):
__slots__ = []
self.sentinel = Sentinel()
self.metrics = register_cache(name, self.cache)
self.metrics = register_cache("dictionary", name, self.cache)
def check_thread(self):
expected_thread = self.thread
+2 -2
View File
@@ -52,12 +52,12 @@ class ExpiringCache(object):
self._cache = OrderedDict()
self.metrics = register_cache(cache_name, self)
self.iterable = iterable
self._size_estimate = 0
self.metrics = register_cache("expiring", cache_name, self)
def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
+6 -5
View File
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches import register_cache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -38,15 +38,16 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000.
self._name = name
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
cache_name=name,
self._metrics = register_cache(
"response_cache", name, self
)
def size(self):
return len(self.pending_result_cache)
def __len__(self):
return self.size()
def get(self, key):
"""Look up the given key.
+31 -26
View File
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
from synapse.util import caches
from blist import sorteddict
from sortedcontainers import SortedDict
import logging
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
self._max_size = int(max_size * CACHE_SIZE_FACTOR)
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
self._cache = sorteddict()
self._cache = SortedDict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = register_cache(self.name, self._cache)
self.metrics = caches.register_cache("cache", self.name, self._cache)
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
if prefilled_cache:
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
return False
def get_entities_changed(self, entities, stream_pos):
"""Returns subset of entities that have had new things since the
given position. If the position is too old it will just return the given list.
"""
Returns subset of entities that have had new things since the given
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
not_known_entities = set(entities) - set(self._entity_to_key)
result = set(
self._cache[k] for k in keys[i:]
).intersection(entities)
result = (
set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
.intersection(entities)
.union(not_known_entities)
)
self.metrics.inc_hits()
else:
result = entities
result = set(entities)
self.metrics.inc_misses()
return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
"""
assert type(stream_pos) is int
if not self._cache:
# If we have no cache, nothing can have changed.
return False
if stream_pos >= self._earliest_known_stream_pos:
self.metrics.inc_hits()
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
return i < len(keys)
return self._cache.bisect_right(stream_pos) < len(self._cache)
else:
self.metrics.inc_misses()
return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
return [self._cache[k] for k in keys[i:]]
return self._cache.values()[self._cache.bisect_right(stream_pos) :]
else:
return None
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
self._entity_to_key[entity] = stream_pos
while len(self._cache) > self._max_size:
k, r = self._cache.popitem()
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos,
)
self._entity_to_key.pop(r, None)
def get_max_pos_of_last_change(self, entity):
+4 -2
View File
@@ -1,3 +1,5 @@
from six import itervalues
SENTINEL = object()
@@ -49,7 +51,7 @@ class TreeCache(object):
if popped is SENTINEL:
return default
node_and_keys = zip(nodes, key)
node_and_keys = list(zip(nodes, key))
node_and_keys.reverse()
node_and_keys.append((self.root, None))
@@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d):
can contain dicts.
"""
if isinstance(d, dict):
for value_d in d.itervalues():
for value_d in itervalues(d):
for value in iterate_tree_cache_entry(value_d):
yield value
else:

Some files were not shown because too many files have changed in this diff Show More