1
0

Compare commits

..

165 Commits

Author SHA1 Message Date
Andrew Morgan 20c48d50ef Add debug logging 2020-01-13 11:51:20 +00:00
Erik Johnston 9dfcf47e9b Fixup changelog 2020-01-09 11:40:27 +00:00
Erik Johnston 24b2c940fb 1.8.0 2020-01-09 11:39:29 +00:00
Manuel Stahl 7caaa29daa Fix GET request on /_synapse/admin/v2/users endpoint (#6563)
Fixes #6552
2020-01-08 13:26:40 +00:00
Richard van der Hoff 573fee759c Back out ill-advised notary server hackery (#6657)
This was ill-advised. We can't modify verify_keys here, because the response
object has already been signed by the requested key.

Furthermore, it's somewhat unnecessary because existing versions of Synapse
(which get upset that the notary key isn't present in verify_keys) will fall
back to a direct fetch via `/key/v2/server`.

Also: more tests for fetching keys via perspectives: it would be nice if we actually tested when our fetcher can't talk to our notary impl.
2020-01-08 13:24:10 +00:00
Erik Johnston 235d977e1f Fixup changelog 2020-01-07 14:45:54 +00:00
Erik Johnston 2f1c175936 Fixup changelog 2020-01-07 14:39:35 +00:00
Erik Johnston 06958d5bb1 Fixup changelog 2020-01-07 14:38:40 +00:00
Erik Johnston 7f0e706ebf 1.8.0rc1 2020-01-07 14:31:13 +00:00
Brendan Abolivier 0ab5853ec9 Merge pull request #6652 from matrix-org/babolivier/depth_missing_events
Fix conditions failing if min_depth = 0
2020-01-07 15:22:07 +01:00
Richard van der Hoff 85db7f73be Add a background update to clear tombstoned rooms from the directory (#6648)
* Add a background update to clear tombstoned rooms from the directory

* use the ABC metaclass
2020-01-07 14:18:43 +00:00
Richard van der Hoff 9824a39d80 Async/await for background updates (#6647)
so that bg update routines can be async
2020-01-07 14:12:42 +00:00
Richard van der Hoff d20c346544 port BackgroundUpdateTestCase to HomeserverTestCase (#6653) 2020-01-07 14:09:07 +00:00
Richard van der Hoff 1ff5491117 Merge pull request #6645 from matrix-org/rav/fix_synchrotron_error
Fix exceptions in the synchrotron worker log when events are rejected.
2020-01-07 14:02:14 +00:00
Brendan Abolivier 1f2a5923d4 Changelog 2020-01-07 13:12:17 +00:00
Brendan Abolivier cd428a93e2 Fix conditions failing if min_depth = 0
This could result in Synapse not fetching prev_events for new events in the room if it has missed some events.
2020-01-07 12:08:58 +00:00
Richard van der Hoff 1807db5e73 Merge pull request #6629 from matrix-org/rav/kill_event_reference_hashes
Remove a bunch of unused code from event creation
2020-01-06 17:46:31 +00:00
Richard van der Hoff 055e6fbaa2 changelog 2020-01-06 17:17:40 +00:00
Richard van der Hoff 26c5d3d398 Fix exceptions in log when rejected event is replicated 2020-01-06 17:16:28 +00:00
Richard van der Hoff c74de81bfc async/await for SyncReplicationHandler.process_and_notify 2020-01-06 17:14:28 +00:00
Richard van der Hoff bc42da4ab8 Clarify documentation on get_event* methods
Make it clearer how they behave in the face of rejected and/or missing events.
2020-01-06 17:12:06 +00:00
Richard van der Hoff ba897a7590 Fix some test failures when frozen_dicts are enabled (#6642)
Fixes #4026
2020-01-06 15:22:46 +00:00
Erik Johnston 9f6c1befbb Add experimental 'databases' config (#6580) 2020-01-06 14:44:01 +00:00
Richard van der Hoff ab4b4ee6a7 Fix an error which was thrown by the PresenceHandler _on_shutdown handler. (#6640) 2020-01-06 14:34:02 +00:00
Richard van der Hoff 550b2946d8 changelog 2020-01-06 13:45:33 +00:00
Richard van der Hoff a7d2e5b37f Remove unused get_latest_event_ids_and_hashes_in_room 2020-01-06 13:45:33 +00:00
Richard van der Hoff dc41fbf0dd Remove unused get_prev_events_and_hashes_for_room 2020-01-06 13:45:33 +00:00
Richard van der Hoff 38e0829a4c Remove unused hashes and depths from _update_membership params 2020-01-06 13:45:33 +00:00
Richard van der Hoff 3bef62488e Remove unused hashes and depths from create_event params 2020-01-06 13:45:33 +00:00
Richard van der Hoff 66ca914dc0 Remove unused hashes and depths from create_new_client_event params 2020-01-06 13:45:33 +00:00
Richard van der Hoff 15720092ac replace get_prev_events_and_hashes_for_room with get_prev_events_for_room in create_new_client_event 2020-01-06 13:45:33 +00:00
Richard van der Hoff 5a04781643 rename get_prev_events_for_room to get_prev_events_and_hashes_for_room
... to make way for a new method which just returns the event ids
2020-01-06 13:45:33 +00:00
Richard van der Hoff 4b36b482e0 Fix exception when fetching notary server's old keys (#6625)
Lift the restriction that *all* the keys used for signing v2 key responses be
present in verify_keys.

Fixes #6596.
2020-01-06 12:33:56 +00:00
Richard van der Hoff 18674eebb1 Workaround for error when fetching notary's own key (#6620)
* Kill off redundant SynapseRequestFactory

We already get the Site via the Channel, so there's no need for a dedicated
RequestFactory: we can just use the right constructor.

* Workaround for error when fetching notary's own key

As a notary server, when we return our own keys, include all of our signing
keys in verify_keys.

This is a workaround for #6596.
2020-01-06 12:28:58 +00:00
Andrew Morgan 01c3c6c929 Fix power levels being incorrectly set in old and new rooms after a room upgrade (#6633)
Modify a copy of an upgraded room's PL before sending to the new room
2020-01-06 09:53:07 +00:00
Richard van der Hoff 08815566bc Automate generation of the sample and debian log configs (#6627) 2020-01-03 17:14:00 +00:00
Richard van der Hoff e484101306 Raise an error if someone tries to use the log_file config option (#6626)
This has caused some confusion for people who didn't notice it going away.
2020-01-03 17:11:29 +00:00
Richard van der Hoff 98247c4a0e Remove unused, undocumented "content repo" resource (#6628)
This looks like it got half-killed back in #888.

Fixes #6567.
2020-01-03 17:10:52 +00:00
Richard van der Hoff b6b57ecb4e Kill off redundant SynapseRequestFactory (#6619)
We already get the Site via the Channel, so there's no need for a dedicated
RequestFactory: we can just use the right constructor.
2020-01-03 14:19:48 +00:00
Richard van der Hoff 6964ea095b Reduce the reconnect time when replication fails. (#6617) 2020-01-03 14:19:09 +00:00
ewaf1 0495097a7f Added the section 'Configuration' in /docs/turn-howto.md (#6614)
put the 2nd part of the "source installation"-section into a new section, because it also applies to Debian packages
2020-01-02 10:41:30 +00:00
Aaron Raimist 32779b59fa Reword sections of federate.md that explained delegation at time of Synapse 1.0 transition (#6601)
* Remove sections of federate.md explaining delegation at time of Synapse 1.0 transition

Signed-off-by: Aaron Raimist <aaron@raim.ist>

* Add changelog

Signed-off-by: Aaron Raimist <aaron@raim.ist>
2020-01-02 10:28:20 +00:00
Richard van der Hoff dc96943d51 Merge branch 'master' into develop 2019-12-31 11:01:06 +00:00
Richard van der Hoff 77661ce81a 1.7.3 2019-12-31 10:45:12 +00:00
Richard van der Hoff 92eac974b9 Hacks to work around #6605 (#6608)
When we have an event which refers to non-existent auth_events, ignore said events rather than exploding in a ball of fire.

Fixes #6605.
2019-12-31 10:41:44 +00:00
Richard van der Hoff f03c877b32 sample log config
TODO: automate generation of this
2019-12-24 17:39:58 +00:00
dopple b2db382841 Update reverse proxy file name (#6590)
s/reverse_proxy.rst/reverse_proxy.md/
2019-12-22 22:17:09 +00:00
dopple 7c6b853558 Update reverse proxy file name (#6590)
s/reverse_proxy.rst/reverse_proxy.md/
2019-12-22 22:16:56 +00:00
Richard van der Hoff 56ad230efa Merge branch 'master' into develop 2019-12-20 11:11:34 +00:00
Richard van der Hoff 138708608b Merge tag 'v1.7.2'
Synapse 1.7.2 (2019-12-20)
==========================

This release fixes some regressions introduced in Synapse 1.7.0 and 1.7.1.

Bugfixes
--------

- Fix a regression introduced in Synapse 1.7.1 which caused errors when attempting to backfill rooms over federation. ([\#6576](https://github.com/matrix-org/synapse/issues/6576))
- Fix a bug introduced in Synapse 1.7.0 which caused an error on startup when upgrading from versions before 1.3.0. ([\#6578](https://github.com/matrix-org/synapse/issues/6578))
2019-12-20 11:09:59 +00:00
Richard van der Hoff 29794c6bc8 1.7.2 2019-12-20 10:58:07 +00:00
Erik Johnston 75d8f26ac8 Split state groups into a separate data store (#6296) 2019-12-20 10:48:24 +00:00
Richard van der Hoff 4caab0e95e Backport fixes to sqlite upgrade from develop (#6578)
Only run prepare_database on connection for in-memory databases.

Fixes #6569.
2019-12-20 10:46:46 +00:00
Erik Johnston fa780e9721 Change EventContext to use the Storage class (#6564) 2019-12-20 10:32:02 +00:00
Richard van der Hoff 03d3792f3c Fix exceptions when attempting to backfill (#6576)
Fixes #6575
2019-12-20 09:55:45 +00:00
Erik Johnston 0b5dbadd96 Explode on duplicate delta file names. (#6565) 2019-12-19 15:07:37 +00:00
Erik Johnston 3d46124ad0 Port some admin handlers to async/await (#6559) 2019-12-19 15:07:28 +00:00
Richard van der Hoff bca30cefee Improve diagnostics on database upgrade failure (#6570)
`Failed to upgrade database` is not helpful, and it's unlikely that UPGRADE.rst
has anything useful.
2019-12-19 14:53:15 +00:00
Richard van der Hoff 0b794cbd7b Fix sdnotify with acme enabled (#6571)
If acme was enabled, the sdnotify startup hook would never be run because we
would try to add it to a hook which had already fired.

There's no need to delay it: we can sdnotify as soon as we've started the
listeners.
2019-12-19 14:52:52 +00:00
Richard van der Hoff b95b762560 Add an export_signing_key script (#6546)
I want to do some key rotation, and it is silly that we don't have a way to do
this.
2019-12-19 11:11:14 +00:00
Richard van der Hoff d6752ce5da Clean up startup for the pusher (#6558)
* Remove redundant python2 support code

`str.decode()` doesn't exist on python3, so presumably this code was doing
nothing

* Filter out pushers with corrupt data

When we get a row with unparsable json, drop the row, rather than returning a
row with null `data`, which will then cause an explosion later on.

* Improve logging when we can't start a pusher

Log the ID to help us understand the problem

* Make email pusher setup more robust

We know we'll have a `data` member, since that comes from the database. What we
*don't* know is if that is a dict, and if that has a `brand` member, and if
that member is a string.
2019-12-18 14:26:58 +00:00
Andrew Morgan 7963ca83cb Add delta file to fix missing default table data (#6555) 2019-12-18 11:13:33 +00:00
Erik Johnston 2284eb3a53 Add database config class (#6513)
This encapsulates config for a given database and is the way to get new
connections.
2019-12-18 10:45:12 +00:00
Richard van der Hoff 91ccfe9f37 Merge branch 'master' into develop 2019-12-18 10:16:39 +00:00
Richard van der Hoff 4154637834 Merge tag 'v1.7.1'
Synapse 1.7.1 (2019-12-18)
==========================

This release includes several security fixes as well as a fix to a bug exposed by the security fixes. Administrators are encouraged to upgrade as soon as possible.

Security updates
----------------

- Fix a bug which could cause room events to be incorrectly authorized using events from a different room. ([\#6501](https://github.com/matrix-org/synapse/issues/6501), [\#6503](https://github.com/matrix-org/synapse/issues/6503), [\#6521](https://github.com/matrix-org/synapse/issues/6521), [\#6524](https://github.com/matrix-org/synapse/issues/6524), [\#6530](https://github.com/matrix-org/synapse/issues/6530), [\#6531](https://github.com/matrix-org/synapse/issues/6531))
- Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event. ([\#6553](https://github.com/matrix-org/synapse/issues/6553))
- Fix a cause of state resets in room versions 2 onwards. ([\#6556](https://github.com/matrix-org/synapse/issues/6556), [\#6560](https://github.com/matrix-org/synapse/issues/6560))

Bugfixes
--------

- Fix a bug which could cause the federation server to incorrectly return errors when handling certain obscure event graphs. ([\#6526](https://github.com/matrix-org/synapse/issues/6526), [\#6527](https://github.com/matrix-org/synapse/issues/6527))
2019-12-18 10:14:49 +00:00
Richard van der Hoff 6e8f8e14f2 Merge release-v1.7.1 into develop 2019-12-18 09:51:51 +00:00
Richard van der Hoff e156c86a7f too many parens 2019-12-18 09:40:03 +00:00
Richard van der Hoff d656e91fc2 1.7.1 2019-12-18 09:38:08 +00:00
Erik Johnston 5029422530 Fix bug where we added duplicate event IDs as auth_events (#6560) 2019-12-17 15:06:08 +00:00
Erik Johnston 02553901ce Remove unused get_pagination_rows methods. (#6557)
Remove unused get_pagination_rows methods
2019-12-17 11:44:32 +00:00
Erik Johnston 5ca2cfadc3 Add auth events as per spec. (#6556)
Previously we tried to be clever and filter out some unnecessary event
IDs to keep the auth chain small, but that had some annoying
interactions with state res v2 so we stop doing that for now.
2019-12-16 17:05:01 +00:00
Erik Johnston 3fbe5b7ec3 Add auth events as per spec. (#6556)
Previously we tried to be clever and filter out some unnecessary event
IDs to keep the auth chain small, but that had some annoying
interactions with state res v2 so we stop doing that for now.
2019-12-16 16:59:32 +00:00
Brendan Abolivier 6316530366 Merge pull request #6553 from matrix-org/babolivier/fix-context-filter
Use the filtered version of an event when responding to /context requests for that event
2019-12-16 16:24:10 +00:00
Will Hunt bfb95654c9 Add option to allow profile queries without sharing a room (#6523) 2019-12-16 16:11:55 +00:00
Brendan Abolivier 7f8d8fd2e2 Merge branch 'babolivier/fix-context-filter' of github.com:matrix-org/synapse into babolivier/fix-context-filter 2019-12-16 16:01:32 +00:00
Brendan Abolivier a820069549 Incorporate review 2019-12-16 16:00:18 +00:00
Brendan Abolivier 284e690aa0 Update changelog.d/6553.bugfix
Co-Authored-By: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2019-12-16 14:56:05 +00:00
Brendan Abolivier a29420f9f4 Lint 2019-12-16 14:55:50 +00:00
Brendan Abolivier 596dd9914d Add test case 2019-12-16 14:53:21 +00:00
Richard van der Hoff bbb75ff6ee Exclude rejected state events when calculating state at backwards extrems (#6527)
This fixes a weird bug where, if you were determined enough, you could end up with a rejected event forming part of the state at a backwards-extremity. Authing that backwards extrem would then lead to us trying to pull the rejected event from the db (with allow_rejected=False), which would fail with a 404.
2019-12-16 14:07:29 +00:00
Richard van der Hoff ff773ff724 Persist auth/state events at backwards extremities when we fetch them (#6526)
The main point here is to make sure that the state returned by _get_state_in_room has been authed before we try to use it as state in the room.
2019-12-16 14:07:11 +00:00
Richard van der Hoff 83895316d4 sanity-checking for events used in state res (#6531)
When we perform state resolution, check that all of the events involved are in
the right room.
2019-12-16 13:53:53 +00:00
Richard van der Hoff 6577f2d887 Sanity-check room ids in event auth (#6530)
When we do an event auth operation, check that all of the events involved are
in the right room.
2019-12-16 13:47:34 +00:00
Richard van der Hoff 35bbe4ca79 Check the room_id of events when fetching room state/auth (#6524)
When we request the state/auth_events to populate a backwards extremity (on
backfill or in the case of missing events in a transaction push), we should
check that the returned events are in the right room rather than blindly using
them in the room state or auth chain.

Given that _get_events_from_store_or_dest takes a room_id, it seems clear that
it should be sanity-checking the room_id of the requested events, so let's do
it there.
2019-12-16 13:47:07 +00:00
Richard van der Hoff 20d5ba16e6 Add include_event_in_state to _get_state_for_room (#6521)
Make it return the state *after* the requested event, rather than the one
before it. This is a bit easier and requires fewer calls to
get_events_from_store_or_dest.
2019-12-16 13:26:12 +00:00
Richard van der Hoff be294d6fde Move get_state methods into FederationHandler (#6503)
This is a non-functional refactor as a precursor to some other work.
2019-12-16 13:20:21 +00:00
Richard van der Hoff 4c7b1bb6cc Refactor get_events_from_store_or_dest to return a dict (#6501)
There was a bunch of unnecessary conversion back and forth between dict and
list going on here. We can simplify a bunch of the code.
2019-12-16 13:19:32 +00:00
Richard van der Hoff 6920d88892 Exclude rejected state events when calculating state at backwards extrems (#6527)
This fixes a weird bug where, if you were determined enough, you could end up with a rejected event forming part of the state at a backwards-extremity. Authing that backwards extrem would then lead to us trying to pull the rejected event from the db (with allow_rejected=False), which would fail with a 404.
2019-12-16 13:14:37 +00:00
Richard van der Hoff bc7de87650 Persist auth/state events at backwards extremities when we fetch them (#6526)
The main point here is to make sure that the state returned by _get_state_in_room has been authed before we try to use it as state in the room.
2019-12-16 12:26:28 +00:00
Brendan Abolivier 8b9f5c21c3 Changelog 2019-12-16 12:19:35 +00:00
Brendan Abolivier ac87ddb242 Update the documentation of the filtering function 2019-12-16 12:15:37 +00:00
Brendan Abolivier 487f1bb49d Use the filtered version of an event when responding to /context requests for that event
Sometimes the filtering function can return a pruned version of an event (on top of either the event itself or an empty list), if it thinks the user should be able to see that there's an event there but not the content of that event. Therefore, the previous logic of 'if filtered is empty then we can use the event we retrieved from the database' is flawed, and we should use the event returned by the filtering function.
2019-12-16 12:14:12 +00:00
Werner Sembach 9d173b312c Automatically delete empty groups/communities (#6453)
Signed-off-by: Werner Sembach <werner.sembach@fau.de>
2019-12-16 12:12:40 +00:00
Andrew Morgan 0b90fc6ed2 Document Shutdown Room admin API (#6541) 2019-12-13 15:28:48 +00:00
Richard van der Hoff 1da15f05f5 sanity-checking for events used in state res (#6531)
When we perform state resolution, check that all of the events involved are in
the right room.
2019-12-13 12:55:32 +00:00
Richard van der Hoff 971a0702b5 Sanity-check room ids in event auth (#6530)
When we do an event auth operation, check that all of the events involved are
in the right room.
2019-12-13 11:44:41 +00:00
Erik Johnston 5cadbd9ebb Merge pull request #6537 from matrix-org/erikj/bump_mypy_version
Bump mypy version
2019-12-13 11:32:53 +00:00
Erik Johnston f46e05d053 Merge branch 'master' into develop 2019-12-13 10:55:47 +00:00
Erik Johnston bee1982d17 Merge tag 'v1.7.0'
Synapse 1.7.0 (2019-12-13)
==========================

This release changes the default settings so that only local authenticated users can query the server's room directory. See the [upgrade notes](UPGRADE.rst#upgrading-to-v170) for details.

Support for SQLite versions before 3.11 is now deprecated. A future release will refuse to start if used with an SQLite version before 3.11.

Administrators are reminded that SQLite should not be used for production instances. Instructions for migrating to Postgres are available [here](docs/postgres.md). A future release of synapse will, by default, disable federation for servers using SQLite.

No significant changes since 1.7.0rc2.

Synapse 1.7.0rc2 (2019-12-11)
=============================

Bugfixes
--------

- Fix incorrect error message for invalid requests when setting user's avatar URL. ([\#6497](https://github.com/matrix-org/synapse/issues/6497))
- Fix support for SQLite 3.7. ([\#6499](https://github.com/matrix-org/synapse/issues/6499))
- Fix regression where sending email push would not work when using a pusher worker. ([\#6507](https://github.com/matrix-org/synapse/issues/6507), [\#6509](https://github.com/matrix-org/synapse/issues/6509))

Synapse 1.7.0rc1 (2019-12-09)
=============================

Features
--------

- Implement per-room message retention policies. ([\#5815](https://github.com/matrix-org/synapse/issues/5815), [\#6436](https://github.com/matrix-org/synapse/issues/6436))
- Add etag and count fields to key backup endpoints to help clients guess if there are new keys. ([\#5858](https://github.com/matrix-org/synapse/issues/5858))
- Add `/admin/v2/users` endpoint with pagination. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5925](https://github.com/matrix-org/synapse/issues/5925))
- Require User-Interactive Authentication for `/account/3pid/add`, meaning the user's password will be required to add a third-party ID to their account. ([\#6119](https://github.com/matrix-org/synapse/issues/6119))
- Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314. ([\#6176](https://github.com/matrix-org/synapse/issues/6176))
- Configure privacy-preserving settings by default for the room directory. ([\#6355](https://github.com/matrix-org/synapse/issues/6355))
- Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). ([\#6409](https://github.com/matrix-org/synapse/issues/6409))
- Add support for [MSC 2367](https://github.com/matrix-org/matrix-doc/pull/2367), which allows specifying a reason on all membership events. ([\#6434](https://github.com/matrix-org/synapse/issues/6434))

Bugfixes
--------

- Transfer non-standard power levels on room upgrade. ([\#6237](https://github.com/matrix-org/synapse/issues/6237))
- Fix error from the Pillow library when uploading RGBA images. ([\#6241](https://github.com/matrix-org/synapse/issues/6241))
- Correctly apply the event filter to the `state`, `events_before` and `events_after` fields in the response to `/context` requests. ([\#6329](https://github.com/matrix-org/synapse/issues/6329))
- Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices. ([\#6332](https://github.com/matrix-org/synapse/issues/6332))
- Prevent account data syncs getting lost across TCP replication. ([\#6333](https://github.com/matrix-org/synapse/issues/6333))
- Fix bug: TypeError in `register_user()` while using LDAP auth module. ([\#6406](https://github.com/matrix-org/synapse/issues/6406))
- Fix an intermittent exception when handling read-receipts. ([\#6408](https://github.com/matrix-org/synapse/issues/6408))
- Fix broken guest registration when there are existing blocks of numeric user IDs. ([\#6420](https://github.com/matrix-org/synapse/issues/6420))
- Fix startup error when http proxy is defined. ([\#6421](https://github.com/matrix-org/synapse/issues/6421))
- Fix error when using synapse_port_db on a vanilla synapse db. ([\#6449](https://github.com/matrix-org/synapse/issues/6449))
- Fix uploading multiple cross signing signatures for the same user. ([\#6451](https://github.com/matrix-org/synapse/issues/6451))
- Fix bug which lead to exceptions being thrown in a loop when a cross-signed device is deleted. ([\#6462](https://github.com/matrix-org/synapse/issues/6462))
- Fix `synapse_port_db` not exiting with a 0 code if something went wrong during the port process. ([\#6470](https://github.com/matrix-org/synapse/issues/6470))
- Improve sanity-checking when receiving events over federation. ([\#6472](https://github.com/matrix-org/synapse/issues/6472))
- Fix inaccurate per-block Prometheus metrics. ([\#6491](https://github.com/matrix-org/synapse/issues/6491))
- Fix small performance regression for sending invites. ([\#6493](https://github.com/matrix-org/synapse/issues/6493))
- Back out cross-signing code added in Synapse 1.5.0, which caused a performance regression. ([\#6494](https://github.com/matrix-org/synapse/issues/6494))

Improved Documentation
----------------------

- Update documentation and variables in user contributed systemd reference file. ([\#6369](https://github.com/matrix-org/synapse/issues/6369), [\#6490](https://github.com/matrix-org/synapse/issues/6490))
- Fix link in the user directory documentation. ([\#6388](https://github.com/matrix-org/synapse/issues/6388))
- Add build instructions to the docker readme. ([\#6390](https://github.com/matrix-org/synapse/issues/6390))
- Switch Ubuntu package install recommendation to use python3 packages in INSTALL.md. ([\#6443](https://github.com/matrix-org/synapse/issues/6443))
- Write some docs for the quarantine_media api. ([\#6458](https://github.com/matrix-org/synapse/issues/6458))
- Convert CONTRIBUTING.rst to markdown (among other small fixes). ([\#6461](https://github.com/matrix-org/synapse/issues/6461))

Deprecations and Removals
-------------------------

- Remove admin/v1/users_paginate endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5925](https://github.com/matrix-org/synapse/issues/5925))
- Remove fallback for federation with old servers which lack the /federation/v1/state_ids API. ([\#6488](https://github.com/matrix-org/synapse/issues/6488))

Internal Changes
----------------

- Add benchmarks for structured logging and improve output performance. ([\#6266](https://github.com/matrix-org/synapse/issues/6266))
- Improve the performance of outputting structured logging. ([\#6322](https://github.com/matrix-org/synapse/issues/6322))
- Refactor some code in the event authentication path for clarity. ([\#6343](https://github.com/matrix-org/synapse/issues/6343), [\#6468](https://github.com/matrix-org/synapse/issues/6468), [\#6480](https://github.com/matrix-org/synapse/issues/6480))
- Clean up some unnecessary quotation marks around the codebase. ([\#6362](https://github.com/matrix-org/synapse/issues/6362))
- Complain on startup instead of 500'ing during runtime when `public_baseurl` isn't set when necessary. ([\#6379](https://github.com/matrix-org/synapse/issues/6379))
- Add a test scenario to make sure room history purges don't break `/messages` in the future. ([\#6392](https://github.com/matrix-org/synapse/issues/6392))
- Clarifications for the email configuration settings. ([\#6423](https://github.com/matrix-org/synapse/issues/6423))
- Add more tests to the blacklist when running in worker mode. ([\#6429](https://github.com/matrix-org/synapse/issues/6429))
- Refactor data store layer to support multiple databases in the future. ([\#6454](https://github.com/matrix-org/synapse/issues/6454), [\#6464](https://github.com/matrix-org/synapse/issues/6464), [\#6469](https://github.com/matrix-org/synapse/issues/6469), [\#6487](https://github.com/matrix-org/synapse/issues/6487))
- Port synapse.rest.client.v1 to async/await. ([\#6482](https://github.com/matrix-org/synapse/issues/6482))
- Port synapse.rest.client.v2_alpha to async/await. ([\#6483](https://github.com/matrix-org/synapse/issues/6483))
- Port SyncHandler to async/await. ([\#6484](https://github.com/matrix-org/synapse/issues/6484))
2019-12-13 10:55:33 +00:00
Erik Johnston ba57a45644 More rewording of changelog. 2019-12-13 10:52:29 +00:00
Erik Johnston bac1578013 Reword changelog 2019-12-13 10:46:31 +00:00
Erik Johnston d046f367a4 Add deprecation notes 2019-12-13 10:36:35 +00:00
Erik Johnston f5aeea9e89 1.7.0 2019-12-13 10:19:53 +00:00
Richard van der Hoff 4ce05ec171 Adjust the sytest blacklist for worker mode (#6538)
Remove tests that got blacklisted while torturing was enabled, and add one that
fails.
2019-12-13 10:15:20 +00:00
Erik Johnston caa52836e4 Merge pull request #6496 from matrix-org/erikj/initial_sync_asnyc
Port synapse.handlers.initial_sync to async/await
2019-12-13 10:01:51 +00:00
Erik Johnston 7e67cfc87d Merge pull request #6534 from matrix-org/erikj/extend_mypy
Include more folders in mypy
2019-12-12 17:21:42 +00:00
Hubert Chathi cb2db17994 look up cross-signing keys from the DB in bulk (#6486) 2019-12-12 12:03:28 -05:00
Andrew Morgan 5bfd8855d6 Fix redacted events being returned in search results ordered by "recent" (#6522) 2019-12-12 15:53:49 +00:00
Erik Johnston 5056d6d90a Newsfile 2019-12-12 15:22:46 +00:00
Erik Johnston 495005360c Bump version of mypy 2019-12-12 15:21:12 +00:00
Erik Johnston c965253e4b Newsfile 2019-12-12 14:54:03 +00:00
Erik Johnston 324d4f61b8 Include more folders in mypy 2019-12-12 14:52:11 +00:00
Richard van der Hoff 25f1244329 Check the room_id of events when fetching room state/auth (#6524)
When we request the state/auth_events to populate a backwards extremity (on
backfill or in the case of missing events in a transaction push), we should
check that the returned events are in the right room rather than blindly using
them in the room state or auth chain.

Given that _get_events_from_store_or_dest takes a room_id, it seems clear that
it should be sanity-checking the room_id of the requested events, so let's do
it there.
2019-12-12 12:57:45 +00:00
Erik Johnston b8e4b39b69 Merge pull request #6511 from matrix-org/erikj/remove_db_config_from_apps
Move database config from apps into HomeServer object
2019-12-12 10:37:56 +00:00
Erik Johnston cfcfb57e58 Add new config param to docstring and add types 2019-12-11 17:27:46 +00:00
Erik Johnston 6828b47c45 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/initial_sync_asnyc 2019-12-11 17:01:41 +00:00
Richard van der Hoff 2045356517 Add include_event_in_state to _get_state_for_room (#6521)
Make it return the state *after* the requested event, rather than the one
before it. This is a bit easier and requires fewer calls to
get_events_from_store_or_dest.
2019-12-11 16:37:51 +00:00
Richard van der Hoff 894d2addac Merge pull request #6517 from matrix-org/rav/event_auth/13
Port some of FederationHandler to async/await
2019-12-11 16:36:06 +00:00
Mark Nowiasz 58fdcbdfe7 Update workers.md to make media_repository work (again) (#6519) 2019-12-11 16:23:38 +00:00
Erik Johnston 31905a518e Merge pull request #6504 from matrix-org/erikj/account_validity_async_await
Port handlers.account_validity to async/await.
2019-12-11 14:49:26 +00:00
Richard van der Hoff 5324bc20a6 changelog 2019-12-11 14:39:26 +00:00
Richard van der Hoff 6637d90d77 convert to async: FederationHandler._process_received_pdu
also fix user_joined_room to consistently return deferreds
2019-12-11 14:39:26 +00:00
Richard van der Hoff 4db394a4b3 convert to async: FederationHandler._get_state_for_room
... and _get_events_from_store_or_dest
2019-12-11 14:39:26 +00:00
Richard van der Hoff e77237b935 convert to async: FederationHandler.on_receive_pdu
and associated functions:
 * on_receive_pdu
 * handle_queued_pdus
 * get_missing_events_for_pdu
2019-12-11 14:39:26 +00:00
Richard van der Hoff 7712e751b8 Convert federation backfill to async
PaginationHandler.get_messages is only called by RoomMessageListRestServlet,
which is async.

Chase the code path down from there:
 - FederationHandler.maybe_backfill (and nested try_backfill)
 - FederationHandler.backfill
2019-12-11 14:39:25 +00:00
Richard van der Hoff 7c429f92d6 Clean up some logging (#6515)
This just makes some of the logging easier to follow when things start going
wrong.
2019-12-11 14:32:25 +00:00
Erik Johnston adb3a873fd Merge tag 'v1.7.0rc2' into develop
Synapse 1.7.0rc2 (2019-12-11)
=============================

Bugfixes
--------

- Fix incorrect error message for invalid requests when setting user's avatar URL. ([\#6497](https://github.com/matrix-org/synapse/issues/6497))
- Fix support for SQLite 3.7. ([\#6499](https://github.com/matrix-org/synapse/issues/6499))
- Fix regression where sending email push would not work when using a pusher worker. ([\#6507](https://github.com/matrix-org/synapse/issues/6507), [\#6509](https://github.com/matrix-org/synapse/issues/6509))
2019-12-11 14:14:30 +00:00
Erik Johnston d156912c4c 1.7.0rc2 2019-12-11 13:56:50 +00:00
Andrew Morgan fc316a4894 Prevent redacted events from appearing in message search (#6377) 2019-12-11 13:39:47 +00:00
Andrew Morgan 6676ee9c4a Add dev script to generate full SQL schema files (#6394) 2019-12-11 13:16:01 +00:00
Andrew Morgan ea0f0ad414 Prevent message search in upgraded rooms we're not in (#6385) 2019-12-11 13:07:25 +00:00
Erik Johnston 72acca6a32 Back out change preventing setting null avatar URLs 2019-12-11 11:46:55 +00:00
Brendan Abolivier 54ae52ba96 Merge pull request #6349 from matrix-org/babolivier/msc1802
Implement v2 APIs for send_join and send_leave
2019-12-11 11:41:47 +00:00
Erik Johnston d21577bdcb Merge branch 'erikj/fix_sqlite_7' of github.com:matrix-org/synapse into release-v1.7.0 2019-12-11 11:34:50 +00:00
Erik Johnston 239d86a134 Merge pull request #6512 from matrix-org/erikj/silence_mypy
Silence mypy errors for files outside those specified
2019-12-11 10:39:57 +00:00
Erik Johnston 3f97b4c16b Newsfile 2019-12-10 14:40:15 +00:00
Erik Johnston 257ef2c727 Port handlers.account_validity to async/await. 2019-12-10 14:40:15 +00:00
Erik Johnston d630c82349 Newsfile 2019-12-10 14:34:17 +00:00
Erik Johnston 67c991b78f Fix upgrade db script 2019-12-10 14:34:17 +00:00
Erik Johnston bc5cb8bfe8 Remove database config parsing from apps. 2019-12-10 14:34:17 +00:00
Brendan Abolivier ae49d29ef1 Fixup changelogs 2019-12-10 13:55:03 +00:00
Erik Johnston a964f18887 Merge pull request #6509 from matrix-org/babolivier/fix-room-store-config
Give the server config to the RoomWorkerStore
2019-12-10 13:43:07 +00:00
Erik Johnston 4643bb2a37 Newsfile 2019-12-10 13:36:00 +00:00
Erik Johnston 28b758fa0f Silence mypy errors for files outside those specified 2019-12-10 13:34:56 +00:00
Brendan Abolivier 451ec9b8b9 Changelog 2019-12-10 13:06:41 +00:00
Brendan Abolivier 3bd049bbb7 Give the server config to the RoomWorkerStore 2019-12-10 13:05:35 +00:00
Erik Johnston 332f3b36e5 Merge pull request #6507 from matrix-org/babolivier/pusher-room-store
Make the PusherSlaveStore inherit from the slave RoomStore
2019-12-10 12:57:37 +00:00
Brendan Abolivier ec5fdd1333 Changelog 2019-12-10 12:34:33 +00:00
Brendan Abolivier 2ac78438d8 Make the PusherSlaveStore inherit from the slave RoomStore
So that it has access to the get_retention_policy_for_room function which is required by filter_events_for_client.
2019-12-10 12:31:03 +00:00
Erik Johnston 353396e3a7 Port handlers.account_data to async/await. 2019-12-10 11:12:56 +00:00
Neil Johnson d95736a2bd Fix erroneous reference for new room directory defaults. 2019-12-10 10:05:33 +00:00
Neil Johnson 21aa0a458f Update CHANGES.md 2019-12-09 14:57:09 +00:00
Erik Johnston aeaeb72ee4 Newsfile 2019-12-09 13:48:14 +00:00
Erik Johnston a1f8ea9051 Port synapse.handlers.initial_sync to async/await 2019-12-09 13:46:45 +00:00
Brendan Abolivier e126d83f74 Merge branch 'develop' into babolivier/msc1802 2019-12-05 21:00:43 +00:00
Andrew Morgan e2cce15af1 Remove #6369 changelog 2019-12-05 15:44:02 +00:00
Brendan Abolivier c8444b4152 Merge branch 'develop' into babolivier/msc1802 2019-11-14 10:12:11 +00:00
Brendan Abolivier 94cdd6fffe Lint 2019-11-11 16:56:55 +00:00
Brendan Abolivier 21056ad12a Changelog 2019-11-11 16:53:29 +00:00
Brendan Abolivier edc4c7d4c5 Lint 2019-11-11 16:51:54 +00:00
Brendan Abolivier 5e18dc7955 Fix prefix for v2/send_leave 2019-11-11 16:46:09 +00:00
Brendan Abolivier 74897de01f Add server-side support to the v2 API 2019-11-11 16:40:45 +00:00
Brendan Abolivier 1e202a90f1 Implement v2 API for send_leave 2019-11-11 16:26:53 +00:00
Brendan Abolivier 92527d7b21 Add missing yield 2019-11-11 16:20:53 +00:00
Brendan Abolivier 4c131b2c78 Implement v2 API for send_join 2019-11-11 15:47:47 +00:00
170 changed files with 4189 additions and 2834 deletions
+3 -28
View File
@@ -34,33 +34,8 @@ Device list doesn't change if remote server is down
Remote servers cannot set power levels in rooms without existing powerlevels
Remote servers should reject attempts by non-creators to set the power levels
# new failures as of https://github.com/matrix-org/sytest/pull/753
GET /rooms/:room_id/messages returns a message
GET /rooms/:room_id/messages lazy loads members correctly
Read receipts are sent as events
Only original members of the room can see messages from erased users
Device deletion propagates over federation
If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes
Changing user-signing key notifies local users
Newly updated tags appear in an incremental v2 /sync
# https://buildkite.com/matrix-dot-org/synapse/builds/6134#6f67bf47-e234-474d-80e8-c6e1868b15c5
Server correctly handles incoming m.device_list_update
Local device key changes get to remote servers with correct prev_id
AS-ghosted users can use rooms via AS
Ghost user must register before joining room
Test that a message is pushed
Invites are pushed
Rooms with aliases are correctly named in pushed
Rooms with names are correctly named in pushed
Rooms with canonical alias are correctly named in pushed
Rooms with many users are correctly pushed
Don't get pushed for rooms you've muted
Rejected events are not pushed
Test that rejected pushers are removed.
Events come down the correct room
# https://buildkite.com/matrix-dot-org/sytest/builds/326#cca62404-a88a-4fcb-ad41-175fd3377603
Presence changes to UNAVAILABLE are reported to remote room members
If remote user leaves room, changes device and rejoins we see update in sync
uploading self-signing key notifies over federation
Inbound federation can receive redacted events
Outbound federation can request missing events
# this fails reliably with a torture level of 100 due to https://github.com/matrix-org/synapse/issues/6536
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state
+3
View File
@@ -46,3 +46,6 @@ Joseph Weston <joseph at weston.cloud>
Benjamin Saunders <ben.e.saunders at gmail dot com>
* Documentation improvements
Werner Sembach <werner.sembach at fau dot de>
* Automatically remove a group/community when it is empty
+148 -11
View File
@@ -1,17 +1,160 @@
Synapse 1.8.0 (2020-01-09)
==========================
Bugfixes
--------
- Fix `GET` request on `/_synapse/admin/v2/users` endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#6563](https://github.com/matrix-org/synapse/issues/6563))
- Fix incorrect signing of responses from the key server implementation. ([\#6657](https://github.com/matrix-org/synapse/issues/6657))
Synapse 1.8.0rc1 (2020-01-07)
=============================
Features
--------
- Add v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)). ([\#6349](https://github.com/matrix-org/synapse/issues/6349))
- Add a develop script to generate full SQL schemas. ([\#6394](https://github.com/matrix-org/synapse/issues/6394))
- Add custom SAML username mapping functinality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Automatically delete empty groups/communities. ([\#6453](https://github.com/matrix-org/synapse/issues/6453))
- Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information. ([\#6523](https://github.com/matrix-org/synapse/issues/6523))
- Add an `export_signing_key` script to extract the public part of signing keys when rotating them. ([\#6546](https://github.com/matrix-org/synapse/issues/6546))
- Add experimental config option to specify multiple databases. ([\#6580](https://github.com/matrix-org/synapse/issues/6580))
- Raise an error if someone tries to use the `log_file` config option. ([\#6626](https://github.com/matrix-org/synapse/issues/6626))
Bugfixes
--------
- Prevent redacted events from being returned during message search. ([\#6377](https://github.com/matrix-org/synapse/issues/6377), [\#6522](https://github.com/matrix-org/synapse/issues/6522))
- Prevent error on trying to search a upgraded room when the server is not in the predecessor room. ([\#6385](https://github.com/matrix-org/synapse/issues/6385))
- Improve performance of looking up cross-signing keys. ([\#6486](https://github.com/matrix-org/synapse/issues/6486))
- Fix race which occasionally caused deleted devices to reappear. ([\#6514](https://github.com/matrix-org/synapse/issues/6514))
- Fix missing row in `device_max_stream_id` that could cause unable to decrypt errors after server restart. ([\#6555](https://github.com/matrix-org/synapse/issues/6555))
- Fix a bug which meant that we did not send systemd notifications on startup if acme was enabled. ([\#6571](https://github.com/matrix-org/synapse/issues/6571))
- Fix exception when fetching the `matrix.org:ed25519:auto` key. ([\#6625](https://github.com/matrix-org/synapse/issues/6625))
- Fix bug where a moderator upgraded a room and became an admin in the new room. ([\#6633](https://github.com/matrix-org/synapse/issues/6633))
- Fix an error which was thrown by the `PresenceHandler` `_on_shutdown` handler. ([\#6640](https://github.com/matrix-org/synapse/issues/6640))
- Fix exceptions in the synchrotron worker log when events are rejected. ([\#6645](https://github.com/matrix-org/synapse/issues/6645))
- Ensure that upgraded rooms are removed from the directory. ([\#6648](https://github.com/matrix-org/synapse/issues/6648))
- Fix a bug causing Synapse not to fetch missing events when it believes it has every event in the room. ([\#6652](https://github.com/matrix-org/synapse/issues/6652))
Improved Documentation
----------------------
- Document the Room Shutdown Admin API. ([\#6541](https://github.com/matrix-org/synapse/issues/6541))
- Reword sections of [docs/federate.md](docs/federate.md) that explained delegation at time of Synapse 1.0 transition. ([\#6601](https://github.com/matrix-org/synapse/issues/6601))
- Added the section 'Configuration' in [docs/turn-howto.md](docs/turn-howto.md). ([\#6614](https://github.com/matrix-org/synapse/issues/6614))
Deprecations and Removals
-------------------------
- Remove redundant code from event authorisation implementation. ([\#6502](https://github.com/matrix-org/synapse/issues/6502))
- Remove unused, undocumented `/_matrix/content` API. ([\#6628](https://github.com/matrix-org/synapse/issues/6628))
Internal Changes
----------------
- Add *experimental* support for multiple physical databases and split out state storage to separate data store. ([\#6245](https://github.com/matrix-org/synapse/issues/6245), [\#6510](https://github.com/matrix-org/synapse/issues/6510), [\#6511](https://github.com/matrix-org/synapse/issues/6511), [\#6513](https://github.com/matrix-org/synapse/issues/6513), [\#6564](https://github.com/matrix-org/synapse/issues/6564), [\#6565](https://github.com/matrix-org/synapse/issues/6565))
- Port sections of code base to async/await. ([\#6496](https://github.com/matrix-org/synapse/issues/6496), [\#6504](https://github.com/matrix-org/synapse/issues/6504), [\#6505](https://github.com/matrix-org/synapse/issues/6505), [\#6517](https://github.com/matrix-org/synapse/issues/6517), [\#6559](https://github.com/matrix-org/synapse/issues/6559), [\#6647](https://github.com/matrix-org/synapse/issues/6647), [\#6653](https://github.com/matrix-org/synapse/issues/6653))
- Remove `SnapshotCache` in favour of `ResponseCache`. ([\#6506](https://github.com/matrix-org/synapse/issues/6506))
- Silence mypy errors for files outside those specified. ([\#6512](https://github.com/matrix-org/synapse/issues/6512))
- Clean up some logging when handling incoming events over federation. ([\#6515](https://github.com/matrix-org/synapse/issues/6515))
- Test more folders against mypy. ([\#6534](https://github.com/matrix-org/synapse/issues/6534))
- Update `mypy` to new version. ([\#6537](https://github.com/matrix-org/synapse/issues/6537))
- Adjust the sytest blacklist for worker mode. ([\#6538](https://github.com/matrix-org/synapse/issues/6538))
- Remove unused `get_pagination_rows` methods from `EventSource` classes. ([\#6557](https://github.com/matrix-org/synapse/issues/6557))
- Clean up logs from the push notifier at startup. ([\#6558](https://github.com/matrix-org/synapse/issues/6558))
- Improve diagnostics on database upgrade failure. ([\#6570](https://github.com/matrix-org/synapse/issues/6570))
- Reduce the reconnect time when worker replication fails, to make it easier to catch up. ([\#6617](https://github.com/matrix-org/synapse/issues/6617))
- Simplify http handling by removing redundant `SynapseRequestFactory`. ([\#6619](https://github.com/matrix-org/synapse/issues/6619))
- Add a workaround for synapse raising exceptions when fetching the notary's own key from the notary. ([\#6620](https://github.com/matrix-org/synapse/issues/6620))
- Automate generation of the sample log config. ([\#6627](https://github.com/matrix-org/synapse/issues/6627))
- Simplify event creation code by removing redundant queries on the `event_reference_hashes` table. ([\#6629](https://github.com/matrix-org/synapse/issues/6629))
- Fix errors when `frozen_dicts` are enabled. ([\#6642](https://github.com/matrix-org/synapse/issues/6642))
Synapse 1.7.3 (2019-12-31)
==========================
This release fixes a long-standing bug in the state resolution algorithm.
Bugfixes
--------
- Fix exceptions caused by state resolution choking on malformed events. ([\#6608](https://github.com/matrix-org/synapse/issues/6608))
Synapse 1.7.2 (2019-12-20)
==========================
This release fixes some regressions introduced in Synapse 1.7.0 and 1.7.1.
Bugfixes
--------
- Fix a regression introduced in Synapse 1.7.1 which caused errors when attempting to backfill rooms over federation. ([\#6576](https://github.com/matrix-org/synapse/issues/6576))
- Fix a bug introduced in Synapse 1.7.0 which caused an error on startup when upgrading from versions before 1.3.0. ([\#6578](https://github.com/matrix-org/synapse/issues/6578))
Synapse 1.7.1 (2019-12-18)
==========================
This release includes several security fixes as well as a fix to a bug exposed by the security fixes. Administrators are encouraged to upgrade as soon as possible.
Security updates
----------------
- Fix a bug which could cause room events to be incorrectly authorized using events from a different room. ([\#6501](https://github.com/matrix-org/synapse/issues/6501), [\#6503](https://github.com/matrix-org/synapse/issues/6503), [\#6521](https://github.com/matrix-org/synapse/issues/6521), [\#6524](https://github.com/matrix-org/synapse/issues/6524), [\#6530](https://github.com/matrix-org/synapse/issues/6530), [\#6531](https://github.com/matrix-org/synapse/issues/6531))
- Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event. ([\#6553](https://github.com/matrix-org/synapse/issues/6553))
- Fix a cause of state resets in room versions 2 onwards. ([\#6556](https://github.com/matrix-org/synapse/issues/6556), [\#6560](https://github.com/matrix-org/synapse/issues/6560))
Bugfixes
--------
- Fix a bug which could cause the federation server to incorrectly return errors when handling certain obscure event graphs. ([\#6526](https://github.com/matrix-org/synapse/issues/6526), [\#6527](https://github.com/matrix-org/synapse/issues/6527))
Synapse 1.7.0 (2019-12-13)
==========================
This release changes the default settings so that only local authenticated users can query the server's room directory. See the [upgrade notes](UPGRADE.rst#upgrading-to-v170) for details.
Support for SQLite versions before 3.11 is now deprecated. A future release will refuse to start if used with an SQLite version before 3.11.
Administrators are reminded that SQLite should not be used for production instances. Instructions for migrating to Postgres are available [here](docs/postgres.md). A future release of synapse will, by default, disable federation for servers using SQLite.
No significant changes since 1.7.0rc2.
Synapse 1.7.0rc2 (2019-12-11)
=============================
Bugfixes
--------
- Fix incorrect error message for invalid requests when setting user's avatar URL. ([\#6497](https://github.com/matrix-org/synapse/issues/6497))
- Fix support for SQLite 3.7. ([\#6499](https://github.com/matrix-org/synapse/issues/6499))
- Fix regression where sending email push would not work when using a pusher worker. ([\#6507](https://github.com/matrix-org/synapse/issues/6507), [\#6509](https://github.com/matrix-org/synapse/issues/6509))
Synapse 1.7.0rc1 (2019-12-09)
=============================
Features
--------
- Implement per-room message retention policies. ([\#5815](https://github.com/matrix-org/synapse/issues/5815))
- Implement per-room message retention policies. ([\#5815](https://github.com/matrix-org/synapse/issues/5815), [\#6436](https://github.com/matrix-org/synapse/issues/6436))
- Add etag and count fields to key backup endpoints to help clients guess if there are new keys. ([\#5858](https://github.com/matrix-org/synapse/issues/5858))
- Add admin/v2/users endpoint with pagination. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5925](https://github.com/matrix-org/synapse/issues/5925))
- Add `/admin/v2/users` endpoint with pagination. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5925](https://github.com/matrix-org/synapse/issues/5925))
- Require User-Interactive Authentication for `/account/3pid/add`, meaning the user's password will be required to add a third-party ID to their account. ([\#6119](https://github.com/matrix-org/synapse/issues/6119))
- Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314. ([\#6176](https://github.com/matrix-org/synapse/issues/6176))
- Configure privacy preserving settings by default for the room directory. ([\#6354](https://github.com/matrix-org/synapse/issues/6354))
- Configure privacy-preserving settings by default for the room directory. ([\#6355](https://github.com/matrix-org/synapse/issues/6355))
- Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). ([\#6409](https://github.com/matrix-org/synapse/issues/6409))
- Add support for MSC 2367, which allows specifying a reason on all membership events. ([\#6434](https://github.com/matrix-org/synapse/issues/6434))
- Add support for [MSC 2367](https://github.com/matrix-org/matrix-doc/pull/2367), which allows specifying a reason on all membership events. ([\#6434](https://github.com/matrix-org/synapse/issues/6434))
Bugfixes
@@ -26,8 +169,6 @@ Bugfixes
- Fix an intermittent exception when handling read-receipts. ([\#6408](https://github.com/matrix-org/synapse/issues/6408))
- Fix broken guest registration when there are existing blocks of numeric user IDs. ([\#6420](https://github.com/matrix-org/synapse/issues/6420))
- Fix startup error when http proxy is defined. ([\#6421](https://github.com/matrix-org/synapse/issues/6421))
- Clean up local threepids from user on account deactivation. ([\#6426](https://github.com/matrix-org/synapse/issues/6426))
- Fix a bug where a room could become unusable with a low retention policy and a low activity. ([\#6436](https://github.com/matrix-org/synapse/issues/6436))
- Fix error when using synapse_port_db on a vanilla synapse db. ([\#6449](https://github.com/matrix-org/synapse/issues/6449))
- Fix uploading multiple cross signing signatures for the same user. ([\#6451](https://github.com/matrix-org/synapse/issues/6451))
- Fix bug which lead to exceptions being thrown in a loop when a cross-signed device is deleted. ([\#6462](https://github.com/matrix-org/synapse/issues/6462))
@@ -67,14 +208,10 @@ Internal Changes
- Add a test scenario to make sure room history purges don't break `/messages` in the future. ([\#6392](https://github.com/matrix-org/synapse/issues/6392))
- Clarifications for the email configuration settings. ([\#6423](https://github.com/matrix-org/synapse/issues/6423))
- Add more tests to the blacklist when running in worker mode. ([\#6429](https://github.com/matrix-org/synapse/issues/6429))
- Move data store specific code out of `SQLBaseStore`. ([\#6454](https://github.com/matrix-org/synapse/issues/6454))
- Prepare SQLBaseStore functions being moved out of the stores. ([\#6464](https://github.com/matrix-org/synapse/issues/6464))
- Move per database functionality out of the data stores and into a dedicated `Database` class. ([\#6469](https://github.com/matrix-org/synapse/issues/6469))
- Refactor data store layer to support multiple databases in the future. ([\#6454](https://github.com/matrix-org/synapse/issues/6454), [\#6464](https://github.com/matrix-org/synapse/issues/6464), [\#6469](https://github.com/matrix-org/synapse/issues/6469), [\#6487](https://github.com/matrix-org/synapse/issues/6487))
- Port synapse.rest.client.v1 to async/await. ([\#6482](https://github.com/matrix-org/synapse/issues/6482))
- Port synapse.rest.client.v2_alpha to async/await. ([\#6483](https://github.com/matrix-org/synapse/issues/6483))
- Port SyncHandler to async/await. ([\#6484](https://github.com/matrix-org/synapse/issues/6484))
- Pass in `Database` object to data stores. ([\#6487](https://github.com/matrix-org/synapse/issues/6487))
Synapse 1.6.1 (2019-11-28)
==========================
+1 -1
View File
@@ -393,4 +393,4 @@ something like the following in their logs::
2019-09-11 19:32:04,271 - synapse.federation.transport.server - 288 - WARNING - GET-11752 - authenticate_request failed: 401: Invalid signature for server <server> with key ed25519:a_EqML: Unable to verify signature for <server>
This is normally caused by a misconfiguration in your reverse-proxy. See
`<docs/reverse_proxy.rst>`_ and double-check that your settings are correct.
`<docs/reverse_proxy.md>`_ and double-check that your settings are correct.
-1
View File
@@ -1 +0,0 @@
Allow custom SAML username mapping functinality through an external provider plugin.
-1
View File
@@ -1 +0,0 @@
Improve performance of looking up cross-signing keys.
-1
View File
@@ -1 +0,0 @@
Fix error message when setting your profile's avatar URL mentioning displaynames, and prevent NoneType avatar_urls.
-1
View File
@@ -1 +0,0 @@
Fix support for SQLite 3.7.
-1
View File
@@ -1 +0,0 @@
Refactor get_events_from_store_or_dest to return a dict.
-1
View File
@@ -1 +0,0 @@
Remove redundant code from event authorisation implementation.
-1
View File
@@ -1 +0,0 @@
Move get_state methods into FederationHandler.
-1
View File
@@ -1 +0,0 @@
Make `make_deferred_yieldable` to work with async/await.
-1
View File
@@ -1 +0,0 @@
Remove `SnapshotCache` in favour of `ResponseCache`.
-1
View File
@@ -1 +0,0 @@
Change phone home stats to not assume there is a single database and report information about the database used by the main data store.
-1
View File
@@ -1 +0,0 @@
Fix race which occasionally caused deleted devices to reappear.
+3
View File
@@ -85,6 +85,9 @@ PYTHONPATH="$tmpdir" \
' > "${PACKAGE_BUILD_DIR}/etc/matrix-synapse/homeserver.yaml"
# build the log config file
"${TARGET_PYTHON}" -B "${VIRTUALENV_DIR}/bin/generate_log_config" \
--output-file="${PACKAGE_BUILD_DIR}/etc/matrix-synapse/log.yaml"
# add a dependency on the right version of python to substvars.
PYPKG=`basename $SNAKE`
+34
View File
@@ -1,3 +1,37 @@
matrix-synapse-py3 (1.8.0) stable; urgency=medium
[ Richard van der Hoff ]
* Automate generation of the default log configuration file.
[ Synapse Packaging team ]
* New synapse release 1.8.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 09 Jan 2020 11:39:27 +0000
matrix-synapse-py3 (1.7.3) stable; urgency=medium
* New synapse release 1.7.3.
-- Synapse Packaging team <packages@matrix.org> Tue, 31 Dec 2019 10:45:04 +0000
matrix-synapse-py3 (1.7.2) stable; urgency=medium
* New synapse release 1.7.2.
-- Synapse Packaging team <packages@matrix.org> Fri, 20 Dec 2019 10:56:50 +0000
matrix-synapse-py3 (1.7.1) stable; urgency=medium
* New synapse release 1.7.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 18 Dec 2019 09:37:59 +0000
matrix-synapse-py3 (1.7.0) stable; urgency=medium
* New synapse release 1.7.0.
-- Synapse Packaging team <packages@matrix.org> Fri, 13 Dec 2019 10:19:38 +0000
matrix-synapse-py3 (1.6.1) stable; urgency=medium
* New synapse release 1.6.1.
-1
View File
@@ -1,2 +1 @@
debian/log.yaml etc/matrix-synapse
debian/manage_debconf.pl /opt/venvs/matrix-synapse/lib/
-36
View File
@@ -1,36 +0,0 @@
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
formatter: precise
filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600
backupCount: 10
filters: [context]
encoding: utf8
console:
class: logging.StreamHandler
formatter: precise
level: WARN
loggers:
synapse:
level: INFO
synapse.storage.SQL:
level: INFO
root:
level: INFO
handlers: [file, console]
+72
View File
@@ -0,0 +1,72 @@
# Shutdown room API
Shuts down a room, preventing new joins and moves local users and room aliases automatically
to a new room. The new room will be created with the user specified by the
`new_room_user_id` parameter as room administrator and will contain a message
explaining what happened. Users invited to the new room will have power level
-10 by default, and thus be unable to speak. The old room's power levels will be changed to
disallow any further invites or joins.
The local server will only have the power to move local user and room aliases to
the new room. Users on other servers will be unaffected.
## API
You will need to authenticate with an access token for an admin user.
### URL
`POST /_synapse/admin/v1/shutdown_room/{room_id}`
### URL Parameters
* `room_id` - The ID of the room (e.g `!someroom:example.com`)
### JSON Body Parameters
* `new_room_user_id` - Required. A string representing the user ID of the user that will admin
the new room that all users in the old room will be moved to.
* `room_name` - Optional. A string representing the name of the room that new users will be
invited to.
* `message` - Optional. A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly convey why the
original room was shut down.
If not specified, the default value of `room_name` is "Content Violation
Notification". The default value of `message` is "Sharing illegal content on
othis server is not permitted and rooms in violation will be blocked."
### Response Parameters
* `kicked_users` - An integer number representing the number of users that
were kicked.
* `failed_to_kick_users` - An integer number representing the number of users
that were not kicked.
* `local_aliases` - An array of strings representing the local aliases that were migrated from
the old room to the new.
* `new_room_id` - A string representing the room ID of the new room.
## Example
Request:
```
POST /_synapse/admin/v1/shutdown_room/!somebadroom%3Aexample.com
{
"new_room_user_id": "@someuser:example.com",
"room_name": "Content Violation Notification",
"message": "Bad Room has been shutdown due to content violations on this server. Please review our Terms of Service."
}
```
Response:
```
{
"kicked_users": 5,
"failed_to_kick_users": 0,
"local_aliases": ["#badroom:example.com", "#evilsaloon:example.com],
"new_room_id": "!newroomid:example.com",
},
```
+7 -6
View File
@@ -137,6 +137,7 @@ Some guidelines follow:
correctly handles the top-level option being set to `None` (as it
will be if no sub-options are enabled).
- Lines should be wrapped at 80 characters.
- Use two-space indents.
Example:
@@ -155,13 +156,13 @@ Example:
# Settings for the frobber
#
frobber:
# frobbing speed. Defaults to 1.
#
#speed: 10
# frobbing speed. Defaults to 1.
#
#speed: 10
# frobbing distance. Defaults to 1000.
#
#distance: 100
# frobbing distance. Defaults to 1000.
#
#distance: 100
Note that the sample configuration is generated from the synapse code
and is maintained by a script, `scripts-dev/generate_sample_config`.
+3 -21
View File
@@ -66,10 +66,6 @@ therefore cannot gain access to the necessary certificate. With .well-known,
federation servers will check for a valid TLS certificate for the delegated
hostname (in our example: ``synapse.example.com``).
.well-known support first appeared in Synapse v0.99.0. To federate with older
servers you may need to additionally configure SRV delegation. Alternatively,
encourage the server admin in question to upgrade :).
### DNS SRV delegation
To use this delegation method, you need to have write access to your
@@ -111,29 +107,15 @@ giving it a `server_name` of `example.com`, and once [ACME](acme.md) support is
it would automatically generate a valid TLS certificate for you via Let's Encrypt
and no SRV record or .well-known URI would be needed.
This is the common case, although you can add an SRV record or
`.well-known/matrix/server` URI for completeness if you wish.
**However**, if your server does not listen on port 8448, or if your `server_name`
does not point to the host that your homeserver runs on, you will need to let
other servers know how to find it. The way to do this is via .well-known or an
SRV record.
#### I have created a .well-known URI. Do I still need an SRV record?
#### I have created a .well-known URI. Do I also need an SRV record?
As of Synapse 0.99, Synapse will first check for the existence of a .well-known
URI and follow any delegation it suggests. It will only then check for the
existence of an SRV record.
That means that the SRV record will often be redundant. However, you should
remember that there may still be older versions of Synapse in the federation
which do not understand .well-known URIs, so if you removed your SRV record
you would no longer be able to federate with them.
It is therefore best to leave the SRV record in place for now. Synapse 0.34 and
earlier will follow the SRV record (and not care about the invalid
certificate). Synapse 0.99 and later will follow the .well-known URI, with the
correct certificate chain.
No. You can use either `.well-known` delegation or use an SRV record for delegation. You
do not need to use both to delegate to the same location.
#### Can I manage my own certificates rather than having Synapse renew certificates itself?
+19 -11
View File
@@ -54,6 +54,13 @@ pid_file: DATADIR/homeserver.pid
#
#require_auth_for_profile_requests: true
# Uncomment to require a user to share a room with another user in order
# to retrieve their profile information. Only checked on Client-Server
# requests. Profile requests from other servers should be checked by the
# requesting server. Defaults to 'false'.
#
#limit_profile_requests_to_users_who_share_rooms: true
# If set to 'true', removes the need for authentication to access the server's
# public rooms directory through the client API, meaning that anyone can
# query the room directory. Defaults to 'false'.
@@ -685,10 +692,6 @@ media_store_path: "DATADIR/media_store"
# config:
# directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
#
uploads_path: "DATADIR/uploads"
# The largest allowed upload size in bytes
#
#max_upload_size: 10M
@@ -1115,14 +1118,19 @@ metrics_flags:
signing_key_path: "CONFDIR/SERVERNAME.signing.key"
# The keys that the server used to sign messages with but won't use
# to sign new messages. E.g. it has lost its private key
# to sign new messages.
#
#old_signing_keys:
# "ed25519:auto":
# # Base64 encoded public key
# key: "The public part of your old signing key."
# # Millisecond POSIX timestamp when the key expired.
# expired_ts: 123456789123
old_signing_keys:
# For each key, `key` should be the base64-encoded public key, and
# `expired_ts`should be the time (in milliseconds since the unix epoch) that
# it was last used.
#
# It is possible to build an entry from an old signing.key file using the
# `export_signing_key` script which is provided with synapse.
#
# For example:
#
#"ed25519:id": { key: "base64string", expired_ts: 123456789123 }
# How long key response published by this server is valid for.
# Used to set the valid_until_ts in /key/v2 APIs.
+43
View File
@@ -0,0 +1,43 @@
# Log configuration for Synapse.
#
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
formatter: precise
filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600
backupCount: 10
filters: [context]
encoding: utf8
console:
class: logging.StreamHandler
formatter: precise
filters: [context]
loggers:
synapse.storage.SQL:
# beware: increasing this to DEBUG will make synapse log sensitive
# information such as access tokens.
level: INFO
root:
level: INFO
handlers: [file, console]
disable_existing_loggers: false
+2
View File
@@ -39,6 +39,8 @@ The TURN daemon `coturn` is available from a variety of sources such as native p
make
make install
### Configuration
1. Create or edit the config file in `/etc/turnserver.conf`. The relevant
lines, with example values, are:
+13 -1
View File
@@ -196,7 +196,7 @@ Handles the media repository. It can handle all endpoints starting with:
/_matrix/media/
And the following regular expressions matching media-specific administration APIs:
... and the following regular expressions matching media-specific administration APIs:
^/_synapse/admin/v1/purge_media_cache$
^/_synapse/admin/v1/room/.*/media$
@@ -206,6 +206,18 @@ You should also set `enable_media_repo: False` in the shared configuration
file to stop the main synapse running background jobs related to managing the
media repository.
In the `media_repository` worker configuration file, configure the http listener to
expose the `media` resource. For example:
```yaml
worker_listeners:
- type: http
port: 8085
resources:
- names:
- media
```
Note this worker cannot be load-balanced: only one instance should be active.
### `synapse.app.client_reader`
+1 -1
View File
@@ -1,7 +1,7 @@
[mypy]
namespace_packages = True
plugins = mypy_zope:plugin
follow_imports = normal
follow_imports = silent
check_untyped_defs = True
show_error_codes = True
show_traceback = True
+10
View File
@@ -7,12 +7,22 @@ set -e
cd `dirname $0`/..
SAMPLE_CONFIG="docs/sample_config.yaml"
SAMPLE_LOG_CONFIG="docs/sample_log_config.yaml"
check() {
diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || return 1
}
if [ "$1" == "--check" ]; then
diff -u "$SAMPLE_CONFIG" <(./scripts/generate_config --header-file docs/.sample_config_header.yaml) >/dev/null || {
echo -e "\e[1m\e[31m$SAMPLE_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2
exit 1
}
diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || {
echo -e "\e[1m\e[31m$SAMPLE_LOG_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2
exit 1
}
else
./scripts/generate_config --header-file docs/.sample_config_header.yaml -o "$SAMPLE_CONFIG"
./scripts/generate_log_config -o "$SAMPLE_LOG_CONFIG"
fi
+184
View File
@@ -0,0 +1,184 @@
#!/bin/bash
#
# This script generates SQL files for creating a brand new Synapse DB with the latest
# schema, on both SQLite3 and Postgres.
#
# It does so by having Synapse generate an up-to-date SQLite DB, then running
# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
POSTGRES_HOST="localhost"
POSTGRES_DB_NAME="synapse_full_schema.$$"
SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
POSTGRES_FULL_SCHEMA_OUTPUT_FILE="full.sql.postgres"
REQUIRED_DEPS=("matrix-synapse" "psycopg2")
usage() {
echo
echo "Usage: $0 -p <postgres_username> -o <path> [-c] [-n] [-h]"
echo
echo "-p <postgres_username>"
echo " Username to connect to local postgres instance. The password will be requested"
echo " during script execution."
echo "-c"
echo " CI mode. Enables coverage tracking and prints every command that the script runs."
echo "-o <path>"
echo " Directory to output full schema files to."
echo "-h"
echo " Display this help text."
}
while getopts "p:co:h" opt; do
case $opt in
p)
POSTGRES_USERNAME=$OPTARG
;;
c)
# Print all commands that are being executed
set -x
# Modify required dependencies for coverage
REQUIRED_DEPS+=("coverage" "coverage-enable-subprocess")
COVERAGE=1
;;
o)
command -v realpath > /dev/null || (echo "The -o flag requires the 'realpath' binary to be installed" && exit 1)
OUTPUT_DIR="$(realpath "$OPTARG")"
;;
h)
usage
exit
;;
\?)
echo "ERROR: Invalid option: -$OPTARG" >&2
usage
exit
;;
esac
done
# Check that required dependencies are installed
unsatisfied_requirements=()
for dep in "${REQUIRED_DEPS[@]}"; do
pip show "$dep" --quiet || unsatisfied_requirements+=("$dep")
done
if [ ${#unsatisfied_requirements} -ne 0 ]; then
echo "Please install the following python packages: ${unsatisfied_requirements[*]}"
exit 1
fi
if [ -z "$POSTGRES_USERNAME" ]; then
echo "No postgres username supplied"
usage
exit 1
fi
if [ -z "$OUTPUT_DIR" ]; then
echo "No output directory supplied"
usage
exit 1
fi
# Create the output directory if it doesn't exist
mkdir -p "$OUTPUT_DIR"
read -rsp "Postgres password for '$POSTGRES_USERNAME': " POSTGRES_PASSWORD
echo ""
# Exit immediately if a command fails
set -e
# cd to root of the synapse directory
cd "$(dirname "$0")/.."
# Create temporary SQLite and Postgres homeserver db configs and key file
TMPDIR=$(mktemp -d)
KEY_FILE=$TMPDIR/test.signing.key # default Synapse signing key path
SQLITE_CONFIG=$TMPDIR/sqlite.conf
SQLITE_DB=$TMPDIR/homeserver.db
POSTGRES_CONFIG=$TMPDIR/postgres.conf
# Ensure these files are delete on script exit
trap 'rm -rf $TMPDIR' EXIT
cat > "$SQLITE_CONFIG" <<EOF
server_name: "test"
signing_key_path: "$KEY_FILE"
macaroon_secret_key: "abcde"
report_stats: false
database:
name: "sqlite3"
args:
database: "$SQLITE_DB"
# Suppress the key server warning.
trusted_key_servers: []
EOF
cat > "$POSTGRES_CONFIG" <<EOF
server_name: "test"
signing_key_path: "$KEY_FILE"
macaroon_secret_key: "abcde"
report_stats: false
database:
name: "psycopg2"
args:
user: "$POSTGRES_USERNAME"
host: "$POSTGRES_HOST"
password: "$POSTGRES_PASSWORD"
database: "$POSTGRES_DB_NAME"
# Suppress the key server warning.
trusted_key_servers: []
EOF
# Generate the server's signing key.
echo "Generating SQLite3 db schema..."
python -m synapse.app.homeserver --generate-keys -c "$SQLITE_CONFIG"
# Make sure the SQLite3 database is using the latest schema and has no pending background update.
echo "Running db background jobs..."
scripts-dev/update_database --database-config "$SQLITE_CONFIG"
# Create the PostgreSQL database.
echo "Creating postgres database..."
createdb $POSTGRES_DB_NAME
echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
if [ -z "$COVERAGE" ]; then
# No coverage needed
scripts/synapse_port_db --sqlite-database "$SQLITE_DB" --postgres-config "$POSTGRES_CONFIG"
else
# Coverage desired
coverage run scripts/synapse_port_db --sqlite-database "$SQLITE_DB" --postgres-config "$POSTGRES_CONFIG"
fi
# Delete schema_version, applied_schema_deltas and applied_module_schemas tables
# This needs to be done after synapse_port_db is run
echo "Dropping unwanted db tables..."
SQL="
DROP TABLE schema_version;
DROP TABLE applied_schema_deltas;
DROP TABLE applied_module_schemas;
"
sqlite3 "$SQLITE_DB" <<< "$SQL"
psql $POSTGRES_DB_NAME -U "$POSTGRES_USERNAME" -w <<< "$SQL"
echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"
echo "Dumping Postgres schema to '$OUTPUT_DIR/$POSTGRES_FULL_SCHEMA_OUTPUT_FILE'..."
pg_dump --format=plain --no-tablespaces --no-acl --no-owner $POSTGRES_DB_NAME | sed -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_FULL_SCHEMA_OUTPUT_FILE"
echo "Cleaning up temporary Postgres database..."
dropdb $POSTGRES_DB_NAME
echo "Done! Files dumped to: $OUTPUT_DIR"
+6 -32
View File
@@ -26,8 +26,6 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
logger = logging.getLogger("update_database")
@@ -35,21 +33,11 @@ logger = logging.getLogger("update_database")
class MockHomeserver(HomeServer):
DATASTORE_CLASS = DataStore
def __init__(self, config, database_engine, db_conn, **kwargs):
def __init__(self, config, **kwargs):
super(MockHomeserver, self).__init__(
config.server_name,
reactor=reactor,
config=config,
database_engine=database_engine,
**kwargs
config.server_name, reactor=reactor, config=config, **kwargs
)
self.database_engine = database_engine
self.db_conn = db_conn
def get_db_conn(self):
return self.db_conn
if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -85,25 +73,11 @@ if __name__ == "__main__":
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")
# Create the database engine and a connection to it.
database_engine = create_engine(config.database_config)
db_conn = database_engine.module.connect(
**{
k: v
for k, v in config.database_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
# Update the database to the latest schema.
prepare_database(db_conn, database_engine, config=config)
db_conn.commit()
# Instantiate and initialise the homeserver object.
hs = MockHomeserver(
config, database_engine, db_conn, db_config=config.database_config,
)
# setup instantiates the store within the homeserver object.
hs = MockHomeserver(config)
# Setup instantiates the store within the homeserver object and updates the
# DB.
hs.setup()
store = hs.get_datastore()
+94
View File
@@ -0,0 +1,94 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
import time
from typing import Optional
import nacl.signing
from signedjson.key import encode_verify_key_base64, get_verify_key, read_signing_keys
def exit(status: int = 0, message: Optional[str] = None):
if message:
print(message, file=sys.stderr)
sys.exit(status)
def format_plain(public_key: nacl.signing.VerifyKey):
print(
"%s:%s %s"
% (public_key.alg, public_key.version, encode_verify_key_base64(public_key),)
)
def format_for_config(public_key: nacl.signing.VerifyKey, expiry_ts: int):
print(
' "%s:%s": { key: "%s", expired_ts: %i }'
% (
public_key.alg,
public_key.version,
encode_verify_key_base64(public_key),
expiry_ts,
)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"key_file", nargs="+", type=argparse.FileType("r"), help="The key file to read",
)
parser.add_argument(
"-x",
action="store_true",
dest="for_config",
help="format the output for inclusion in the old_signing_keys config setting",
)
parser.add_argument(
"--expiry-ts",
type=int,
default=int(time.time() * 1000) + 6*3600000,
help=(
"The expiry time to use for -x, in milliseconds since 1970. The default "
"is (now+6h)."
),
)
args = parser.parse_args()
formatter = (
(lambda k: format_for_config(k, args.expiry_ts))
if args.for_config
else format_plain
)
keys = []
for file in args.key_file:
try:
res = read_signing_keys(file)
except Exception as e:
exit(
status=1,
message="Error reading key from file %s: %s %s"
% (file.name, type(e), e),
)
res = []
for key in res:
formatter(get_verify_key(key))
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
from synapse.config.logger import DEFAULT_LOG_CONFIG
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-o",
"--output-file",
type=argparse.FileType("w"),
default=sys.stdout,
help="File to write the configuration to. Default: stdout",
)
parser.add_argument(
"-f",
"--log-file",
type=str,
default="/var/log/matrix-synapse/homeserver.log",
help="name of the log file",
)
args = parser.parse_args()
args.output_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file))
+30 -35
View File
@@ -30,6 +30,7 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
@@ -50,12 +51,13 @@ from synapse.storage.data_stores.main.registration import (
from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
from synapse.storage.data_stores.main.state import StateBackgroundUpdateStore
from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
from synapse.storage.database import Database
from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
@@ -137,6 +139,7 @@ class Store(
RoomMemberBackgroundUpdateStore,
SearchBackgroundUpdateStore,
StateBackgroundUpdateStore,
MainStateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
@@ -163,25 +166,24 @@ class Store(
logger.exception("Failed to insert: %s", table)
raise
def set_room_is_public(self, room_id, is_public):
raise Exception(
"Attempt to set room_is_public during port_db: database not empty?"
)
class MockHomeserver:
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
def __init__(self, config):
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name
def get_db_conn(self):
return self.db_conn
def get_db_pool(self):
return self.db_pool
def get_clock(self):
return self.clock
def get_reactor(self):
return reactor
class Porter(object):
def __init__(self, **kwargs):
@@ -445,45 +447,36 @@ class Porter(object):
else:
return
def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
k: v
for k, v in db_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
prepare_database(db_conn, database_engine, config=None)
def setup_db(self, db_config: DatabaseConnectionConfig, engine):
db_conn = make_conn(db_config, engine)
prepare_database(db_conn, engine, config=None)
db_conn.commit()
return db_conn
@defer.inlineCallbacks
def build_db_store(self, config):
def build_db_store(self, db_config: DatabaseConnectionConfig):
"""Builds and returns a database store using the provided configuration.
Args:
config: The database configuration, i.e. a dict following the structure of
the "database" section of Synapse's configuration file.
config: The database configuration
Returns:
The built Store object.
"""
engine = create_engine(config)
self.progress.set_state("Preparing %s" % db_config.config["name"])
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)
engine = create_engine(db_config.config)
conn = self.setup_db(db_config, engine)
db_pool = adbapi.ConnectionPool(config["name"], **config["args"])
hs = MockHomeserver(self.hs_config)
hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
store = Store(Database(hs), conn, hs)
store = Store(Database(hs, db_config, engine), conn, hs)
yield store.db.runInteraction(
"%s_engine.check_database" % config["name"], engine.check_database,
"%s_engine.check_database" % db_config.config["name"],
engine.check_database,
)
return store
@@ -509,7 +502,9 @@ class Porter(object):
@defer.inlineCallbacks
def run(self):
try:
self.sqlite_store = yield self.build_db_store(self.sqlite_config)
self.sqlite_store = yield self.build_db_store(
DatabaseConnectionConfig("master-sqlite", self.sqlite_config)
)
# Check if all background updates are done, abort if not.
updates_complete = (
@@ -524,7 +519,7 @@ class Porter(object):
defer.returnValue(None)
self.postgres_store = yield self.build_db_store(
self.hs_config.database_config
self.hs_config.get_single_database()
)
yield self.run_background_updates_on_postgres()
+1 -1
View File
@@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.7.0rc1"
__version__ = "1.8.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
+36 -69
View File
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Dict, Tuple
from six import itervalues
@@ -25,13 +26,7 @@ from twisted.internet import defer
import synapse.logging.opentracing as opentracing
import synapse.types
from synapse import event_auth
from synapse.api.constants import (
EventTypes,
JoinRules,
LimitBlockingTypes,
Membership,
UserTypes,
)
from synapse.api.constants import EventTypes, LimitBlockingTypes, Membership, UserTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -84,7 +79,7 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, room_version, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
auth_events_ids = yield self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
@@ -513,71 +508,43 @@ class Auth(object):
"""
return self.store.is_server_admin(user)
@defer.inlineCallbacks
def compute_auth_events(self, event, current_state_ids, for_verification=False):
def compute_auth_events(
self,
event,
current_state_ids: Dict[Tuple[str, str], str],
for_verification: bool = False,
):
"""Given an event and current state return the list of event IDs used
to auth an event.
If `for_verification` is False then only return auth events that
should be added to the event's `auth_events`.
Returns:
defer.Deferred(list[str]): List of event IDs.
"""
if event.type == EventTypes.Create:
return []
return defer.succeed([])
# Currently we ignore the `for_verification` flag even though there are
# some situations where we can drop particular auth events when adding
# to the event's `auth_events` (e.g. joins pointing to previous joins
# when room is publically joinable). Dropping event IDs has the
# advantage that the auth chain for the room grows slower, but we use
# the auth chain in state resolution v2 to order events, which means
# care must be taken if dropping events to ensure that it doesn't
# introduce undesirable "state reset" behaviour.
#
# All of which sounds a bit tricky so we don't bother for now.
auth_ids = []
for etype, state_key in event_auth.auth_types_for_event(event):
auth_ev_id = current_state_ids.get((etype, state_key))
if auth_ev_id:
auth_ids.append(auth_ev_id)
key = (EventTypes.PowerLevels, "")
power_level_event_id = current_state_ids.get(key)
if power_level_event_id:
auth_ids.append(power_level_event_id)
key = (EventTypes.JoinRules, "")
join_rule_event_id = current_state_ids.get(key)
key = (EventTypes.Member, event.sender)
member_event_id = current_state_ids.get(key)
key = (EventTypes.Create, "")
create_event_id = current_state_ids.get(key)
if create_event_id:
auth_ids.append(create_event_id)
if join_rule_event_id:
join_rule_event = yield self.store.get_event(join_rule_event_id)
join_rule = join_rule_event.content.get("join_rule")
is_public = join_rule == JoinRules.PUBLIC if join_rule else False
else:
is_public = False
if event.type == EventTypes.Member:
e_type = event.content["membership"]
if e_type in [Membership.JOIN, Membership.INVITE]:
if join_rule_event_id:
auth_ids.append(join_rule_event_id)
if e_type == Membership.JOIN:
if member_event_id and not is_public:
auth_ids.append(member_event_id)
else:
if member_event_id:
auth_ids.append(member_event_id)
if for_verification:
key = (EventTypes.Member, event.state_key)
existing_event_id = current_state_ids.get(key)
if existing_event_id:
auth_ids.append(existing_event_id)
if e_type == Membership.INVITE:
if "third_party_invite" in event.content:
key = (
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"],
)
third_party_invite_id = current_state_ids.get(key)
if third_party_invite_id:
auth_ids.append(third_party_invite_id)
elif member_event_id:
member_event = yield self.store.get_event(member_event_id)
if member_event.content["membership"] == Membership.JOIN:
auth_ids.append(member_event.event_id)
return auth_ids
return defer.succeed(auth_ids)
@defer.inlineCallbacks
def check_can_change_room_list(self, room_id, user):
-1
View File
@@ -29,7 +29,6 @@ FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client"
CONTENT_REPO_PREFIX = "/_matrix/content"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
+7 -3
View File
@@ -237,6 +237,12 @@ def start(hs, listeners=None):
"""
Start a Synapse server or worker.
Should be called once the reactor is running and (if we're using ACME) the
TLS certificates are in place.
Will start the main HTTP listeners and do some other startup tasks, and then
notify systemd.
Args:
hs (synapse.server.HomeServer)
listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
@@ -311,9 +317,7 @@ def setup_sdnotify(hs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
hs.get_reactor().addSystemEventTrigger(
"after", "startup", sdnotify, b"READY=1\nMAINPID=%i" % (os.getpid(),)
)
sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", sdnotify, b"STOPPING=1"
+4 -7
View File
@@ -45,7 +45,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.logcontext import LoggingContext
from synapse.util.versionstring import get_version_string
@@ -105,8 +104,10 @@ def export_data_command(hs, args):
user_id = args.user_id
directory = args.output_directory
res = yield hs.get_handlers().admin_handler.export_user_data(
user_id, FileExfiltrationWriter(user_id, directory=directory)
res = yield defer.ensureDeferred(
hs.get_handlers().admin_handler.export_user_data(
user_id, FileExfiltrationWriter(user_id, directory=directory)
)
)
print(res)
@@ -229,14 +230,10 @@ def start(config_options):
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = AdminCmdServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
-5
View File
@@ -34,7 +34,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -143,8 +142,6 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
if config.notify_appservices:
sys.stderr.write(
"\nThe appservices must be disabled in the main synapse process"
@@ -159,10 +156,8 @@ def start(config_options):
ps = AppserviceServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)
-5
View File
@@ -62,7 +62,6 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -181,14 +180,10 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
-5
View File
@@ -57,7 +57,6 @@ from synapse.rest.client.v1.room import (
)
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -180,14 +179,10 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
-5
View File
@@ -46,7 +46,6 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -162,14 +161,10 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
-5
View File
@@ -41,7 +41,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
@@ -174,8 +173,6 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
if config.send_federation:
sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process"
@@ -190,10 +187,8 @@ def start(config_options):
ss = FederationSenderServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
-5
View File
@@ -39,7 +39,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -234,14 +233,10 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
+5 -23
View File
@@ -39,7 +39,6 @@ import synapse
import synapse.config.logger
from synapse import events
from synapse.api.urls import (
CONTENT_REPO_PREFIX,
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
MEDIA_PREFIX,
@@ -65,11 +64,10 @@ from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.engines import IncorrectDatabaseSetup
from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
@@ -223,13 +221,7 @@ class SynapseHomeServer(HomeServer):
if self.get_config().enable_media_repo:
media_repo = self.get_media_repository_resource()
resources.update(
{
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
}
{MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo}
)
elif name == "media":
raise ConfigError(
@@ -318,7 +310,7 @@ def setup(config_options):
"Synapse Homeserver", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n")
sys.stderr.write("\nERROR: %s\n" % (e,))
sys.exit(1)
if not config:
@@ -328,15 +320,10 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
hs = SynapseHomeServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
@@ -347,13 +334,8 @@ def setup(config_options):
hs.setup()
except IncorrectDatabaseSetup as e:
quit_with_error(str(e))
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
"Have you checked for version specific instructions in"
" UPGRADES.rst?\n"
)
sys.exit(1)
except UpgradeDatabaseException as e:
quit_with_error("Failed to upgrade database: %s" % (e,))
hs.setup_master()
+1 -10
View File
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -37,10 +37,8 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -83,9 +81,6 @@ class MediaRepositoryServer(HomeServer):
{
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
"/_synapse/admin": admin_resource,
}
)
@@ -157,14 +152,10 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
+6 -6
View File
@@ -33,10 +33,10 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -45,7 +45,11 @@ logger = logging.getLogger("synapse.app.pusher")
class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore
SlavedEventStore,
SlavedPusherStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
RoomStore,
):
update_pusher_last_stream_ordering_and_success = __func__(
DataStore.update_pusher_last_stream_ordering_and_success
@@ -198,14 +202,10 @@ def start(config_options):
# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True
database_engine = create_engine(config.database_config)
ps = PusherServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)
+12 -11
View File
@@ -48,14 +48,13 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
@@ -372,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users()
@defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows):
async def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
@@ -381,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
event = yield self.store.get_event(row.data.event_id)
assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
@@ -413,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
room_ids = await self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
@@ -437,14 +442,10 @@ def start(config_options):
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = SynchrotronServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
application_service_handler=SynchrotronApplicationService(),
)
-5
View File
@@ -44,7 +44,6 @@ from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -200,8 +199,6 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
if config.update_user_directory:
sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process"
@@ -216,10 +213,8 @@ def start(config_options):
ss = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
+91 -16
View File
@@ -12,12 +12,45 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from textwrap import indent
import yaml
from ._base import Config
from synapse.config._base import Config, ConfigError
logger = logging.getLogger(__name__)
class DatabaseConnectionConfig:
"""Contains the connection config for a particular database.
Args:
name: A label for the database, used for logging.
db_config: The config for a particular database, as per `database`
section of main config. Has three fields: `name` for database
module name, `args` for the args to give to the database
connector, and optional `data_stores` that is a list of stores to
provision on this database (defaulting to all).
"""
def __init__(self, name: str, db_config: dict):
if db_config["name"] not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_config["name"],))
if db_config["name"] == "sqlite3":
db_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)
data_stores = db_config.get("data_stores")
if data_stores is None:
data_stores = ["main", "state"]
self.name = name
self.config = db_config
self.data_stores = data_stores
class DatabaseConfig(Config):
@@ -26,22 +59,43 @@ class DatabaseConfig(Config):
def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))
self.database_config = config.get("database")
# We *experimentally* support specifying multiple databases via the
# `databases` key. This is a map from a label to database config in the
# same format as the `database` config option, plus an extra
# `data_stores` key to specify which data store goes where. For example:
#
# databases:
# master:
# name: psycopg2
# data_stores: ["main"]
# args: {}
# state:
# name: psycopg2
# data_stores: ["state"]
# args: {}
if self.database_config is None:
self.database_config = {"name": "sqlite3", "args": {}}
multi_database_config = config.get("databases")
database_config = config.get("database")
if multi_database_config and database_config:
raise ConfigError("Can't specify both 'database' and 'datbases' in config")
if multi_database_config:
if config.get("database_path"):
raise ConfigError("Can't specify 'database_path' with 'databases'")
self.databases = [
DatabaseConnectionConfig(name, db_conf)
for name, db_conf in multi_database_config.items()
]
name = self.database_config.get("name", None)
if name == "psycopg2":
pass
elif name == "sqlite3":
self.database_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)
else:
raise RuntimeError("Unsupported database type '%s'" % (name,))
if database_config is None:
database_config = {"name": "sqlite3", "args": {}}
self.set_databasepath(config.get("database_path"))
self.databases = [DatabaseConnectionConfig("master", database_config)]
self.set_databasepath(config.get("database_path"))
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
if not database_conf:
@@ -76,11 +130,24 @@ class DatabaseConfig(Config):
self.set_databasepath(args.database_path)
def set_databasepath(self, database_path):
if database_path is None:
return
if database_path != ":memory:":
database_path = self.abspath(database_path)
if self.database_config.get("name", None) == "sqlite3":
if database_path is not None:
self.database_config["args"]["database"] = database_path
# We only support setting a database path if we have a single sqlite3
# database.
if len(self.databases) != 1:
raise ConfigError("Cannot specify 'database_path' with multiple databases")
database = self.get_single_database()
if database.config["name"] != "sqlite3":
# We don't raise here as we haven't done so before for this case.
logger.warn("Ignoring 'database_path' for non-sqlite3 database")
return
database.config["args"]["database"] = database_path
@staticmethod
def add_arguments(parser):
@@ -91,3 +158,11 @@ class DatabaseConfig(Config):
metavar="SQLITE_DATABASE_PATH",
help="The path to a sqlite database to use.",
)
def get_single_database(self) -> DatabaseConnectionConfig:
"""Returns the database if there is only one, useful for e.g. tests
"""
if len(self.databases) != 1:
raise Exception("More than one database exists")
return self.databases[0]
+2 -1
View File
@@ -21,6 +21,7 @@ from __future__ import print_function
import email.utils
import os
from enum import Enum
from typing import Optional
import pkg_resources
@@ -101,7 +102,7 @@ class EmailConfig(Config):
# both in RegistrationConfig and here. We should factor this bit out
self.account_threepid_delegate_email = self.trusted_third_party_id_servers[
0
]
] # type: Optional[str]
self.using_identity_server_from_trusted_list = True
else:
raise ConfigError(
+15 -8
View File
@@ -108,7 +108,7 @@ class KeyConfig(Config):
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})
config.get("old_signing_keys")
)
self.key_refresh_interval = self.parse_duration(
config.get("key_refresh_interval", "1d")
@@ -199,14 +199,19 @@ class KeyConfig(Config):
signing_key_path: "%(base_key_name)s.signing.key"
# The keys that the server used to sign messages with but won't use
# to sign new messages. E.g. it has lost its private key
# to sign new messages.
#
#old_signing_keys:
# "ed25519:auto":
# # Base64 encoded public key
# key: "The public part of your old signing key."
# # Millisecond POSIX timestamp when the key expired.
# expired_ts: 123456789123
old_signing_keys:
# For each key, `key` should be the base64-encoded public key, and
# `expired_ts`should be the time (in milliseconds since the unix epoch) that
# it was last used.
#
# It is possible to build an entry from an old signing.key file using the
# `export_signing_key` script which is provided with synapse.
#
# For example:
#
#"ed25519:id": { key: "base64string", expired_ts: 123456789123 }
# How long key response published by this server is valid for.
# Used to set the valid_until_ts in /key/v2 APIs.
@@ -290,6 +295,8 @@ class KeyConfig(Config):
raise ConfigError("Error reading %s: %s" % (name, str(e)))
def read_old_signing_keys(self, old_signing_keys):
if old_signing_keys is None:
return {}
keys = {}
for key_id, key_data in old_signing_keys.items():
if is_signing_algorithm_supported(key_id):
+23 -3
View File
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import logging.config
import os
@@ -37,10 +37,17 @@ from synapse.logging._structured import (
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
from ._base import Config
from ._base import Config, ConfigError
DEFAULT_LOG_CONFIG = Template(
"""
"""\
# Log configuration for Synapse.
#
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
version: 1
formatters:
@@ -81,11 +88,18 @@ disable_existing_loggers: false
"""
)
LOG_FILE_ERROR = """\
Support for the log_file configuration option and --log-file command-line option was
removed in Synapse 1.3.0. You should instead set up a separate log configuration file.
"""
class LoggingConfig(Config):
section = "logging"
def read_config(self, config, **kwargs):
if config.get("log_file"):
raise ConfigError(LOG_FILE_ERROR)
self.log_config = self.abspath(config.get("log_config"))
self.no_redirect_stdio = config.get("no_redirect_stdio", False)
@@ -106,6 +120,8 @@ class LoggingConfig(Config):
def read_arguments(self, args):
if args.no_redirect_stdio is not None:
self.no_redirect_stdio = args.no_redirect_stdio
if args.log_file is not None:
raise ConfigError(LOG_FILE_ERROR)
@staticmethod
def add_arguments(parser):
@@ -118,6 +134,10 @@ class LoggingConfig(Config):
help="Do not redirect stdout/stderr to the log",
)
logging_group.add_argument(
"-f", "--log-file", dest="log_file", help=argparse.SUPPRESS,
)
def generate_files(self, config, config_dir_path):
log_config = config.get("log_config")
if log_config and not os.path.exists(log_config):
+1 -2
View File
@@ -83,10 +83,9 @@ class RatelimitConfig(Config):
)
rc_admin_redaction = config.get("rc_admin_redaction")
self.rc_admin_redaction = None
if rc_admin_redaction:
self.rc_admin_redaction = RateLimitConfig(rc_admin_redaction)
else:
self.rc_admin_redaction = None
def generate_config_section(self, **kwargs):
return """\
-5
View File
@@ -156,7 +156,6 @@ class ContentRepositoryConfig(Config):
(provider_class, parsed_config, wrapper_config)
)
self.uploads_path = self.ensure_directory(config.get("uploads_path", "uploads"))
self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements(
config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES)
@@ -231,10 +230,6 @@ class ContentRepositoryConfig(Config):
# config:
# directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
#
uploads_path: "%(uploads_path)s"
# The largest allowed upload size in bytes
#
#max_upload_size: 10M
+14 -1
View File
@@ -102,6 +102,12 @@ class ServerConfig(Config):
"require_auth_for_profile_requests", False
)
# Whether to require sharing a room with a user to retrieve their
# profile data
self.limit_profile_requests_to_users_who_share_rooms = config.get(
"limit_profile_requests_to_users_who_share_rooms", False,
)
if "restrict_public_rooms_to_local_users" in config and (
"allow_public_rooms_without_auth" in config
or "allow_public_rooms_over_federation" in config
@@ -200,7 +206,7 @@ class ServerConfig(Config):
self.admin_contact = config.get("admin_contact", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
self.federation_domain_whitelist = None # type: Optional[dict]
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
if federation_domain_whitelist is not None:
@@ -621,6 +627,13 @@ class ServerConfig(Config):
#
#require_auth_for_profile_requests: true
# Uncomment to require a user to share a room with another user in order
# to retrieve their profile information. Only checked on Client-Server
# requests. Profile requests from other servers should be checked by the
# requesting server. Defaults to 'false'.
#
#limit_profile_requests_to_users_who_share_rooms: true
# If set to 'true', removes the need for authentication to access the server's
# public rooms directory through the client API, meaning that anyone can
# query the room directory. Defaults to 'false'.
+6 -3
View File
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import hashlib
import logging
@@ -40,8 +40,11 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
if not isinstance(hashes, dict):
raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
# nb it might be a frozendict or a dict
if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(
400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED
)
if name not in hashes:
raise SynapseError(
+7 -6
View File
@@ -511,17 +511,18 @@ class BaseV2KeyFetcher(object):
server_name = response_json["server_name"]
verified = False
for key_id in response_json["signatures"].get(server_name, {}):
# each of the keys used for the signature must be present in the response
# json.
key = verify_keys.get(key_id)
if not key:
raise KeyLookupError(
"Key response is signed by key id %s:%s but that key is not "
"present in the response" % (server_name, key_id)
)
# the key may not be present in verify_keys if:
# * we got the key from the notary server, and:
# * the key belongs to the notary server, and:
# * the notary server is using a different key to sign notary
# responses.
continue
verify_signed_json(response_json, server_name, key.verify_key)
verified = True
break
if not verified:
raise KeyLookupError(
+20 -7
View File
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Set, Tuple
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
@@ -50,6 +51,18 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
if not hasattr(event, "room_id"):
raise AuthError(500, "Event has no room_id: %s" % event)
room_id = event.room_id
# I'm not really expecting to get auth events in the wrong room, but let's
# sanity-check it
for auth_event in auth_events.values():
if auth_event.room_id != room_id:
raise Exception(
"During auth for event %s in room %s, found event %s in the state "
"which is in room %s"
% (event.event_id, room_id, auth_event.event_id, auth_event.room_id)
)
if do_sig_check:
sender_domain = get_domain_from_id(event.sender)
@@ -621,7 +634,7 @@ def get_public_keys(invite_event):
return public_keys
def auth_types_for_event(event):
def auth_types_for_event(event) -> Set[Tuple[str]]:
"""Given an event, return a list of (EventType, StateKey) that may be
needed to auth the event. The returned list may be a superset of what
would actually be required depending on the full state of the room.
@@ -630,20 +643,20 @@ def auth_types_for_event(event):
actually auth the event.
"""
if event.type == EventTypes.Create:
return []
return set()
auth_types = [
auth_types = {
(EventTypes.PowerLevels, ""),
(EventTypes.Member, event.sender),
(EventTypes.Create, ""),
]
}
if event.type == EventTypes.Member:
membership = event.content["membership"]
if membership in [Membership.JOIN, Membership.INVITE]:
auth_types.append((EventTypes.JoinRules, ""))
auth_types.add((EventTypes.JoinRules, ""))
auth_types.append((EventTypes.Member, event.state_key))
auth_types.add((EventTypes.Member, event.state_key))
if membership == Membership.INVITE:
if "third_party_invite" in event.content:
@@ -651,6 +664,6 @@ def auth_types_for_event(event):
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"],
)
auth_types.append(key)
auth_types.add(key)
return auth_types
+21 -15
View File
@@ -149,7 +149,7 @@ class EventContext:
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_ids = yield self.get_prev_state_ids()
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
@@ -167,12 +167,13 @@ class EventContext:
}
@staticmethod
def deserialize(store, input):
def deserialize(storage, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
Args:
store (DataStore): Used to convert AS ID to AS object
storage (Storage): Used to convert AS ID to AS object and fetch
state.
input (dict): A dict produced by `serialize`
Returns:
@@ -181,6 +182,7 @@ class EventContext:
context = _AsyncEventContextImpl(
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
storage=storage,
prev_state_id=input["prev_state_id"],
event_type=input["event_type"],
event_state_key=input["event_state_key"],
@@ -193,7 +195,7 @@ class EventContext:
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)
context.app_service = storage.main.get_app_service_by_id(app_service_id)
return context
@@ -216,7 +218,7 @@ class EventContext:
return self._state_group
@defer.inlineCallbacks
def get_current_state_ids(self, store):
def get_current_state_ids(self):
"""
Gets the room state map, including this event - ie, the state in ``state_group``
@@ -234,11 +236,11 @@ class EventContext:
if self.rejected:
raise RuntimeError("Attempt to access state_ids of rejected event")
yield self._ensure_fetched(store)
yield self._ensure_fetched()
return self._current_state_ids
@defer.inlineCallbacks
def get_prev_state_ids(self, store):
def get_prev_state_ids(self):
"""
Gets the room state map, excluding this event.
@@ -250,7 +252,7 @@ class EventContext:
Maps a (type, state_key) to the event ID of the state event matching
this tuple.
"""
yield self._ensure_fetched(store)
yield self._ensure_fetched()
return self._prev_state_ids
def get_cached_current_state_ids(self):
@@ -270,7 +272,7 @@ class EventContext:
return self._current_state_ids
def _ensure_fetched(self, store):
def _ensure_fetched(self):
return defer.succeed(None)
@@ -282,6 +284,8 @@ class _AsyncEventContextImpl(EventContext):
Attributes:
_storage (Storage)
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
been calculated. None if we haven't started calculating yet
@@ -295,28 +299,30 @@ class _AsyncEventContextImpl(EventContext):
that was replaced.
"""
# This needs to have a default as we're inheriting
_storage = attr.ib(default=None)
_prev_state_id = attr.ib(default=None)
_event_type = attr.ib(default=None)
_event_state_key = attr.ib(default=None)
_fetching_state_deferred = attr.ib(default=None)
def _ensure_fetched(self, store):
def _ensure_fetched(self):
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store
)
self._fetching_state_deferred = run_in_background(self._fill_out_state)
return make_deferred_yieldable(self._fetching_state_deferred)
@defer.inlineCallbacks
def _fill_out_state(self, store):
def _fill_out_state(self):
"""Called to populate the _current_state_ids and _prev_state_ids
attributes by loading from the database.
"""
if self.state_group is None:
return
self._current_state_ids = yield store.get_state_ids_for_group(self.state_group)
self._current_state_ids = yield self._storage.state.get_state_ids_for_group(
self.state_group
)
if self._prev_state_id and self._event_state_key is not None:
self._prev_state_ids = dict(self._current_state_ids)
+1 -1
View File
@@ -53,7 +53,7 @@ class ThirdPartyEventRules(object):
if self.third_party_rules is None:
return True
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
# Retrieve the state events from the database.
state_events = {}
+76 -12
View File
@@ -526,13 +526,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
content = yield self._do_send_join(destination, pdu)
logger.debug("Got content: %s", content)
@@ -599,6 +593,44 @@ class FederationClient(FederationBase):
return self._try_destination_list("send_join", destinations, send_request)
@defer.inlineCallbacks
def _do_send_join(self, destination, pdu):
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
resp = yield self.transport_layer.send_join_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
room_version = yield self.store.get_room_version(room_id)
@@ -708,18 +740,50 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_leave(
content = yield self._do_send_leave(destination, pdu)
logger.debug("Got content: %s", content)
return None
return self._try_destination_list("send_leave", destinations, send_request)
@defer.inlineCallbacks
def _do_send_leave(self, destination, pdu):
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_leave_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
logger.debug("Got content: %s", content)
return None
return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
return self._try_destination_list("send_leave", destinations, send_request)
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
resp = yield self.transport_layer.send_leave_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]
def get_public_rooms(
self,
+5 -10
View File
@@ -384,15 +384,10 @@ class FederationServer(FederationBase):
res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
return (
200,
{
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
],
},
)
return {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]],
}
async def on_make_leave_request(self, origin, room_id, user_id):
origin_host, _ = parse_server_name(origin)
@@ -419,7 +414,7 @@ class FederationServer(FederationBase):
pdu = await self._check_sigs_and_hash(room_version, pdu)
await self.handler.on_send_leave_request(origin, pdu)
return 200, {}
return {}
async def on_event_auth(self, origin, room_id, event_id):
with (await self._server_linearizer.queue((origin, room_id))):
+31 -2
View File
@@ -243,7 +243,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
def send_join_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
response = yield self.client.put_json(
@@ -254,7 +254,18 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_leave(self, destination, room_id, event_id, content):
def send_join_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination, path=path, data=content
)
return response
@defer.inlineCallbacks
@log_function
def send_leave_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
response = yield self.client.put_json(
@@ -270,6 +281,24 @@ class TransportLayerClient(object):
return response
@defer.inlineCallbacks
@log_function
def send_leave_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
# we want to do our best to send this through. The problem is
# that if it fails, we won't retry it later, so if the remote
# server was just having a momentary blip, the room will be out of
# sync.
ignore_backoff=True,
)
return response
@defer.inlineCallbacks
@log_function
def send_invite_v1(self, destination, room_id, event_id, content):
+28 -4
View File
@@ -506,9 +506,19 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
return 200, content
class FederationSendLeaveServlet(BaseFederationServlet):
class FederationV1SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, (200, content)
class FederationV2SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
PREFIX = FEDERATION_V2_PREFIX
async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, content
@@ -521,9 +531,21 @@ class FederationEventAuthServlet(BaseFederationServlet):
return await self.handler.on_event_auth(origin, context, event_id)
class FederationSendJoinServlet(BaseFederationServlet):
class FederationV1SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = await self.handler.on_send_join_request(origin, content, context)
return 200, (200, content)
class FederationV2SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
PREFIX = FEDERATION_V2_PREFIX
async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
@@ -1367,8 +1389,10 @@ FEDERATION_SERVLET_CLASSES = (
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
FederationEventServlet,
FederationSendJoinServlet,
FederationSendLeaveServlet,
FederationV1SendJoinServlet,
FederationV2SendJoinServlet,
FederationV1SendLeaveServlet,
FederationV2SendLeaveServlet,
FederationV1InviteServlet,
FederationV2InviteServlet,
FederationQueryAuthServlet,
+5
View File
@@ -773,6 +773,11 @@ class GroupsServerHandler(object):
if not self.hs.is_mine_id(user_id):
yield self.store.maybe_delete_remote_profile_cache(user_id)
# Delete group if the last user has left
users = yield self.store.get_users_in_group(group_id, include_private=True)
if not users:
yield self.store.delete_group(group_id)
return {}
@defer.inlineCallbacks
+1 -1
View File
@@ -134,7 +134,7 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = yield context.get_current_state_ids()
current_state = yield self.store.get_events(
list(current_state_ids.values())
)
+4 -11
View File
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
class AccountDataEventSource(object):
def __init__(self, hs):
@@ -23,15 +21,14 @@ class AccountDataEventSource(object):
def get_current_key(self, direction="f"):
return self.store.get_max_account_data_stream_id()
@defer.inlineCallbacks
def get_new_events(self, user, from_key, **kwargs):
async def get_new_events(self, user, from_key, **kwargs):
user_id = user.to_string()
last_stream_id = from_key
current_stream_id = yield self.store.get_max_account_data_stream_id()
current_stream_id = self.store.get_max_account_data_stream_id()
results = []
tags = yield self.store.get_updated_tags(user_id, last_stream_id)
tags = await self.store.get_updated_tags(user_id, last_stream_id)
for room_id, room_tags in tags.items():
results.append(
@@ -41,7 +38,7 @@ class AccountDataEventSource(object):
(
account_data,
room_account_data,
) = yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
) = await self.store.get_updated_account_data_for_user(user_id, last_stream_id)
for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content})
@@ -53,7 +50,3 @@ class AccountDataEventSource(object):
)
return results, current_stream_id
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
return [], config.to_id
+40 -46
View File
@@ -18,8 +18,7 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from twisted.internet import defer
from typing import List
from synapse.api.errors import StoreError
from synapse.logging.context import make_deferred_yieldable
@@ -78,42 +77,39 @@ class AccountValidityHandler(object):
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"send_renewals", self.send_renewal_emails
"send_renewals", self._send_renewal_emails
)
self.clock.looping_call(send_emails, 30 * 60 * 1000)
@defer.inlineCallbacks
def send_renewal_emails(self):
async def _send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time
configured in the ``renew_at`` parameter from the ``account_validity``
configuration, and sends renewal emails to all of these users as long as they
have an email 3PID attached to their account.
"""
expiring_users = yield self.store.get_users_expiring_soon()
expiring_users = await self.store.get_users_expiring_soon()
if expiring_users:
for user in expiring_users:
yield self._send_renewal_email(
await self._send_renewal_email(
user_id=user["user_id"], expiration_ts=user["expiration_ts_ms"]
)
@defer.inlineCallbacks
def send_renewal_email_to_user(self, user_id):
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
yield self._send_renewal_email(user_id, expiration_ts)
async def send_renewal_email_to_user(self, user_id: str):
expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
await self._send_renewal_email(user_id, expiration_ts)
@defer.inlineCallbacks
def _send_renewal_email(self, user_id, expiration_ts):
async def _send_renewal_email(self, user_id: str, expiration_ts: int):
"""Sends out a renewal email to every email address attached to the given user
with a unique link allowing them to renew their account.
Args:
user_id (str): ID of the user to send email(s) to.
expiration_ts (int): Timestamp in milliseconds for the expiration date of
user_id: ID of the user to send email(s) to.
expiration_ts: Timestamp in milliseconds for the expiration date of
this user's account (used in the email templates).
"""
addresses = yield self._get_email_addresses_for_user(user_id)
addresses = await self._get_email_addresses_for_user(user_id)
# Stop right here if the user doesn't have at least one email address.
# In this case, they will have to ask their server admin to renew their
@@ -125,7 +121,7 @@ class AccountValidityHandler(object):
return
try:
user_display_name = yield self.store.get_profile_displayname(
user_display_name = await self.store.get_profile_displayname(
UserID.from_string(user_id).localpart
)
if user_display_name is None:
@@ -133,7 +129,7 @@ class AccountValidityHandler(object):
except StoreError:
user_display_name = user_id
renewal_token = yield self._get_renewal_token(user_id)
renewal_token = await self._get_renewal_token(user_id)
url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % (
self.hs.config.public_baseurl,
renewal_token,
@@ -165,7 +161,7 @@ class AccountValidityHandler(object):
logger.info("Sending renewal email to %s", address)
yield make_deferred_yieldable(
await make_deferred_yieldable(
self.sendmail(
self.hs.config.email_smtp_host,
self._raw_from,
@@ -180,19 +176,18 @@ class AccountValidityHandler(object):
)
)
yield self.store.set_renewal_mail_status(user_id=user_id, email_sent=True)
await self.store.set_renewal_mail_status(user_id=user_id, email_sent=True)
@defer.inlineCallbacks
def _get_email_addresses_for_user(self, user_id):
async def _get_email_addresses_for_user(self, user_id: str) -> List[str]:
"""Retrieve the list of email addresses attached to a user's account.
Args:
user_id (str): ID of the user to lookup email addresses for.
user_id: ID of the user to lookup email addresses for.
Returns:
defer.Deferred[list[str]]: Email addresses for this account.
Email addresses for this account.
"""
threepids = yield self.store.user_get_threepids(user_id)
threepids = await self.store.user_get_threepids(user_id)
addresses = []
for threepid in threepids:
@@ -201,16 +196,15 @@ class AccountValidityHandler(object):
return addresses
@defer.inlineCallbacks
def _get_renewal_token(self, user_id):
async def _get_renewal_token(self, user_id: str) -> str:
"""Generates a 32-byte long random string that will be inserted into the
user's renewal email's unique link, then saves it into the database.
Args:
user_id (str): ID of the user to generate a string for.
user_id: ID of the user to generate a string for.
Returns:
defer.Deferred[str]: The generated string.
The generated string.
Raises:
StoreError(500): Couldn't generate a unique string after 5 attempts.
@@ -219,52 +213,52 @@ class AccountValidityHandler(object):
while attempts < 5:
try:
renewal_token = stringutils.random_string(32)
yield self.store.set_renewal_token_for_user(user_id, renewal_token)
await self.store.set_renewal_token_for_user(user_id, renewal_token)
return renewal_token
except StoreError:
attempts += 1
raise StoreError(500, "Couldn't generate a unique string as refresh string.")
@defer.inlineCallbacks
def renew_account(self, renewal_token):
async def renew_account(self, renewal_token: str) -> bool:
"""Renews the account attached to a given renewal token by pushing back the
expiration date by the current validity period in the server's configuration.
Args:
renewal_token (str): Token sent with the renewal request.
renewal_token: Token sent with the renewal request.
Returns:
bool: Whether the provided token is valid.
Whether the provided token is valid.
"""
try:
user_id = yield self.store.get_user_from_renewal_token(renewal_token)
user_id = await self.store.get_user_from_renewal_token(renewal_token)
except StoreError:
defer.returnValue(False)
return False
logger.debug("Renewing an account for user %s", user_id)
yield self.renew_account_for_user(user_id)
await self.renew_account_for_user(user_id)
defer.returnValue(True)
return True
@defer.inlineCallbacks
def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False):
async def renew_account_for_user(
self, user_id: str, expiration_ts: int = None, email_sent: bool = False
) -> int:
"""Renews the account attached to a given user by pushing back the
expiration date by the current validity period in the server's
configuration.
Args:
renewal_token (str): Token sent with the renewal request.
expiration_ts (int): New expiration date. Defaults to now + validity period.
email_sent (bool): Whether an email has been sent for this validity period.
renewal_token: Token sent with the renewal request.
expiration_ts: New expiration date. Defaults to now + validity period.
email_sen: Whether an email has been sent for this validity period.
Defaults to False.
Returns:
defer.Deferred[int]: New expiration date for this account, as a timestamp
in milliseconds since epoch.
New expiration date for this account, as a timestamp in
milliseconds since epoch.
"""
if expiration_ts is None:
expiration_ts = self.clock.time_msec() + self._account_validity.period
yield self.store.set_account_validity_for_user(
await self.store.set_account_validity_for_user(
user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
)
+17 -24
View File
@@ -15,8 +15,6 @@
import logging
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.types import RoomStreamToken
from synapse.visibility import filter_events_for_client
@@ -33,11 +31,10 @@ class AdminHandler(BaseHandler):
self.storage = hs.get_storage()
self.state_store = self.storage.state
@defer.inlineCallbacks
def get_whois(self, user):
async def get_whois(self, user):
connections = []
sessions = yield self.store.get_user_ip_and_agents(user)
sessions = await self.store.get_user_ip_and_agents(user)
for session in sessions:
connections.append(
{
@@ -54,20 +51,18 @@ class AdminHandler(BaseHandler):
return ret
@defer.inlineCallbacks
def get_users(self):
async def get_users(self):
"""Function to retrieve a list of users in users table.
Args:
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = yield self.store.get_users()
ret = await self.store.get_users()
return ret
@defer.inlineCallbacks
def get_users_paginate(self, start, limit, name, guests, deactivated):
async def get_users_paginate(self, start, limit, name, guests, deactivated):
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users.
@@ -80,14 +75,13 @@ class AdminHandler(BaseHandler):
Returns:
defer.Deferred: resolves to json list[dict[str, Any]]
"""
ret = yield self.store.get_users_paginate(
ret = await self.store.get_users_paginate(
start, limit, name, guests, deactivated
)
return ret
@defer.inlineCallbacks
def search_users(self, term):
async def search_users(self, term):
"""Function to search users list for one or more users with
the matched term.
@@ -96,7 +90,7 @@ class AdminHandler(BaseHandler):
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = yield self.store.search_users(term)
ret = await self.store.search_users(term)
return ret
@@ -119,8 +113,7 @@ class AdminHandler(BaseHandler):
"""
return self.store.set_server_admin(user, admin)
@defer.inlineCallbacks
def export_user_data(self, user_id, writer):
async def export_user_data(self, user_id, writer):
"""Write all data we have on the user to the given writer.
Args:
@@ -132,7 +125,7 @@ class AdminHandler(BaseHandler):
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = yield self.store.get_rooms_for_user_where_membership_is(
rooms = await self.store.get_rooms_for_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
@@ -145,7 +138,7 @@ class AdminHandler(BaseHandler):
# We only try and fetch events for rooms the user has been in. If
# they've been e.g. invited to a room without joining then we handle
# those seperately.
rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id)
rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)
for index, room in enumerate(rooms):
room_id = room.room_id
@@ -154,7 +147,7 @@ class AdminHandler(BaseHandler):
"[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
)
forgotten = yield self.store.did_forget(user_id, room_id)
forgotten = await self.store.did_forget(user_id, room_id)
if forgotten:
logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
continue
@@ -166,7 +159,7 @@ class AdminHandler(BaseHandler):
if room.membership == Membership.INVITE:
event_id = room.event_id
invite = yield self.store.get_event(event_id, allow_none=True)
invite = await self.store.get_event(event_id, allow_none=True)
if invite:
invited_state = invite.unsigned["invite_room_state"]
writer.write_invite(room_id, invite, invited_state)
@@ -177,7 +170,7 @@ class AdminHandler(BaseHandler):
# were joined. We estimate that point by looking at the
# stream_ordering of the last membership if it wasn't a join.
if room.membership == Membership.JOIN:
stream_ordering = yield self.store.get_room_max_stream_ordering()
stream_ordering = self.store.get_room_max_stream_ordering()
else:
stream_ordering = room.stream_ordering
@@ -203,7 +196,7 @@ class AdminHandler(BaseHandler):
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = yield self.store.paginate_room_events(
events, _ = await self.store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction="f"
)
if not events:
@@ -211,7 +204,7 @@ class AdminHandler(BaseHandler):
from_key = events[-1].internal_metadata.after
events = yield filter_events_for_client(self.storage, user_id, events)
events = await filter_events_for_client(self.storage, user_id, events)
writer.write_events(room_id, events)
@@ -247,7 +240,7 @@ class AdminHandler(BaseHandler):
for event_id in extremities:
if not event_to_unseen_prevs[event_id]:
continue
state = yield self.state_store.get_state_for_event(event_id)
state = await self.state_store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state)
return writer.finished()
+24 -30
View File
@@ -15,8 +15,6 @@
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
@@ -46,8 +44,7 @@ class DeactivateAccountHandler(BaseHandler):
self._account_validity_enabled = hs.config.account_validity.enabled
@defer.inlineCallbacks
def deactivate_account(self, user_id, erase_data, id_server=None):
async def deactivate_account(self, user_id, erase_data, id_server=None):
"""Deactivate a user's account
Args:
@@ -74,11 +71,11 @@ class DeactivateAccountHandler(BaseHandler):
identity_server_supports_unbinding = True
# Retrieve the 3PIDs this user has bound to an identity server
threepids = yield self.store.user_get_bound_threepids(user_id)
threepids = await self.store.user_get_bound_threepids(user_id)
for threepid in threepids:
try:
result = yield self._identity_handler.try_unbind_threepid(
result = await self._identity_handler.try_unbind_threepid(
user_id,
{
"medium": threepid["medium"],
@@ -91,33 +88,33 @@ class DeactivateAccountHandler(BaseHandler):
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
raise SynapseError(400, "Failed to remove threepid from ID server")
yield self.store.user_delete_threepid(
await self.store.user_delete_threepid(
user_id, threepid["medium"], threepid["address"]
)
# Remove all 3PIDs this user has bound to the homeserver
yield self.store.user_delete_threepids(user_id)
await self.store.user_delete_threepids(user_id)
# delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
await self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
await self._auth_handler.delete_access_tokens_for_user(user_id)
yield self.store.user_set_password_hash(user_id, None)
await self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
yield self.store.add_user_pending_deactivation(user_id)
await self.store.add_user_pending_deactivation(user_id)
# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
await self.user_directory_handler.handle_user_deactivated(user_id)
# Mark the user as erased, if they asked for that
if erase_data:
logger.info("Marking %s as erased", user_id)
yield self.store.mark_user_erased(user_id)
await self.store.mark_user_erased(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
@@ -125,30 +122,29 @@ class DeactivateAccountHandler(BaseHandler):
# Reject all pending invites for the user, so that the user doesn't show up in the
# "invited" section of rooms' members list.
yield self._reject_pending_invites_for_user(user_id)
await self._reject_pending_invites_for_user(user_id)
# Remove all information on the user from the account_validity table.
if self._account_validity_enabled:
yield self.store.delete_account_validity_for_user(user_id)
await self.store.delete_account_validity_for_user(user_id)
# Mark the user as deactivated.
yield self.store.set_user_deactivated_status(user_id, True)
await self.store.set_user_deactivated_status(user_id, True)
return identity_server_supports_unbinding
@defer.inlineCallbacks
def _reject_pending_invites_for_user(self, user_id):
async def _reject_pending_invites_for_user(self, user_id):
"""Reject pending invites addressed to a given user ID.
Args:
user_id (str): The user ID to reject pending invites for.
"""
user = UserID.from_string(user_id)
pending_invites = yield self.store.get_invited_rooms_for_user(user_id)
pending_invites = await self.store.get_invited_rooms_for_user(user_id)
for room in pending_invites:
try:
yield self._room_member_handler.update_membership(
await self._room_member_handler.update_membership(
create_requester(user),
user,
room.room_id,
@@ -180,8 +176,7 @@ class DeactivateAccountHandler(BaseHandler):
if not self._user_parter_running:
run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
async def _user_parter_loop(self):
"""Loop that parts deactivated users from rooms
Returns:
@@ -191,19 +186,18 @@ class DeactivateAccountHandler(BaseHandler):
logger.info("Starting user parter")
try:
while True:
user_id = yield self.store.get_user_pending_deactivation()
user_id = await self.store.get_user_pending_deactivation()
if user_id is None:
break
logger.info("User parter parting %r", user_id)
yield self._part_user(user_id)
yield self.store.del_user_pending_deactivation(user_id)
await self._part_user(user_id)
await self.store.del_user_pending_deactivation(user_id)
logger.info("User parter finished parting %r", user_id)
logger.info("User parter finished: stopping")
finally:
self._user_parter_running = False
@defer.inlineCallbacks
def _part_user(self, user_id):
async def _part_user(self, user_id):
"""Causes the given user_id to leave all the rooms they're joined to
Returns:
@@ -211,11 +205,11 @@ class DeactivateAccountHandler(BaseHandler):
"""
user = UserID.from_string(user_id)
rooms_for_user = yield self.store.get_rooms_for_user(user_id)
rooms_for_user = await self.store.get_rooms_for_user(user_id)
for room_id in rooms_for_user:
logger.info("User parter parting %r from %r", user_id, room_id)
try:
yield self._room_member_handler.update_membership(
await self._room_member_handler.update_membership(
create_requester(user),
user,
room_id,
+236 -287
View File
@@ -19,7 +19,7 @@
import itertools
import logging
from typing import Dict, Iterable, Optional, Sequence, Tuple
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import six
from six import iteritems, itervalues
@@ -63,9 +63,9 @@ from synapse.replication.http.federation import (
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import UserID, get_domain_from_id
from synapse.util import batch_iter, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -164,8 +164,7 @@ class FederationHandler(BaseHandler):
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -175,17 +174,15 @@ class FederationHandler(BaseHandler):
pdu (FrozenEvent): received PDU
sent_to_us_directly (bool): True if this event was pushed to us; False if
we pulled it as the result of a missing prev_event.
Returns (Deferred): completes with None
"""
room_id = pdu.room_id
event_id = pdu.event_id
logger.info("[%s %s] handling received PDU: %s", room_id, event_id, pdu)
logger.info("handling received PDU: %s", pdu)
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
existing = await self.store.get_event(
event_id, allow_none=True, allow_rejected=True
)
@@ -229,7 +226,7 @@ class FederationHandler(BaseHandler):
#
# Note that if we were never in the room then we would have already
# dropped the event, since we wouldn't know the room version.
is_in_room = yield self.auth.check_host_in_room(room_id, self.server_name)
is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
if not is_in_room:
logger.info(
"[%s %s] Ignoring PDU from %s as we're not in the room",
@@ -240,25 +237,24 @@ class FederationHandler(BaseHandler):
return None
state = None
auth_chain = []
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = yield self.get_min_depth_for_context(pdu.room_id)
min_depth = await self.get_min_depth_for_context(pdu.room_id)
logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
prevs = set(pdu.prev_event_ids())
seen = yield self.store.have_seen_events(prevs)
seen = await self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
if min_depth is not None and pdu.depth < min_depth:
# This is so that we don't notify the user about this
# message, to work around the fact that some events will
# reference really really old events we really don't want to
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
elif min_depth is not None and pdu.depth > min_depth:
missing_prevs = prevs - seen
if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
@@ -270,7 +266,7 @@ class FederationHandler(BaseHandler):
len(missing_prevs),
shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
with (await self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
"[%s %s] Acquired room lock to fetch %d missing prev_events",
room_id,
@@ -278,13 +274,19 @@ class FederationHandler(BaseHandler):
len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
origin, pdu, prevs, min_depth
)
try:
await self._get_missing_events_for_pdu(
origin, pdu, prevs, min_depth
)
except Exception as e:
raise Exception(
"Error fetching missing prev_events for %s: %s"
% (event_id, e)
)
# Update the set of things we've seen after trying to
# fetch the missing stuff
seen = yield self.store.have_seen_events(prevs)
seen = await self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@@ -292,14 +294,6 @@ class FederationHandler(BaseHandler):
room_id,
event_id,
)
elif missing_prevs:
logger.info(
"[%s %s] Not recursively fetching %d missing prev_events: %s",
room_id,
event_id,
len(missing_prevs),
shortstr(missing_prevs),
)
if prevs - seen:
# We've still not been able to get all of the prev_events for this event.
@@ -344,13 +338,18 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
logger.info(
"Event %s is missing prev_events: calculating state for a "
"backwards extremity",
event_id,
)
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
auth_chains = set()
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
ours = yield self.state_store.get_state_groups_ids(room_id, seen)
ours = await self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(
@@ -364,42 +363,17 @@ class FederationHandler(BaseHandler):
# know about
for p in prevs - seen:
logger.info(
"[%s %s] Requesting state at missing prev_event %s",
room_id,
event_id,
p,
"Requesting state at missing prev_event %s", event_id,
)
room_version = yield self.store.get_room_version(room_id)
with nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
(
remote_state,
got_auth_chain,
) = yield self._get_state_for_room(origin, room_id, p)
# we want the state *after* p; _get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
[origin], p, room_version, outlier=True
(remote_state, _,) = await self._get_state_for_room(
origin, room_id, p, include_event_in_state=True
)
if remote_event is None:
raise Exception(
"Unable to get missing prev_event %s" % (p,)
)
if remote_event.is_state():
remote_state.append(remote_event)
# XXX hrm I'm not convinced that duplicate events will compare
# for equality, so I'm not sure this does what the author
# hoped.
auth_chains.update(got_auth_chain)
remote_state_map = {
(x.type, x.state_key): x.event_id for x in remote_state
}
@@ -408,7 +382,9 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
state_map = yield resolve_events_with_store(
room_version = await self.store.get_room_version(room_id)
state_map = await resolve_events_with_store(
room_id,
room_version,
state_maps,
event_map,
@@ -420,15 +396,14 @@ class FederationHandler(BaseHandler):
# First though we need to fetch all the events that are in
# state_map, so we can build up the state below.
evs = yield self.store.get_events(
evs = await self.store.get_events(
list(state_map.values()),
get_prev_content=False,
check_redacted=False,
redact_behaviour=EventRedactBehaviour.AS_IS,
)
event_map.update(evs)
state = [event_map[e] for e in six.itervalues(state_map)]
auth_chain = list(auth_chains)
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
@@ -444,12 +419,9 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
yield self._process_received_pdu(
origin, pdu, state=state, auth_chain=auth_chain
)
await self._process_received_pdu(origin, pdu, state=state)
@defer.inlineCallbacks
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"""
Args:
origin (str): Origin of the pdu. Will be called to get the missing events
@@ -461,12 +433,14 @@ class FederationHandler(BaseHandler):
room_id = pdu.room_id
event_id = pdu.event_id
seen = yield self.store.have_seen_events(prevs)
seen = await self.store.have_seen_events(prevs)
if not prevs - seen:
return
latest = yield self.store.get_latest_event_ids_in_room(room_id)
logger.info("here1")
latest = await self.store.get_latest_event_ids_in_room(room_id)
logger.info("here2")
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -530,7 +504,7 @@ class FederationHandler(BaseHandler):
# All that said: Let's try increasing the timout to 60s and see what happens.
try:
missing_events = yield self.federation_client.get_missing_events(
missing_events = await self.federation_client.get_missing_events(
origin,
room_id,
earliest_events_ids=list(latest),
@@ -569,7 +543,7 @@ class FederationHandler(BaseHandler):
)
with nested_logging_context(ev.event_id):
try:
yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
await self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
except FederationError as e:
if e.code == 403:
logger.warning(
@@ -581,152 +555,159 @@ class FederationHandler(BaseHandler):
else:
raise
@defer.inlineCallbacks
@log_function
def _get_state_for_room(self, destination, room_id, event_id):
async def _get_state_for_room(
self,
destination: str,
room_id: str,
event_id: str,
include_event_in_state: bool = False,
) -> Tuple[List[EventBase], List[EventBase]]:
"""Requests all of the room state at a given event from a remote homeserver.
Args:
destination (str): The remote homeserver to query for the state.
room_id (str): The id of the room we're interested in.
event_id (str): The id of the event we want the state at.
destination: The remote homeserver to query for the state.
room_id: The id of the room we're interested in.
event_id: The id of the event we want the state at.
include_event_in_state: if true, the event itself will be included in the
returned state event list.
Returns:
Deferred[Tuple[List[EventBase], List[EventBase]]]:
A list of events in the state, and a list of events in the auth chain
for the given event.
A list of events in the state, possibly including the event itself, and
a list of events in the auth chain for the given event.
"""
(
state_event_ids,
auth_event_ids,
) = yield self.federation_client.get_room_state_ids(
) = await self.federation_client.get_room_state_ids(
destination, room_id, event_id=event_id
)
desired_events = set(state_event_ids + auth_event_ids)
event_map = yield self._get_events_from_store_or_dest(
if include_event_in_state:
desired_events.add(event_id)
event_map = await self._get_events_from_store_or_dest(
destination, room_id, desired_events
)
failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s: %s",
room_id,
"Failed to fetch missing state/auth events for %s %s",
event_id,
failed_to_fetch,
)
pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
remote_state = [
event_map[e_id] for e_id in state_event_ids if e_id in event_map
]
if include_event_in_state:
remote_event = event_map.get(event_id)
if not remote_event:
raise Exception("Unable to get missing prev_event %s" % (event_id,))
if remote_event.is_state() and remote_event.rejected_reason is None:
remote_state.append(remote_event)
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
auth_chain.sort(key=lambda e: e.depth)
return pdus, auth_chain
return remote_state, auth_chain
@defer.inlineCallbacks
def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
async def _get_events_from_store_or_dest(
self, destination: str, room_id: str, event_ids: Iterable[str]
) -> Dict[str, EventBase]:
"""Fetch events from a remote destination, checking if we already have them.
Args:
destination (str)
room_id (str)
event_ids (Iterable[str])
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
Returns:
Deferred[dict[str, EventBase]]: A deferred resolving to a map
from event_id to event
map from event_id to event
"""
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
missing_events = set(event_ids) - fetched_events.keys()
if not missing_events:
return fetched_events
if missing_events:
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
room_id,
)
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
event_ids,
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(await self.store.get_events(missing_events, allow_rejected=True))
)
# check for events which were in the wrong room.
#
# this can happen if a remote server claims that the state or
# auth_events at an event in room A are actually events in room B
bad_events = list(
(event_id, event.room_id)
for event_id, event in fetched_events.items()
if event.room_id != room_id
)
room_version = yield self.store.get_room_version(room_id)
# XXX 20 requests at once? really?
for batch in batch_iter(missing_events, 20):
deferreds = [
run_in_background(
self.federation_client.get_pdu,
destinations=[destination],
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]
res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
for bad_event_id, bad_room_id in bad_events:
# This is a bogus situation, but since we may only discover it a long time
# after it happened, we try our best to carry on, by just omitting the
# bad events from the returned auth/state set.
logger.warning(
"Remote server %s claims event %s in room %s is an auth/state "
"event in room %s",
destination,
bad_event_id,
bad_room_id,
room_id,
)
for success, result in res:
if success and result:
fetched_events[result.event_id] = result
del fetched_events[bad_event_id]
return fetched_events
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
async def _process_received_pdu(
self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]],
):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
Args:
origin: server sending the event
event: event to be persisted
state: Normally None, but if we are handling a gap in the graph
(ie, we are missing one or more prev_events), the resolved state at the
event
"""
room_id = event.room_id
event_id = event.event_id
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
event_ids = set()
if state:
event_ids |= {e.event_id for e in state}
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.)
event_infos = []
for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids:
continue
e.internal_metadata.outlier = True
auth_ids = e.auth_event_ids()
auth = {
(e.type, e.state_key): e
for e in auth_chain
if e.event_id in auth_ids or e.type == EventTypes.Create
}
event_infos.append(_NewEventInfo(event=e, auth_events=auth))
seen_ids.add(e.event_id)
logger.info(
"[%s %s] persisting newly-received auth/state events %s",
room_id,
event_id,
[e.event.event_id for e in event_infos],
)
yield self._handle_new_events(origin, event_infos)
try:
context = yield self._handle_new_event(origin, event, state=state)
context = await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
room = yield self.store.get_room(room_id)
room = await self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
await self.store.store_room(
room_id=room_id, room_creator_user_id="", is_public=False
)
except StoreError:
@@ -739,11 +720,11 @@ class FederationHandler(BaseHandler):
# changing their profile info.
newly_joined = True
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = await context.get_prev_state_ids()
prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id:
prev_state = yield self.store.get_event(
prev_state = await self.store.get_event(
prev_state_id, allow_none=True
)
if prev_state and prev_state.membership == Membership.JOIN:
@@ -751,11 +732,10 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
yield self.user_joined_room(user, room_id)
await self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
async def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. If the other side
@@ -772,9 +752,7 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
room_version = yield self.store.get_room_version(room_id)
events = yield self.federation_client.backfill(
events = await self.federation_client.backfill(
dest, room_id, limit=limit, extremities=extremities
)
@@ -789,7 +767,7 @@ class FederationHandler(BaseHandler):
# self._sanity_check_event(ev)
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
seen_events = await self.store.have_events_in_timeline(
set(e.event_id for e in events)
)
@@ -802,6 +780,9 @@ class FederationHandler(BaseHandler):
event_ids = set(e.event_id for e in events)
# build a list of events whose prev_events weren't in the batch.
# (XXX: this will include events whose prev_events we already have; that doesn't
# sound right?)
edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids]
logger.info("backfill: Got %d events with %d edges", len(events), len(edges))
@@ -812,8 +793,11 @@ class FederationHandler(BaseHandler):
state_events = {}
events_to_state = {}
for e_id in edges:
state, auth = yield self._get_state_for_room(
destination=dest, room_id=room_id, event_id=e_id
state, auth = await self._get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id,
include_event_in_state=False,
)
auth_events.update({a.event_id: a for a in auth})
auth_events.update({s.event_id: s for s in state})
@@ -830,95 +814,11 @@ class FederationHandler(BaseHandler):
auth_events.update(
{e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
)
missing_auth = required_auth - set(auth_events)
failed_to_fetch = set()
# Try and fetch any missing auth events from both DB and remote servers.
# We repeatedly do this until we stop finding new auth events.
while missing_auth - failed_to_fetch:
logger.info("Missing auth for backfill: %r", missing_auth)
ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
auth_events.update(ret_events)
required_auth.update(
a_id for event in ret_events.values() for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
if missing_auth - failed_to_fetch:
logger.info(
"Fetching missing auth for backfill: %r",
missing_auth - failed_to_fetch,
)
results = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.federation_client.get_pdu,
[dest],
event_id,
room_version=room_version,
outlier=True,
timeout=10000,
)
for event_id in missing_auth - failed_to_fetch
],
consumeErrors=True,
)
).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results if a})
required_auth.update(
a_id
for event in results
if event
for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
failed_to_fetch = missing_auth - set(auth_events)
seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
# We now have a chunk of events plus associated state and auth chain to
# persist. We do the persistence in two steps:
# 1. Auth events and state get persisted as outliers, plus the
# backward extremities get persisted (as non-outliers).
# 2. The rest of the events in the chunk get persisted one by one, as
# each one depends on the previous event for its state.
#
# The important thing is that events in the chunk get persisted as
# non-outliers, including when those events are also in the state or
# auth chain. Caution must therefore be taken to ensure that they are
# not accidentally marked as outliers.
# Step 1a: persist auth events that *don't* appear in the chunk
ev_infos = []
for a in auth_events.values():
# We only want to persist auth events as outliers that we haven't
# seen and aren't about to persist as part of the backfilled chunk.
if a.event_id in seen_events or a.event_id in event_map:
continue
a.internal_metadata.outlier = True
ev_infos.append(
_NewEventInfo(
event=a,
auth_events={
(
auth_events[a_id].type,
auth_events[a_id].state_key,
): auth_events[a_id]
for a_id in a.auth_event_ids()
if a_id in auth_events
},
)
)
# Step 1b: persist the events in the chunk we fetched state for (i.e.
# the backwards extremities) as non-outliers.
# Step 1: persist the events in the chunk we fetched state for (i.e.
# the backwards extremities), with custom auth events and state
for e_id in events_to_state:
# For paranoia we ensure that these events are marked as
# non-outliers
@@ -940,7 +840,7 @@ class FederationHandler(BaseHandler):
)
)
yield self._handle_new_events(dest, ev_infos, backfilled=True)
await self._handle_new_events(dest, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -956,16 +856,15 @@ class FederationHandler(BaseHandler):
# We store these one at a time since each event depends on the
# previous to work out the state.
# TODO: We can probably do something more clever here.
yield self._handle_new_event(dest, event, backfilled=True)
await self._handle_new_event(dest, event, backfilled=True)
return events
@defer.inlineCallbacks
def maybe_backfill(self, room_id, current_depth):
async def maybe_backfill(self, room_id, current_depth):
"""Checks the database to see if we should backfill before paginating,
and if so do.
"""
extremities = yield self.store.get_oldest_events_with_depth_in_room(room_id)
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
if not extremities:
logger.debug("Not backfilling as no extremeties found.")
@@ -997,15 +896,17 @@ class FederationHandler(BaseHandler):
# state *before* the event, ignoring the special casing certain event
# types have.
forward_events = yield self.store.get_successor_events(list(extremities))
forward_events = await self.store.get_successor_events(list(extremities))
extremities_events = yield self.store.get_events(
forward_events, check_redacted=False, get_prev_content=False
extremities_events = await self.store.get_events(
forward_events,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = yield filter_events_for_server(
filtered_extremities = await filter_events_for_server(
self.storage,
self.server_name,
list(extremities_events.values()),
@@ -1035,7 +936,7 @@ class FederationHandler(BaseHandler):
# First we try hosts that are already in the room
# TODO: HEURISTIC ALERT.
curr_state = yield self.state_handler.get_current_state(room_id)
curr_state = await self.state_handler.get_current_state(room_id)
def get_domains_from_state(state):
"""Get joined domains from state
@@ -1074,12 +975,11 @@ class FederationHandler(BaseHandler):
domain for domain, depth in curr_domains if domain != self.server_name
]
@defer.inlineCallbacks
def try_backfill(domains):
async def try_backfill(domains):
# TODO: Should we try multiple of these at a time?
for dom in domains:
try:
yield self.backfill(
await self.backfill(
dom, room_id, limit=100, extremities=extremities
)
# If this succeeded then we probably already have the
@@ -1110,7 +1010,7 @@ class FederationHandler(BaseHandler):
return False
success = yield try_backfill(likely_domains)
success = await try_backfill(likely_domains)
if success:
return True
@@ -1124,7 +1024,7 @@ class FederationHandler(BaseHandler):
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
states = yield make_deferred_yieldable(
states = await make_deferred_yieldable(
defer.gatherResults(
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
)
@@ -1134,7 +1034,7 @@ class FederationHandler(BaseHandler):
# event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
state_map = await self.store.get_events(
[e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False,
)
@@ -1150,7 +1050,7 @@ class FederationHandler(BaseHandler):
for e_id, _ in sorted_extremeties_tuple:
likely_domains = get_domains_from_state(states[e_id])
success = yield try_backfill(
success = await try_backfill(
[dom for dom, _ in likely_domains if dom not in tried_domains]
)
if success:
@@ -1160,6 +1060,56 @@ class FederationHandler(BaseHandler):
return False
async def _get_events_and_persist(
self, destination: str, room_id: str, events: Iterable[str]
):
"""Fetch the given events from a server, and persist them as outliers.
Logs a warning if we can't find the given event.
"""
room_version = await self.store.get_room_version(room_id)
event_infos = []
async def get_event(event_id: str):
with nested_logging_context(event_id):
try:
event = await self.federation_client.get_pdu(
[destination], event_id, room_version, outlier=True,
)
if event is None:
logger.warning(
"Server %s didn't return event %s", destination, event_id,
)
return
# recursively fetch the auth events for this event
auth_events = await self._get_events_from_store_or_dest(
destination, room_id, event.auth_event_ids()
)
auth = {}
for auth_event_id in event.auth_event_ids():
ae = auth_events.get(auth_event_id)
if ae:
auth[(ae.type, ae.state_key)] = ae
event_infos.append(_NewEventInfo(event, None, auth))
except Exception as e:
logger.warning(
"Error fetching missing state/auth event %s: %s %s",
event_id,
type(e),
e,
)
await concurrently_execute(get_event, events, 5)
await self._handle_new_events(
destination, event_infos,
)
def _sanity_check_event(self, ev):
"""
Do some early sanity checks of a received event
@@ -1299,7 +1249,7 @@ class FederationHandler(BaseHandler):
# Check whether this room is the result of an upgrade of a room we already know
# about. If so, migrate over user information
predecessor = yield self.store.get_room_predecessor(room_id)
if not predecessor:
if not predecessor or not isinstance(predecessor.get("room_id"), str):
return
old_room_id = predecessor["room_id"]
logger.debug(
@@ -1327,8 +1277,7 @@ class FederationHandler(BaseHandler):
return True
@defer.inlineCallbacks
def _handle_queued_pdus(self, room_queue):
async def _handle_queued_pdus(self, room_queue):
"""Process PDUs which got queued up while we were busy send_joining.
Args:
@@ -1344,7 +1293,7 @@ class FederationHandler(BaseHandler):
p.room_id,
)
with nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
await self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warning(
"Error handling queued PDU %s from %s: %s", p.event_id, origin, e
@@ -1474,7 +1423,7 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield self.user_joined_room(user, event.room_id)
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
@@ -1542,7 +1491,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts, room_id, user_id, "leave", content=content,
target_hosts, room_id, user_id, "leave", content=content
)
# Mark as outlier as we don't have any state for this event; we're not
# even in the room.
@@ -1983,7 +1932,7 @@ class FederationHandler(BaseHandler):
context = yield self.state_handler.compute_event_context(event, old_state=state)
if not auth_events:
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
auth_events_ids = yield self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
@@ -2392,12 +2341,12 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
}
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = yield context.get_current_state_ids()
current_state_ids = dict(current_state_ids)
current_state_ids.update(state_updates)
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = dict(prev_state_ids)
prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
@@ -2681,7 +2630,7 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"],
)
original_invite = None
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
@@ -2729,7 +2678,7 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
invite_event_id = prev_state_ids.get((EventTypes.ThirdPartyInvite, token))
invite_event = None
@@ -2903,7 +2852,7 @@ class FederationHandler(BaseHandler):
room_id=room_id, user_id=user.to_string(), change="joined"
)
else:
return user_joined_room(self.distributor, user, room_id)
return defer.succeed(user_joined_room(self.distributor, user, room_id))
@defer.inlineCallbacks
def get_room_complexity(self, remote_room_hosts, room_id):
+44 -52
View File
@@ -89,8 +89,7 @@ class InitialSyncHandler(BaseHandler):
include_archived,
)
@defer.inlineCallbacks
def _snapshot_all_rooms(
async def _snapshot_all_rooms(
self,
user_id=None,
pagin_config=None,
@@ -102,7 +101,7 @@ class InitialSyncHandler(BaseHandler):
if include_archived:
memberships.append(Membership.LEAVE)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
room_list = await self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, membership_list=memberships
)
@@ -110,33 +109,32 @@ class InitialSyncHandler(BaseHandler):
rooms_ret = []
now_token = yield self.hs.get_event_sources().get_current_token()
now_token = await self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
presence, _ = yield presence_stream.get_pagination_rows(
presence, _ = await presence_stream.get_pagination_rows(
user, pagination_config.get_source_config("presence"), None
)
receipt_stream = self.hs.get_event_sources().sources["receipt"]
receipt, _ = yield receipt_stream.get_pagination_rows(
receipt, _ = await receipt_stream.get_pagination_rows(
user, pagination_config.get_source_config("receipt"), None
)
tags_by_room = yield self.store.get_tags_for_user(user_id)
tags_by_room = await self.store.get_tags_for_user(user_id)
account_data, account_data_by_room = yield self.store.get_account_data_for_user(
account_data, account_data_by_room = await self.store.get_account_data_for_user(
user_id
)
public_room_ids = yield self.store.get_public_room_ids()
public_room_ids = await self.store.get_public_room_ids()
limit = pagin_config.limit
if limit is None:
limit = 10
@defer.inlineCallbacks
def handle_room(event):
async def handle_room(event):
d = {
"room_id": event.room_id,
"membership": event.membership,
@@ -149,8 +147,8 @@ class InitialSyncHandler(BaseHandler):
time_now = self.clock.time_msec()
d["inviter"] = event.sender
invite_event = yield self.store.get_event(event.event_id)
d["invite"] = yield self._event_serializer.serialize_event(
invite_event = await self.store.get_event(event.event_id)
d["invite"] = await self._event_serializer.serialize_event(
invite_event, time_now, as_client_event
)
@@ -174,7 +172,7 @@ class InitialSyncHandler(BaseHandler):
lambda states: states[event.event_id]
)
(messages, token), current_state = yield make_deferred_yieldable(
(messages, token), current_state = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
@@ -188,7 +186,7 @@ class InitialSyncHandler(BaseHandler):
)
).addErrback(unwrapFirstError)
messages = yield filter_events_for_client(
messages = await filter_events_for_client(
self.storage, user_id, messages
)
@@ -198,7 +196,7 @@ class InitialSyncHandler(BaseHandler):
d["messages"] = {
"chunk": (
yield self._event_serializer.serialize_events(
await self._event_serializer.serialize_events(
messages, time_now=time_now, as_client_event=as_client_event
)
),
@@ -206,7 +204,7 @@ class InitialSyncHandler(BaseHandler):
"end": end_token.to_string(),
}
d["state"] = yield self._event_serializer.serialize_events(
d["state"] = await self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event,
@@ -229,7 +227,7 @@ class InitialSyncHandler(BaseHandler):
except Exception:
logger.exception("Failed to get snapshot")
yield concurrently_execute(handle_room, room_list, 10)
await concurrently_execute(handle_room, room_list, 10)
account_data_events = []
for account_data_type, content in account_data.items():
@@ -253,8 +251,7 @@ class InitialSyncHandler(BaseHandler):
return ret
@defer.inlineCallbacks
def room_initial_sync(self, requester, room_id, pagin_config=None):
async def room_initial_sync(self, requester, room_id, pagin_config=None):
"""Capture the a snapshot of a room. If user is currently a member of
the room this will be what is currently in the room. If the user left
the room this will be what was in the room when they left.
@@ -271,32 +268,32 @@ class InitialSyncHandler(BaseHandler):
A JSON serialisable dict with the snapshot of the room.
"""
blocked = yield self.store.is_room_blocked(room_id)
blocked = await self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
user_id = requester.user.to_string()
membership, member_event_id = yield self._check_in_room_or_world_readable(
membership, member_event_id = await self._check_in_room_or_world_readable(
room_id, user_id
)
is_peeking = member_event_id is None
if membership == Membership.JOIN:
result = yield self._room_initial_sync_joined(
result = await self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
)
elif membership == Membership.LEAVE:
result = yield self._room_initial_sync_parted(
result = await self._room_initial_sync_parted(
user_id, room_id, pagin_config, membership, member_event_id, is_peeking
)
account_data_events = []
tags = yield self.store.get_tags_for_room(user_id, room_id)
tags = await self.store.get_tags_for_room(user_id, room_id)
if tags:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
account_data = yield self.store.get_account_data_for_room(user_id, room_id)
account_data = await self.store.get_account_data_for_room(user_id, room_id)
for account_data_type, content in account_data.items():
account_data_events.append({"type": account_data_type, "content": content})
@@ -304,11 +301,10 @@ class InitialSyncHandler(BaseHandler):
return result
@defer.inlineCallbacks
def _room_initial_sync_parted(
async def _room_initial_sync_parted(
self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
):
room_state = yield self.state_store.get_state_for_events([member_event_id])
room_state = await self.state_store.get_state_for_events([member_event_id])
room_state = room_state[member_event_id]
@@ -316,13 +312,13 @@ class InitialSyncHandler(BaseHandler):
if limit is None:
limit = 10
stream_token = yield self.store.get_stream_token_for_event(member_event_id)
stream_token = await self.store.get_stream_token_for_event(member_event_id)
messages, token = yield self.store.get_recent_events_for_room(
messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
)
messages = yield filter_events_for_client(
messages = await filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
)
@@ -336,13 +332,13 @@ class InitialSyncHandler(BaseHandler):
"room_id": room_id,
"messages": {
"chunk": (
yield self._event_serializer.serialize_events(messages, time_now)
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
"state": (
yield self._event_serializer.serialize_events(
await self._event_serializer.serialize_events(
room_state.values(), time_now
)
),
@@ -350,19 +346,18 @@ class InitialSyncHandler(BaseHandler):
"receipts": [],
}
@defer.inlineCallbacks
def _room_initial_sync_joined(
async def _room_initial_sync_joined(
self, user_id, room_id, pagin_config, membership, is_peeking
):
current_state = yield self.state.get_current_state(room_id=room_id)
current_state = await self.state.get_current_state(room_id=room_id)
# TODO: These concurrently
time_now = self.clock.time_msec()
state = yield self._event_serializer.serialize_events(
state = await self._event_serializer.serialize_events(
current_state.values(), time_now
)
now_token = yield self.hs.get_event_sources().get_current_token()
now_token = await self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
if limit is None:
@@ -377,28 +372,26 @@ class InitialSyncHandler(BaseHandler):
presence_handler = self.hs.get_presence_handler()
@defer.inlineCallbacks
def get_presence():
async def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
return []
states = yield presence_handler.get_states(
states = await presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
)
return states
@defer.inlineCallbacks
def get_receipts():
receipts = yield self.store.get_linearized_receipts_for_room(
async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(
room_id, to_key=now_token.receipt_key
)
if not receipts:
receipts = []
return receipts
presence, receipts, (messages, token) = yield make_deferred_yieldable(
presence, receipts, (messages, token) = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(get_presence),
@@ -414,7 +407,7 @@ class InitialSyncHandler(BaseHandler):
).addErrback(unwrapFirstError)
)
messages = yield filter_events_for_client(
messages = await filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
)
@@ -427,7 +420,7 @@ class InitialSyncHandler(BaseHandler):
"room_id": room_id,
"messages": {
"chunk": (
yield self._event_serializer.serialize_events(messages, time_now)
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
@@ -441,18 +434,17 @@ class InitialSyncHandler(BaseHandler):
return ret
@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
async def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
member_event = await self.auth.check_user_was_in_room(room_id, user_id)
return member_event.membership, member_event.event_id
except AuthError:
visibility = yield self.state_handler.get_current_state(
visibility = await self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
+23 -35
View File
@@ -46,8 +46,9 @@ from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID, create_requester
from synapse.types import Collection, RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.metrics import measure_func
@@ -421,7 +422,7 @@ class EventCreationHandler(object):
event_dict,
token_id=None,
txn_id=None,
prev_events_and_hashes=None,
prev_event_ids: Optional[Collection[str]] = None,
require_consent=True,
):
"""
@@ -438,10 +439,9 @@ class EventCreationHandler(object):
token_id (str)
txn_id (str)
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
prev_event_ids:
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
new event.
If None, they will be requested from the database.
@@ -497,9 +497,7 @@ class EventCreationHandler(object):
builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_events_and_hashes=prev_events_and_hashes,
builder=builder, requester=requester, prev_event_ids=prev_event_ids,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -514,7 +512,7 @@ class EventCreationHandler(object):
# federation as well as those created locally. As of room v3, aliases events
# can be created by users that are not in the room, therefore we have to
# tolerate them in event_auth.check().
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
prev_event = (
yield self.store.get_event(prev_event_id, allow_none=True)
@@ -664,7 +662,7 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
return
@@ -713,7 +711,7 @@ class EventCreationHandler(object):
@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(
self, builder, requester=None, prev_events_and_hashes=None
self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
):
"""Create a new event for a local client
@@ -722,10 +720,9 @@ class EventCreationHandler(object):
requester (synapse.types.Requester|None):
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
prev_event_ids:
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
new event.
If None, they will be requested from the database.
@@ -733,22 +730,15 @@ class EventCreationHandler(object):
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
if prev_events_and_hashes is not None:
assert len(prev_events_and_hashes) <= 10, (
if prev_event_ids is not None:
assert len(prev_event_ids) <= 10, (
"Attempting to create an event with %i prev_events"
% (len(prev_events_and_hashes),)
% (len(prev_event_ids),)
)
else:
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
builder.room_id
)
prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
event = yield builder.build(prev_event_ids=prev_event_ids)
context = yield self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
@@ -875,7 +865,7 @@ class EventCreationHandler(object):
if event.type == EventTypes.Redaction:
original_event = yield self.store.get_event(
event.redacts,
check_redacted=False,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
allow_rejected=False,
allow_none=True,
@@ -913,7 +903,7 @@ class EventCreationHandler(object):
def is_inviter_member_event(e):
return e.type == EventTypes.Member and e.sender == event.sender
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = yield context.get_current_state_ids()
state_to_include_ids = [
e_id
@@ -952,7 +942,7 @@ class EventCreationHandler(object):
if event.type == EventTypes.Redaction:
original_event = yield self.store.get_event(
event.redacts,
check_redacted=False,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
allow_rejected=False,
allow_none=True,
@@ -966,7 +956,7 @@ class EventCreationHandler(object):
if original_event.room_id != event.room_id:
raise SynapseError(400, "Cannot redact event from a different room")
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
auth_events_ids = yield self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
@@ -988,7 +978,7 @@ class EventCreationHandler(object):
event.internal_metadata.recheck_redaction = False
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
@@ -1041,9 +1031,7 @@ class EventCreationHandler(object):
# For each room we need to find a joined member we can use to send
# the dummy event with.
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids
@@ -1062,7 +1050,7 @@ class EventCreationHandler(object):
"room_id": room_id,
"sender": user_id,
},
prev_events_and_hashes=prev_events_and_hashes,
prev_event_ids=latest_event_ids,
)
event.internal_metadata.proactively_send = False
+13 -14
View File
@@ -280,8 +280,7 @@ class PaginationHandler(object):
await self.storage.purge_events.purge_room(room_id)
@defer.inlineCallbacks
def get_messages(
async def get_messages(
self,
requester,
room_id=None,
@@ -307,7 +306,7 @@ class PaginationHandler(object):
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token_for_pagination()
await self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key
@@ -319,11 +318,11 @@ class PaginationHandler(object):
source_config = pagin_config.get_source_config("room")
with (yield self.pagination_lock.read(room_id)):
with (await self.pagination_lock.read(room_id)):
(
membership,
member_event_id,
) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
) = await self.auth.check_in_room_or_world_readable(room_id, user_id)
if source_config.direction == "b":
# if we're going backwards, we might need to backfill. This
@@ -331,7 +330,7 @@ class PaginationHandler(object):
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
max_topo = await self.store.get_max_topological_token(
room_id, room_token.stream
)
@@ -339,18 +338,18 @@ class PaginationHandler(object):
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
await self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
@@ -365,7 +364,7 @@ class PaginationHandler(object):
if event_filter:
events = event_filter.filter(events)
events = yield filter_events_for_client(
events = await filter_events_for_client(
self.storage, user_id, events, is_peeking=(member_event_id is None)
)
@@ -385,19 +384,19 @@ class PaginationHandler(object):
(EventTypes.Member, event.sender) for event in events
)
state_ids = yield self.state_store.get_state_ids_for_event(
state_ids = await self.state_store.get_state_ids_for_event(
events[0].event_id, state_filter=state_filter
)
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
state = await self.store.get_events(list(state_ids.values()))
state = state.values()
time_now = self.clock.time_msec()
chunk = {
"chunk": (
yield self._event_serializer.serialize_events(
await self._event_serializer.serialize_events(
events, time_now, as_client_event=as_client_event
)
),
@@ -406,7 +405,7 @@ class PaginationHandler(object):
}
if state:
chunk["state"] = yield self._event_serializer.serialize_events(
chunk["state"] = await self._event_serializer.serialize_events(
state, time_now, as_client_event=as_client_event
)
+2 -7
View File
@@ -95,12 +95,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
@@ -230,7 +225,7 @@ class PresenceHandler(object):
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
if not self.store.db.is_running():
return
logger.info(
+5 -1
View File
@@ -295,12 +295,16 @@ class BaseProfileHandler(BaseHandler):
be found to be in any room the server is in, and therefore the query
is denied.
"""
# Implementation of MSC1301: don't allow looking up profiles if the
# requester isn't in the same room as the target. We expect requester to
# be None when this function is called outside of a profile query, e.g.
# when building a membership event. In this case, we must allow the
# lookup.
if not self.hs.config.require_auth_for_profile_requests or not requester:
if (
not self.hs.config.limit_profile_requests_to_users_who_share_rooms
or not requester
):
return
# Always allow the user to query their own profile.
+19 -22
View File
@@ -16,6 +16,7 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
import itertools
import logging
import math
@@ -184,7 +185,7 @@ class RoomCreationHandler(BaseHandler):
requester, tombstone_event, tombstone_context
)
old_room_state = yield tombstone_context.get_current_state_ids(self.store)
old_room_state = yield tombstone_context.get_current_state_ids()
# update any aliases
yield self._move_aliases_to_new_room(
@@ -271,7 +272,7 @@ class RoomCreationHandler(BaseHandler):
except AuthError as e:
logger.warning("Unable to update PLs in old room: %s", e)
logger.info("Setting correct PLs in new room")
logger.info("Setting correct PLs in new room to %s", old_room_pl_state.content)
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
@@ -365,13 +366,18 @@ class RoomCreationHandler(BaseHandler):
needed_power_level = max(state_default, ban, max(event_power_levels.values()))
# Raise the requester's power level in the new room if necessary
current_power_level = power_levels["users"][requester.user.to_string()]
current_power_level = power_levels["users"][user_id]
if current_power_level < needed_power_level:
# Assign this power level to the requester
power_levels["users"][requester.user.to_string()] = needed_power_level
# make sure we copy the event content rather than overwriting it.
# note that if frozen_dicts are enabled, `power_levels` will be a frozen
# dict so we can't just copy.deepcopy it.
# Set the power levels to the modified state
initial_state[(EventTypes.PowerLevels, "")] = power_levels
new_power_levels = {k: v for k, v in power_levels.items() if k != "users"}
new_power_levels["users"] = {
k: v for k, v in power_levels.get("users", {}).items() if k != user_id
}
new_power_levels["users"][user_id] = needed_power_level
initial_state[(EventTypes.PowerLevels, "")] = new_power_levels
yield self._send_events_for_new_room(
requester,
@@ -733,7 +739,7 @@ class RoomCreationHandler(BaseHandler):
initial_state,
creation_content,
room_alias=None,
power_level_content_override=None,
power_level_content_override=None, # Doesn't apply when initial state has power level state event content
creator_join_profile=None,
):
def create(etype, content, **kwargs):
@@ -907,7 +913,10 @@ class RoomContextHandler(object):
results["events_before"] = yield filter_evts(results["events_before"])
results["events_after"] = yield filter_evts(results["events_after"])
results["event"] = event
# filter_evts can return a pruned event in case the user is allowed to see that
# there's something there but not see the content, so use the event that's in
# `filtered` rather than the event we retrieved from the datastore.
results["event"] = filtered[0]
if results["events_after"]:
last_event_id = results["events_after"][-1].event_id
@@ -938,7 +947,7 @@ class RoomContextHandler(object):
if event_filter:
state_events = event_filter.filter(state_events)
results["state"] = state_events
results["state"] = yield filter_evts(state_events)
# We use a dummy token here as we only care about the room portion of
# the token, which we replace.
@@ -1008,15 +1017,3 @@ class RoomEventSource(object):
def get_current_key_for_room(self, room_id):
return self.store.get_room_events_max_id(room_id)
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
events, next_key = yield self.store.paginate_room_events(
room_id=key,
from_key=config.from_key,
to_key=config.to_key,
direction=config.direction,
limit=config.limit,
)
return (events, next_key)
+9 -8
View File
@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
from synapse.types import Collection, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
target,
room_id,
membership,
prev_events_and_hashes,
prev_event_ids: Collection[str],
txn_id=None,
ratelimit=True,
content=None,
@@ -177,7 +177,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes,
prev_event_ids=prev_event_ids,
require_consent=require_consent,
)
@@ -193,7 +193,7 @@ class RoomMemberHandler(object):
requester, event, context, extra_users=[target], ratelimit=ratelimit
)
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -370,8 +370,7 @@ class RoomMemberHandler(object):
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids
@@ -485,7 +484,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes,
prev_event_ids=latest_event_ids,
content=content,
require_consent=require_consent,
)
@@ -507,6 +506,8 @@ class RoomMemberHandler(object):
Returns:
Deferred
"""
logger.info("Transferring room state from %s to %s", old_room_id, room_id)
# Find all local users that were in the old room and copy over each user's state
users = yield self.store.get_users_in_room(old_room_id)
yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
@@ -601,7 +602,7 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
if event.membership == Membership.JOIN:
if requester.is_guest:
guest_can_join = yield self._can_guest_join(prev_state_ids)
+25 -9
View File
@@ -21,7 +21,7 @@ from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
@@ -37,6 +37,7 @@ class SearchHandler(BaseHandler):
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self.auth = hs.get_auth()
@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
@@ -53,23 +54,38 @@ class SearchHandler(BaseHandler):
room_id (str): id of the room to search through.
Returns:
Deferred[iterable[unicode]]: predecessor room ids
Deferred[iterable[str]]: predecessor room ids
"""
historical_room_ids = []
while True:
predecessor = yield self.store.get_room_predecessor(room_id)
# The initial room must have been known for us to get this far
predecessor = yield self.store.get_room_predecessor(room_id)
# If no predecessor, assume we've hit a dead end
while True:
if not predecessor:
# We have reached the end of the chain of predecessors
break
# Add predecessor's room ID
historical_room_ids.append(predecessor["room_id"])
if not isinstance(predecessor.get("room_id"), str):
# This predecessor object is malformed. Exit here
break
# Scan through the old room for further predecessors
room_id = predecessor["room_id"]
predecessor_room_id = predecessor["room_id"]
# Don't add it to the list until we have checked that we are in the room
try:
next_predecessor_room = yield self.store.get_room_predecessor(
predecessor_room_id
)
except NotFoundError:
# The predecessor is not a known room, so we are done here
break
historical_room_ids.append(predecessor_room_id)
# And repeat
predecessor = next_predecessor_room
return historical_room_ids
-3
View File
@@ -317,6 +317,3 @@ class TypingNotificationEventSource(object):
def get_current_key(self):
return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
return [], pagination_config.from_key
+3 -15
View File
@@ -47,9 +47,9 @@ class SynapseRequest(Request):
logcontext(LoggingContext) : the log context for this request
"""
def __init__(self, site, channel, *args, **kw):
def __init__(self, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self.site = site
self.site = channel.site
self._channel = channel # this is used by the tests
self.authenticated_entity = None
self.start_time = 0
@@ -331,18 +331,6 @@ class XForwardedForRequest(SynapseRequest):
)
class SynapseRequestFactory(object):
def __init__(self, site, x_forwarded_for):
self.site = site
self.x_forwarded_for = x_forwarded_for
def __call__(self, *args, **kwargs):
if self.x_forwarded_for:
return XForwardedForRequest(self.site, *args, **kwargs)
else:
return SynapseRequest(self.site, *args, **kwargs)
class SynapseSite(Site):
"""
Subclass of a twisted http Site that does access logging with python's
@@ -364,7 +352,7 @@ class SynapseSite(Site):
self.site_tag = site_tag
proxied = config.get("x_forwarded", False)
self.requestFactory = SynapseRequestFactory(self, proxied)
self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii")
+1 -1
View File
@@ -171,7 +171,7 @@ class LogProducer(object):
def stopProducing(self):
self._paused = True
self._buffer = None
self._buffer = deque()
def resumeProducing(self):
self._paused = False
+3
View File
@@ -405,6 +405,9 @@ class LoggingContext(object):
"""
current = get_thread_resource_usage()
# Indicate to mypy that we know that self.usage_start is None.
assert self.usage_start is not None
utime_delta = current.ru_utime - self.usage_start.ru_utime
stime_delta = current.ru_stime - self.usage_start.ru_stime
+2 -2
View File
@@ -116,7 +116,7 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context):
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = yield context.get_prev_state_ids()
pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
@@ -304,7 +304,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits()
else:
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = yield context.get_current_state_ids()
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids))
+7 -5
View File
@@ -80,9 +80,11 @@ class PusherFactory(object):
return EmailPusher(self.hs, pusherdict, mailer)
def _app_name_from_pusherdict(self, pusherdict):
if "data" in pusherdict and "brand" in pusherdict["data"]:
app_name = pusherdict["data"]["brand"]
else:
app_name = self.config.email_app_name
data = pusherdict["data"]
return app_name
if isinstance(data, dict):
brand = data.get("brand")
if isinstance(brand, str):
return brand
return self.config.email_app_name
+6 -4
View File
@@ -232,7 +232,6 @@ class PusherPool:
Deferred
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
# Stagger starting up the pushers so we don't completely drown the
# process on start up.
@@ -245,7 +244,7 @@ class PusherPool:
"""Start the given pusher
Args:
pusherdict (dict):
pusherdict (dict): dict with the values pulled from the db table
Returns:
Deferred[EmailPusher|HttpPusher]
@@ -254,7 +253,8 @@ class PusherPool:
p = self.pusher_factory.create_pusher(pusherdict)
except PusherConfigException as e:
logger.warning(
"Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
"Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s",
pusherdict["id"],
pusherdict.get("user_name"),
pusherdict.get("app_id"),
pusherdict.get("pushkey"),
@@ -262,7 +262,9 @@ class PusherPool:
)
return
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
logger.exception(
"Couldn't start pusher id %i: caught Exception", pusherdict["id"],
)
return
if not p:
+4 -1
View File
@@ -51,6 +51,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.clock = hs.get_clock()
self.federation_handler = hs.get_handlers().federation_handler
@@ -100,7 +101,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
EventType = event_type_from_format_version(format_ver)
event = EventType(event_dict, internal_metadata, rejected_reason)
context = EventContext.deserialize(self.store, event_payload["context"])
context = EventContext.deserialize(
self.storage, event_payload["context"]
)
event_and_contexts.append((event, context))
+2 -1
View File
@@ -54,6 +54,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.clock = hs.get_clock()
@staticmethod
@@ -100,7 +101,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
event = EventType(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"])
context = EventContext.deserialize(self.store, content["context"])
context = EventContext.deserialize(self.storage, content["context"])
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
+2 -1
View File
@@ -46,7 +46,8 @@ class ReplicationClientFactory(ReconnectingClientFactory):
is required.
"""
maxDelay = 30 # Try at least once every N seconds
initialDelay = 0.1
maxDelay = 1 # Try at least once every N seconds
def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name
+6 -6
View File
@@ -15,6 +15,7 @@
""" This module contains REST servlets to do with profile: /profile/<paths> """
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.types import UserID
@@ -103,12 +104,11 @@ class ProfileAvatarURLRestServlet(RestServlet):
content = parse_json_object_from_request(request)
try:
new_avatar_url = content.get("avatar_url")
except Exception:
return 400, "Unable to parse avatar_url"
if new_avatar_url is None:
return 400, "Missing required key: avatar_url"
new_avatar_url = content["avatar_url"]
except KeyError:
raise SynapseError(
400, "Missing key 'avatar_url'", errcode=Codes.MISSING_PARAM
)
await self.profile_handler.set_avatar_url(
user, requester, new_avatar_url, is_admin
+15 -16
View File
@@ -28,6 +28,17 @@ from synapse.rest.client.v2_alpha._base import client_patterns
logger = logging.getLogger(__name__)
ALLOWED_KEYS = {
"app_display_name",
"app_id",
"data",
"device_display_name",
"kind",
"lang",
"profile_tag",
"pushkey",
}
class PushersRestServlet(RestServlet):
PATTERNS = client_patterns("/pushers$", v1=True)
@@ -43,23 +54,11 @@ class PushersRestServlet(RestServlet):
pushers = await self.hs.get_datastore().get_pushers_by_user_id(user.to_string())
allowed_keys = [
"app_display_name",
"app_id",
"data",
"device_display_name",
"kind",
"lang",
"profile_tag",
"pushkey",
]
filtered_pushers = list(
{k: v for k, v in p.items() if k in ALLOWED_KEYS} for p in pushers
)
for p in pushers:
for k, v in list(p.items()):
if k not in allowed_keys:
del p[k]
return 200, {"pushers": pushers}
return 200, {"pushers": filtered_pushers}
def on_OPTIONS(self, _):
return 200, {}
-103
View File
@@ -1,103 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
import os
import re
from canonicaljson import json
from twisted.protocols.basic import FileSender
from twisted.web import resource, server
from synapse.api.errors import Codes, cs_error
from synapse.http.server import finish_request, respond_with_json_bytes
logger = logging.getLogger(__name__)
class ContentRepoResource(resource.Resource):
"""Provides file uploading and downloading.
Uploads are POSTed to wherever this Resource is linked to. This resource
returns a "content token" which can be used to GET this content again. The
token is typically a path, but it may not be. Tokens can expire, be
one-time uses, etc.
In this case, the token is a path to the file and contains 3 interesting
sections:
- User ID base64d (for namespacing content to each user)
- random 24 char string
- Content type base64d (so we can return it when clients GET it)
"""
isLeaf = True
def __init__(self, hs, directory):
resource.Resource.__init__(self)
self.hs = hs
self.directory = directory
def render_GET(self, request):
# no auth here on purpose, to allow anyone to view, even across home
# servers.
# TODO: A little crude here, we could do this better.
filename = request.path.decode("ascii").split("/")[-1]
# be paranoid
filename = re.sub("[^0-9A-z.-_]", "", filename)
file_path = self.directory + "/" + filename
logger.debug("Searching for %s", file_path)
if os.path.isfile(file_path):
# filename has the content type
base64_contentype = filename.split(".")[1]
content_type = base64.urlsafe_b64decode(base64_contentype)
logger.info("Sending file %s", file_path)
f = open(file_path, "rb")
request.setHeader("Content-Type", content_type)
# cache for at least a day.
# XXX: we might want to turn this off for data we don't want to
# recommend caching as it's sensitive or private - or at least
# select private. don't bother setting Expires as all our matrix
# clients are smart enough to be happy with Cache-Control (right?)
request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400")
d = FileSender().beginFileTransfer(f, request)
# after the file has been sent, clean up and finish the request
def cbFinished(ignored):
f.close()
finish_request(request)
d.addCallback(cbFinished)
else:
respond_with_json_bytes(
request,
404,
json.dumps(cs_error("Not found", code=Codes.NOT_FOUND)),
send_cors=True,
)
return server.NOT_DONE_YET
def render_OPTIONS(self, request):
respond_with_json_bytes(request, 200, {}, send_cors=True)
return server.NOT_DONE_YET
+8 -31
View File
@@ -25,7 +25,6 @@ import abc
import logging
import os
from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
from twisted.web.client import BrowserLikePolicyForHTTPS
@@ -34,6 +33,7 @@ from synapse.api.filtering import Filtering
from synapse.api.ratelimiting import Ratelimiter
from synapse.appservice.api import ApplicationServiceApi
from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
@@ -132,7 +132,6 @@ class HomeServer(object):
DEPENDENCIES = [
"http_client",
"db_pool",
"federation_client",
"federation_server",
"handlers",
@@ -209,16 +208,18 @@ class HomeServer(object):
# instantiated during setup() for future return by get_datastore()
DATASTORE_CLASS = abc.abstractproperty()
def __init__(self, hostname, reactor=None, **kwargs):
def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwargs):
"""
Args:
hostname : The hostname for the server.
config: The full config for the homeserver.
"""
if not reactor:
from twisted.internet import reactor
self._reactor = reactor
self.hostname = hostname
self.config = config
self._building = {}
self._listening_services = []
self.start_time = None
@@ -237,10 +238,8 @@ class HomeServer(object):
def setup(self):
logger.info("Setting up.")
with self.get_db_conn() as conn:
self.datastores = DataStores(self.DATASTORE_CLASS, conn, self)
conn.commit()
self.start_time = int(self.get_clock().time())
self.datastores = DataStores(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")
def setup_master(self):
@@ -274,6 +273,9 @@ class HomeServer(object):
def get_datastore(self):
return self.datastores.main
def get_datastores(self):
return self.datastores
def get_config(self):
return self.config
@@ -423,31 +425,6 @@ class HomeServer(object):
)
return MatrixFederationHttpClient(self, tls_client_options_factory)
def build_db_pool(self):
name = self.db_config["name"]
return adbapi.ConnectionPool(
name, cp_reactor=self.get_reactor(), **self.db_config.get("args", {})
)
def get_db_conn(self, run_new_connection=True):
"""Makes a new connection to the database, skipping the db pool
Returns:
Connection: a connection object implementing the PEP-249 spec
"""
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
db_params = {
k: v
for k, v in self.db_config.get("args", {}).items()
if not k.startswith("cp_")
}
db_conn = self.database_engine.module.connect(**db_params)
if run_new_connection:
self.database_engine.on_new_connection(db_conn)
return db_conn
def build_media_repository_resource(self):
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of
+23 -12
View File
@@ -16,7 +16,7 @@
import logging
from collections import namedtuple
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional, Tuple
from six import iteritems, itervalues
@@ -32,6 +32,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
@@ -416,6 +417,7 @@ class StateHandler(object):
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
@@ -461,7 +463,7 @@ class StateResolutionHandler(object):
not be called for a single state group
Args:
room_id (str): room we are resolving for (used for logging)
room_id (str): room we are resolving for (used for logging and sanity checks)
room_version (str): version of the room
state_groups_ids (dict[int, dict[(str, str), str]]):
map from state group id to the state in that state group
@@ -517,6 +519,7 @@ class StateResolutionHandler(object):
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
room_id,
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
@@ -588,36 +591,44 @@ def _make_state_cache_entry(new_state, state_groups_ids):
)
def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
def resolve_events_with_store(
room_id: str,
room_version: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
):
"""
Args:
room_version(str): Version of the room
room_id: the room we are working in
state_sets(list): List of dicts of (type, state_key) -> event_id,
room_version: Version of the room
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.
If None, all events will be fetched via state_map_factory.
If None, all events will be fetched via state_res_store.
state_res_store (StateResolutionStore)
state_res_store: a place to fetch events from
Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return v1.resolve_events_with_store(
state_sets, event_map, state_res_store.get_events
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return v2.resolve_events_with_store(
room_version, state_sets, event_map, state_res_store
room_id, room_version, state_sets, event_map, state_res_store
)
@@ -645,7 +656,7 @@ class StateResolutionStore(object):
return self.store.get_events(
event_ids,
check_redacted=False,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
allow_rejected=allow_rejected,
)
+29 -5
View File
@@ -15,6 +15,7 @@
import hashlib
import logging
from typing import Callable, Dict, List, Optional, Tuple
from six import iteritems, iterkeys, itervalues
@@ -24,6 +25,7 @@ from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
logger = logging.getLogger(__name__)
@@ -32,13 +34,20 @@ POWER_KEY = (EventTypes.PowerLevels, "")
@defer.inlineCallbacks
def resolve_events_with_store(state_sets, event_map, state_map_factory):
def resolve_events_with_store(
room_id: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_map_factory: Callable,
):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
room_id: the room we are working in
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
@@ -46,11 +55,11 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
If None, all events will be fetched via state_map_factory.
state_map_factory(func): will be called
state_map_factory: will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.
Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
@@ -76,6 +85,14 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
if event_map is not None:
state_map.update(event_map)
# everything in the state map should be in the right room
for event in state_map.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)
# get the ids of the auth events which allow us to authenticate the
# conflicted state, picking only from the unconflicting state.
#
@@ -95,6 +112,13 @@ def resolve_events_with_store(state_sets, event_map, state_map_factory):
)
state_map_new = yield state_map_factory(new_needed_events)
for event in state_map_new.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)
state_map.update(state_map_new)
return _resolve_with_state(
+102 -34
View File
@@ -16,29 +16,40 @@
import heapq
import itertools
import logging
from typing import Dict, List, Optional, Tuple
from six import iteritems, itervalues
from twisted.internet import defer
import synapse.state
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.events import EventBase
logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
def resolve_events_with_store(
room_id: str,
room_version: str,
state_sets: List[Dict[Tuple[str, str], str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "synapse.state.StateResolutionStore",
):
"""Resolves the state using the v2 state resolution algorithm
Args:
room_version (str): The room version
room_id: the room we are working in
state_sets(list): List of dicts of (type, state_key) -> event_id,
room_version: The room version
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map(dict[str,FrozenEvent]|None):
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
@@ -46,9 +57,9 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
If None, all events will be fetched via state_res_store.
state_res_store (StateResolutionStore)
state_res_store:
Returns
Returns:
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
@@ -84,6 +95,14 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
)
event_map.update(events)
# everything in the event map should be in the right room
for event in event_map.values():
if event.room_id != room_id:
raise Exception(
"Attempting to state-resolve for room %s with event %s which is in %s"
% (room_id, event.event_id, event.room_id,)
)
full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map)
logger.debug("%d full_conflicted_set entries", len(full_conflicted_set))
@@ -94,13 +113,14 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
)
sorted_power_events = yield _reverse_topological_power_sort(
power_events, event_map, state_res_store, full_conflicted_set
room_id, power_events, event_map, state_res_store, full_conflicted_set
)
logger.debug("sorted %d power events", len(sorted_power_events))
# Now sequentially auth each one
resolved_state = yield _iterative_auth_checks(
room_id,
room_version,
sorted_power_events,
unconflicted_state,
@@ -121,13 +141,18 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
leftover_events = yield _mainline_sort(
leftover_events, pl, event_map, state_res_store
room_id, leftover_events, pl, event_map, state_res_store
)
logger.debug("resolving remaining events")
resolved_state = yield _iterative_auth_checks(
room_version, leftover_events, resolved_state, event_map, state_res_store
room_id,
room_version,
leftover_events,
resolved_state,
event_map,
state_res_store,
)
logger.debug("resolved")
@@ -141,11 +166,12 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
@defer.inlineCallbacks
def _get_power_level_for_sender(event_id, event_map, state_res_store):
def _get_power_level_for_sender(room_id, event_id, event_map, state_res_store):
"""Return the power level of the sender of the given event according to
their auth events.
Args:
room_id (str)
event_id (str)
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
@@ -153,20 +179,24 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store):
Returns:
Deferred[int]
"""
event = yield _get_event(event_id, event_map, state_res_store)
event = yield _get_event(room_id, event_id, event_map, state_res_store)
pl = None
for aid in event.auth_event_ids():
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
aev = yield _get_event(
room_id, aid, event_map, state_res_store, allow_none=True
)
if aev and (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
pl = aev
break
if pl is None:
# Couldn't find power level. Check if they're the creator of the room
for aid in event.auth_event_ids():
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.Create, ""):
aev = yield _get_event(
room_id, aid, event_map, state_res_store, allow_none=True
)
if aev and (aev.type, aev.state_key) == (EventTypes.Create, ""):
if aev.content.get("creator") == event.sender:
return 100
break
@@ -279,7 +309,7 @@ def _is_power_event(event):
@defer.inlineCallbacks
def _add_event_and_auth_chain_to_graph(
graph, event_id, event_map, state_res_store, auth_diff
graph, room_id, event_id, event_map, state_res_store, auth_diff
):
"""Helper function for _reverse_topological_power_sort that add the event
and its auth chain (that is in the auth diff) to the graph
@@ -287,6 +317,7 @@ def _add_event_and_auth_chain_to_graph(
Args:
graph (dict[str, set[str]]): A map from event ID to the events auth
event IDs
room_id (str): the room we are working in
event_id (str): Event to add to the graph
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
@@ -298,7 +329,7 @@ def _add_event_and_auth_chain_to_graph(
eid = state.pop()
graph.setdefault(eid, set())
event = yield _get_event(eid, event_map, state_res_store)
event = yield _get_event(room_id, eid, event_map, state_res_store)
for aid in event.auth_event_ids():
if aid in auth_diff:
if aid not in graph:
@@ -308,11 +339,14 @@ def _add_event_and_auth_chain_to_graph(
@defer.inlineCallbacks
def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff):
def _reverse_topological_power_sort(
room_id, event_ids, event_map, state_res_store, auth_diff
):
"""Returns a list of the event_ids sorted by reverse topological ordering,
and then by power level and origin_server_ts
Args:
room_id (str): the room we are working in
event_ids (list[str]): The events to sort
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
@@ -325,12 +359,14 @@ def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_
graph = {}
for event_id in event_ids:
yield _add_event_and_auth_chain_to_graph(
graph, event_id, event_map, state_res_store, auth_diff
graph, room_id, event_id, event_map, state_res_store, auth_diff
)
event_to_pl = {}
for event_id in graph:
pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store)
pl = yield _get_power_level_for_sender(
room_id, event_id, event_map, state_res_store
)
event_to_pl[event_id] = pl
def _get_power_order(event_id):
@@ -348,12 +384,13 @@ def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_
@defer.inlineCallbacks
def _iterative_auth_checks(
room_version, event_ids, base_state, event_map, state_res_store
room_id, room_version, event_ids, base_state, event_map, state_res_store
):
"""Sequentially apply auth checks to each event in given list, updating the
state as it goes along.
Args:
room_id (str)
room_version (str)
event_ids (list[str]): Ordered list of events to apply auth checks to
base_state (dict[tuple[str, str], str]): The set of state to start with
@@ -370,15 +407,22 @@ def _iterative_auth_checks(
auth_events = {}
for aid in event.auth_event_ids():
ev = yield _get_event(aid, event_map, state_res_store)
ev = yield _get_event(
room_id, aid, event_map, state_res_store, allow_none=True
)
if ev.rejected_reason is None:
auth_events[(ev.type, ev.state_key)] = ev
if not ev:
logger.warning(
"auth_event id %s for event %s is missing", aid, event_id
)
else:
if ev.rejected_reason is None:
auth_events[(ev.type, ev.state_key)] = ev
for key in event_auth.auth_types_for_event(event):
if key in resolved_state:
ev_id = resolved_state[key]
ev = yield _get_event(ev_id, event_map, state_res_store)
ev = yield _get_event(room_id, ev_id, event_map, state_res_store)
if ev.rejected_reason is None:
auth_events[key] = event_map[ev_id]
@@ -400,11 +444,14 @@ def _iterative_auth_checks(
@defer.inlineCallbacks
def _mainline_sort(event_ids, resolved_power_event_id, event_map, state_res_store):
def _mainline_sort(
room_id, event_ids, resolved_power_event_id, event_map, state_res_store
):
"""Returns a sorted list of event_ids sorted by mainline ordering based on
the given event resolved_power_event_id
Args:
room_id (str): room we're working in
event_ids (list[str]): Events to sort
resolved_power_event_id (str): The final resolved power level event ID
event_map (dict[str,FrozenEvent])
@@ -417,12 +464,14 @@ def _mainline_sort(event_ids, resolved_power_event_id, event_map, state_res_stor
pl = resolved_power_event_id
while pl:
mainline.append(pl)
pl_ev = yield _get_event(pl, event_map, state_res_store)
pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
auth_events = pl_ev.auth_event_ids()
pl = None
for aid in auth_events:
ev = yield _get_event(aid, event_map, state_res_store)
if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
ev = yield _get_event(
room_id, aid, event_map, state_res_store, allow_none=True
)
if ev and (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
pl = aid
break
@@ -457,6 +506,8 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor
Deferred[int]
"""
room_id = event.room_id
# We do an iterative search, replacing `event with the power level in its
# auth events (if any)
while event:
@@ -468,8 +519,10 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor
event = None
for aid in auth_events:
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
aev = yield _get_event(
room_id, aid, event_map, state_res_store, allow_none=True
)
if aev and (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
event = aev
break
@@ -478,22 +531,37 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor
@defer.inlineCallbacks
def _get_event(event_id, event_map, state_res_store):
def _get_event(room_id, event_id, event_map, state_res_store, allow_none=False):
"""Helper function to look up event in event_map, falling back to looking
it up in the store
Args:
room_id (str)
event_id (str)
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
allow_none (bool): if the event is not found, return None rather than raising
an exception
Returns:
Deferred[FrozenEvent]
Deferred[Optional[FrozenEvent]]
"""
if event_id not in event_map:
events = yield state_res_store.get_events([event_id], allow_rejected=True)
event_map.update(events)
return event_map[event_id]
event = event_map.get(event_id)
if event is None:
if allow_none:
return None
raise Exception("Unknown event %s" % (event_id,))
if event.room_id != room_id:
raise Exception(
"In state res for room %s, event %s is in %s"
% (room_id, event_id, event.room_id)
)
return event
def lexicographical_topological_sort(graph, key):
+4 -2
View File
@@ -16,6 +16,7 @@
# limitations under the License.
import logging
import random
from abc import ABCMeta
from six import PY2
from six.moves import builtins
@@ -30,7 +31,8 @@ from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
class SQLBaseStore(object):
# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
class SQLBaseStore(metaclass=ABCMeta):
"""Base class for data stores that holds helper functions.
Note that multiple instances of this class will exist as there will be one
@@ -40,7 +42,7 @@ class SQLBaseStore(object):
def __init__(self, database: Database, db_conn, hs):
self.hs = hs
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.database_engine = database.engine
self.db = database
self.rand = random.SystemRandom()
+35 -16
View File
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Optional
from canonicaljson import json
@@ -97,15 +98,14 @@ class BackgroundUpdater(object):
def start_doing_background_updates(self):
run_as_background_process("background_updates", self.run_background_updates)
@defer.inlineCallbacks
def run_background_updates(self, sleep=True):
async def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates")
while True:
if sleep:
yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
result = yield self.do_next_background_update(
result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except Exception:
@@ -170,20 +170,21 @@ class BackgroundUpdater(object):
return not update_exists
@defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
async def do_next_background_update(
self, desired_duration_ms: float
) -> Optional[int]:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
Args:
desired_duration_ms(float): How long we want to spend
updating.
Returns:
A deferred that completes once some amount of work is done.
The deferred will have a value of None if there is currently
no more work to do.
None if there is no more work to do, otherwise an int
"""
if not self._background_update_queue:
updates = yield self.db.simple_select_list(
updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),
@@ -201,11 +202,12 @@ class BackgroundUpdater(object):
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
res = yield self._do_background_update(update_name, desired_duration_ms)
res = await self._do_background_update(update_name, desired_duration_ms)
return res
@defer.inlineCallbacks
def _do_background_update(self, update_name, desired_duration_ms):
async def _do_background_update(
self, update_name: str, desired_duration_ms: float
) -> int:
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@@ -225,7 +227,7 @@ class BackgroundUpdater(object):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
progress_json = yield self.db.simple_select_one_onecol(
progress_json = await self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
@@ -234,7 +236,7 @@ class BackgroundUpdater(object):
progress = json.loads(progress_json)
time_start = self._clock.time_msec()
items_updated = yield update_handler(progress, batch_size)
items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start
@@ -263,7 +265,9 @@ class BackgroundUpdater(object):
* A dict of the current progress
* An integer count of the number of items to update in this batch.
The handler should return a deferred integer count of items updated.
The handler should return a deferred or coroutine which returns an integer count
of items updated.
The handler is responsible for updating the progress of the update.
Args:
@@ -432,6 +436,21 @@ class BackgroundUpdater(object):
"background_updates", keyvalues={"update_name": update_name}
)
def _background_update_progress(self, update_name: str, progress: dict):
"""Update the progress of a background update
Args:
update_name: The name of the background update task
progress: The progress of the update.
"""
return self.db.runInteraction(
"background_update_progress",
self._background_update_progress_txn,
update_name,
progress,
)
def _background_update_progress_txn(self, txn, update_name, progress):
"""Update the progress of a background update
+59 -7
View File
@@ -13,24 +13,76 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.database import Database
import logging
from synapse.storage.data_stores.state import StateGroupDataStore
from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
logger = logging.getLogger(__name__)
class DataStores(object):
"""The various data stores.
These are low level interfaces to physical databases.
Attributes:
main (DataStore)
"""
def __init__(self, main_store_class, db_conn, hs):
def __init__(self, main_store_class, hs):
# Note we pass in the main store class here as workers use a different main
# store.
database = Database(hs)
# Check that db is correctly configured.
database.engine.check_database(db_conn.cursor())
self.databases = []
self.main = None
self.state = None
prepare_database(db_conn, database.engine, config=hs.config)
for database_config in hs.config.database.databases:
db_name = database_config.name
engine = create_engine(database_config.config)
self.main = main_store_class(database, db_conn, hs)
with make_conn(database_config, engine) as db_conn:
logger.info("Preparing database %r...", db_name)
engine.check_database(db_conn.cursor())
prepare_database(
db_conn, engine, hs.config, data_stores=database_config.data_stores,
)
database = Database(hs, database_config, engine)
if "main" in database_config.data_stores:
logger.info("Starting 'main' data store")
# Sanity check we don't try and configure the main store on
# multiple databases.
if self.main:
raise Exception("'main' data store already configured")
self.main = main_store_class(database, db_conn, hs)
if "state" in database_config.data_stores:
logger.info("Starting 'state' data store")
# Sanity check we don't try and configure the state store on
# multiple databases.
if self.state:
raise Exception("'state' data store already configured")
self.state = StateGroupDataStore(database, db_conn, hs)
db_conn.commit()
self.databases.append(database)
logger.info("Database %r prepared", db_name)
# Sanity check that we have actually configured all the required stores.
if not self.main:
raise Exception("No 'main' data store configured")
if not self.state:
raise Exception("No 'main' data store configured")

Some files were not shown because too many files have changed in this diff Show More