
Compare commits


37 Commits

Author SHA1 Message Date
Erik Johnston
77d9357226 1.9.1 2020-01-28 13:09:36 +00:00
Erik Johnston
bdbeeb94ec Fix setting mau_limit_reserved_threepids config (#6793)
Calling the invalidation function during initialisation of the data
stores introduces a circular dependency, causing Synapse to fail to
start.
2020-01-28 13:05:24 +00:00
Brendan Abolivier
9bae740527 Fixup changelog 2020-01-23 13:13:19 +00:00
Brendan Abolivier
1755326d8a Fixup changelog 2020-01-23 13:11:07 +00:00
Brendan Abolivier
1dc5a791cf Fixup changelog 2020-01-23 12:59:29 +00:00
Brendan Abolivier
ba64c3b615 Merge branch 'release-v1.9.0' of github.com:matrix-org/synapse into release-v1.9.0 2020-01-23 12:58:03 +00:00
Brendan Abolivier
f3eac2b3e9 1.9.0 2020-01-23 12:57:55 +00:00
Andrew Morgan
d31f5f4d89 Update admin room docs with correct endpoints (#6770) 2020-01-23 11:37:26 +00:00
Brendan Abolivier
33f7e5ce2a Fixup warning about workers changes 2020-01-22 14:49:21 +00:00
Brendan Abolivier
91085ef49e Add deprecation headers 2020-01-22 14:30:22 +00:00
Brendan Abolivier
ffa637050d Fixup changelog 2020-01-22 14:19:23 +00:00
Brendan Abolivier
0d0f32bc53 1.9.0rc1 2020-01-22 14:03:46 +00:00
Andrew Morgan
90a28fb475 Admin API to list, filter and sort rooms (#6720) 2020-01-22 13:36:43 +00:00
Brendan Abolivier
ae6cf586b0 Merge pull request #6764 from matrix-org/babolivier/fix-thumbnail
Fix typo in _select_thumbnail
2020-01-22 13:21:00 +00:00
Brendan Abolivier
6ae0c8db33 Lint + changelog 2020-01-22 12:38:18 +00:00
Brendan Abolivier
d9a8728b11 Remove unused import 2020-01-22 12:30:49 +00:00
Brendan Abolivier
67aa18e8dc Add tests for thumbnailing 2020-01-22 12:28:07 +00:00
Brendan Abolivier
ed83c3a018 Fix typo in _select_thumbnail 2020-01-22 12:27:42 +00:00
Andrew Morgan
aa9b00fb2f Fix and add test to deprecated quarantine media admin api (#6756) 2020-01-22 11:05:50 +00:00
Neil Johnson
5e52d8563b Allow monthly active user limiting support for worker mode, fixes #4639. (#6742) 2020-01-22 11:05:14 +00:00
Erik Johnston
5d7a6ad223 Allow streaming cache invalidate all to workers. (#6749) 2020-01-22 10:37:00 +00:00
Erik Johnston
2093f83ea0 Remove unused CI docker compose files (#6754)
These now exist in the pipelines repo.
2020-01-22 10:36:48 +00:00
Ivan Vilata-i-Balaguer
837f62266b Avoid attribute error when password_config present but empty (#6753)
The old statement returned `None` for such a `password_config` (like the one
created on first run), thus retrieval of the `pepper` key failed with
`AttributeError`.

Fixes #5315

Signed-off-by: Ivan Vilata i Balaguer <ivan@selidor.net>
2020-01-22 07:32:52 +00:00
Brendan Abolivier
07124d028d Port synapse_port_db to async/await (#6718)
* Raise an exception if there are pending background updates

So we return with a non-0 code

* Changelog

* Port synapse_port_db to async/await

* Port update_database to async/await

* Add version string to mocked homeservers

* Remove unused imports

* Convert overseen bits to async/await

* Fixup logging contexts

* Fix imports

* Add a way to print an error without raising an exception

* Incorporate review
2020-01-21 19:04:58 +00:00
Erik Johnston
0e68760078 Add a DeltaState to track changes to be made to current state (#6716) 2020-01-20 18:07:20 +00:00
Erik Johnston
b0a66ab83c Fixup synapse.rest to pass mypy (#6732) 2020-01-20 17:38:21 +00:00
Erik Johnston
74b74462f1 Fix /events/:event_id deprecated API. (#6731) 2020-01-20 17:38:09 +00:00
Erik Johnston
0f6e525be3 Fixup synapse.api to pass mypy (#6733) 2020-01-20 17:34:13 +00:00
Erik Johnston
ceecedc68b Fix changing password via user admin API. (#6730) 2020-01-20 17:23:59 +00:00
Andrew Morgan
e9e066055f Fix empty account_validity config block (#6747) 2020-01-20 16:21:59 +00:00
Andrew Morgan
351fdfede6 Update changelog.d/6747.bugfix
Co-Authored-By: Erik Johnston <erik@matrix.org>
2020-01-20 15:58:44 +00:00
Erik Johnston
2f23eb27b3 Revert "Newsfile"
This reverts commit 11c23af465.
2020-01-20 15:12:58 +00:00
Erik Johnston
11c23af465 Newsfile 2020-01-20 15:11:38 +00:00
Andrew Morgan
026f4bdf3c Add changelog 2020-01-20 14:12:21 +00:00
Andrew Morgan
198d52da3a Fix empty account_validity config block 2020-01-20 14:05:29 +00:00
Brendan Abolivier
a17f64361c Add more logging around message retention policies support (#6717)
So we can debug issues like #6683 more easily
2020-01-17 20:51:44 +00:00
Erik Johnston
5909751936 Fix up changelog 2020-01-17 15:13:27 +00:00
83 changed files with 1494 additions and 447 deletions

View File

@@ -1,22 +0,0 @@
version: '3.1'
services:
postgres:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.5
depends_on:
- postgres
env_file: .env
environment:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
volumes:
- ..:/src

View File

@@ -1,22 +0,0 @@
version: '3.1'
services:
postgres:
image: postgres:11
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
depends_on:
- postgres
env_file: .env
environment:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
volumes:
- ..:/src

View File

@@ -1,22 +0,0 @@
version: '3.1'
services:
postgres:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
depends_on:
- postgres
env_file: .env
environment:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
volumes:
- ..:/src

View File

@@ -1,3 +1,96 @@
Synapse 1.9.1 (2020-01-28)
==========================
Bugfixes
--------
- Fix bug where setting `mau_limit_reserved_threepids` config would cause Synapse to refuse to start. ([\#6793](https://github.com/matrix-org/synapse/issues/6793))
Synapse 1.9.0 (2020-01-23)
==========================
**WARNING**: As of this release, Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)).
If your Synapse deployment uses workers, note that the reverse-proxy configurations for the `synapse.app.media_repository`, `synapse.app.federation_reader` and `synapse.app.event_creator` workers have changed, with the addition of a few paths (see the updated configurations [here](docs/workers.md#available-worker-applications)). Existing configurations will continue to work.
Improved Documentation
----------------------
- Fix endpoint documentation for the List Rooms admin API. ([\#6770](https://github.com/matrix-org/synapse/issues/6770))
Synapse 1.9.0rc1 (2020-01-22)
=============================
Features
--------
- Allow admin to create or modify a user. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5742](https://github.com/matrix-org/synapse/issues/5742))
- Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media. ([\#6681](https://github.com/matrix-org/synapse/issues/6681), [\#6756](https://github.com/matrix-org/synapse/issues/6756))
- Add `org.matrix.e2e_cross_signing` to `unstable_features` in `/versions` as per [MSC1756](https://github.com/matrix-org/matrix-doc/pull/1756). ([\#6712](https://github.com/matrix-org/synapse/issues/6712))
- Add a new admin API to list and filter rooms on the server. ([\#6720](https://github.com/matrix-org/synapse/issues/6720))
Bugfixes
--------
- Correctly proxy HTTP errors due to API calls to remote group servers. ([\#6654](https://github.com/matrix-org/synapse/issues/6654))
- Fix media repo admin APIs when using a media worker. ([\#6664](https://github.com/matrix-org/synapse/issues/6664))
- Fix "CRITICAL" errors being logged when a request is received for a uri containing non-ascii characters. ([\#6682](https://github.com/matrix-org/synapse/issues/6682))
- Fix a bug where we would assign a numeric user ID if somebody tried registering with an empty username. ([\#6690](https://github.com/matrix-org/synapse/issues/6690))
- Fix `purge_room` admin API. ([\#6711](https://github.com/matrix-org/synapse/issues/6711))
- Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs. ([\#6714](https://github.com/matrix-org/synapse/issues/6714))
- Fix the `synapse_port_db` script not correctly running background updates. Thanks @tadzik for reporting. ([\#6718](https://github.com/matrix-org/synapse/issues/6718))
- Fix changing password via user admin API. ([\#6730](https://github.com/matrix-org/synapse/issues/6730))
- Fix `/events/:event_id` deprecated API. ([\#6731](https://github.com/matrix-org/synapse/issues/6731))
- Fix monthly active user limiting support for worker mode, fixes [#4639](https://github.com/matrix-org/synapse/issues/4639). ([\#6742](https://github.com/matrix-org/synapse/issues/6742))
- Fix bug when setting `account_validity` to an empty block in the config. Thanks to @Sorunome for reporting. ([\#6747](https://github.com/matrix-org/synapse/issues/6747))
- Fix `AttributeError: 'NoneType' object has no attribute 'get'` in `hash_password` when configuration has an empty `password_config`. Contributed by @ivilata. ([\#6753](https://github.com/matrix-org/synapse/issues/6753))
- Fix the `docker-compose.yaml` overriding the entire `/etc` folder of the container. Contributed by Fabian Meyer. ([\#6656](https://github.com/matrix-org/synapse/issues/6656))
Improved Documentation
----------------------
- Fix a typo in the configuration example for purge jobs in the sample configuration file. ([\#6621](https://github.com/matrix-org/synapse/issues/6621))
- Add complete documentation of the message retention policies support. ([\#6624](https://github.com/matrix-org/synapse/issues/6624), [\#6665](https://github.com/matrix-org/synapse/issues/6665))
- Add some helpful tips about changelog entries to the GitHub pull request template. ([\#6663](https://github.com/matrix-org/synapse/issues/6663))
- Clarify the `account_validity` and `email` sections of the sample configuration. ([\#6685](https://github.com/matrix-org/synapse/issues/6685))
- Add more endpoints to the documentation for Synapse workers. ([\#6698](https://github.com/matrix-org/synapse/issues/6698))
Deprecations and Removals
-------------------------
- Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)). ([\#6675](https://github.com/matrix-org/synapse/issues/6675))
Internal Changes
----------------
- Add `local_current_membership` table for tracking local user membership state in rooms. ([\#6655](https://github.com/matrix-org/synapse/issues/6655), [\#6728](https://github.com/matrix-org/synapse/issues/6728))
- Port `synapse.replication.tcp` to async/await. ([\#6666](https://github.com/matrix-org/synapse/issues/6666))
- Fixup `synapse.replication` to pass mypy checks. ([\#6667](https://github.com/matrix-org/synapse/issues/6667))
- Allow `additional_resources` to implement `IResource` directly. ([\#6686](https://github.com/matrix-org/synapse/issues/6686))
- Allow REST endpoint implementations to raise a `RedirectException`, which will redirect the user's browser to a given location. ([\#6687](https://github.com/matrix-org/synapse/issues/6687))
- Updates and extensions to the module API. ([\#6688](https://github.com/matrix-org/synapse/issues/6688))
- Updates to the SAML mapping provider API. ([\#6689](https://github.com/matrix-org/synapse/issues/6689), [\#6723](https://github.com/matrix-org/synapse/issues/6723))
- Remove redundant `RegistrationError` class. ([\#6691](https://github.com/matrix-org/synapse/issues/6691))
- Don't block processing of incoming EDUs behind processing PDUs in the same transaction. ([\#6697](https://github.com/matrix-org/synapse/issues/6697))
- Remove duplicate check for the `session` query parameter on the `/auth/xxx/fallback/web` Client-Server endpoint. ([\#6702](https://github.com/matrix-org/synapse/issues/6702))
- Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data. ([\#6706](https://github.com/matrix-org/synapse/issues/6706))
- Add `StateMap` type alias to simplify types. ([\#6715](https://github.com/matrix-org/synapse/issues/6715))
- Add a `DeltaState` to track changes to be made to current state during event persistence. ([\#6716](https://github.com/matrix-org/synapse/issues/6716))
- Add more logging around message retention policies support. ([\#6717](https://github.com/matrix-org/synapse/issues/6717))
- When processing a SAML response, log the assertions for easier configuration. ([\#6724](https://github.com/matrix-org/synapse/issues/6724))
- Fixup `synapse.rest` to pass mypy. ([\#6732](https://github.com/matrix-org/synapse/issues/6732), [\#6764](https://github.com/matrix-org/synapse/issues/6764))
- Fixup `synapse.api` to pass mypy. ([\#6733](https://github.com/matrix-org/synapse/issues/6733))
- Allow streaming cache 'invalidate all' to workers. ([\#6749](https://github.com/matrix-org/synapse/issues/6749))
- Remove unused CI docker compose files. ([\#6754](https://github.com/matrix-org/synapse/issues/6754))
Synapse 1.8.0 (2020-01-09)
==========================

View File

@@ -1 +0,0 @@
Allow admin to create or modify a user. Contributed by Awesome Technologies Innovationslabor GmbH.

View File

@@ -1 +0,0 @@
Fix a typo in the configuration example for purge jobs in the sample configuration file.

View File

@@ -1 +0,0 @@
Add complete documentation of the message retention policies support.

View File

@@ -1 +0,0 @@
Correctly proxy HTTP errors due to API calls to remote group servers.

View File

@@ -1 +0,0 @@
Add `local_current_membership` table for tracking local user membership state in rooms.

View File

@@ -1 +0,0 @@
No more overriding the entire /etc folder of the container in docker-compose.yaml. Contributed by Fabian Meyer.

View File

@@ -1 +0,0 @@
Add some helpful tips about changelog entries to the github pull request template.

View File

@@ -1 +0,0 @@
Fix media repo admin APIs when using a media worker.

View File

@@ -1 +0,0 @@
Add complete documentation of the message retention policies support.

View File

@@ -1 +0,0 @@
Port `synapse.replication.tcp` to async/await.

View File

@@ -1 +0,0 @@
Fixup `synapse.replication` to pass mypy checks.

View File

@@ -1 +0,0 @@
Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)).

View File

@@ -1 +0,0 @@
Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media.

View File

@@ -1,2 +0,0 @@
Fix "CRITICAL" errors being logged when a request is received for a uri containing non-ascii characters.

View File

@@ -1 +0,0 @@
Clarify the `account_validity` and `email` sections of the sample configuration.

View File

@@ -1 +0,0 @@
Allow additional_resources to implement IResource directly.

View File

@@ -1 +0,0 @@
Allow REST endpoint implementations to raise a RedirectException, which will redirect the user's browser to a given location.

View File

@@ -1 +0,0 @@
Updates and extensions to the module API.

View File

@@ -1 +0,0 @@
Updates to the SAML mapping provider API.

View File

@@ -1 +0,0 @@
Fix a bug where we would assign a numeric userid if somebody tried registering with an empty username.

View File

@@ -1 +0,0 @@
Remove redundant RegistrationError class.

View File

@@ -1 +0,0 @@
Don't block processing of incoming EDUs behind processing PDUs in the same transaction.

View File

@@ -1 +0,0 @@
Add more endpoints to the documentation for Synapse workers.

View File

@@ -1 +0,0 @@
Remove duplicate check for the `session` query parameter on the `/auth/xxx/fallback/web` Client-Server endpoint.

View File

@@ -1 +0,0 @@
Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data.

View File

@@ -1 +0,0 @@
Fix `purge_room` admin API.

View File

@@ -1 +0,0 @@
Add org.matrix.e2e_cross_signing to unstable_features in /versions as per [MSC1756](https://github.com/matrix-org/matrix-doc/pull/1756).

View File

@@ -1 +0,0 @@
Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs.

View File

@@ -1 +0,0 @@
Add StateMap type alias to simplify types.

View File

@@ -1 +0,0 @@
Updates to the SAML mapping provider API.

View File

@@ -1 +0,0 @@
When processing a SAML response, log the assertions for easier configuration.

View File

@@ -1 +0,0 @@
Fix a bug causing `ValueError: unsupported format character ''' (0x27) at index 312` error when running the schema 57 upgrade script.

debian/changelog (vendored, 12 lines changed)
View File

@@ -1,3 +1,15 @@
matrix-synapse-py3 (1.9.1) stable; urgency=medium
* New synapse release 1.9.1.
-- Synapse Packaging team <packages@matrix.org> Tue, 28 Jan 2020 13:09:23 +0000
matrix-synapse-py3 (1.9.0) stable; urgency=medium
* New synapse release 1.9.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 23 Jan 2020 12:56:31 +0000
matrix-synapse-py3 (1.8.0) stable; urgency=medium
[ Richard van der Hoff ]

docs/admin_api/rooms.md (new file, 173 lines)
View File

@@ -0,0 +1,173 @@
# List Room API
The List Room admin API allows server admins to get a list of rooms on their
server. There are various parameters available that allow for filtering and
sorting the returned list. This API supports pagination.
## Parameters
The following query parameters are available:
* `from` - Offset in the returned list. Defaults to `0`.
* `limit` - Maximum number of rooms to return. Defaults to `100`.
* `order_by` - The method in which to sort the returned list of rooms. Valid values are:
- `alphabetical` - Rooms are ordered alphabetically by room name. This is the default.
- `size` - Rooms are ordered by the number of members. Largest to smallest.
* `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.
* `search_term` - Filter rooms by their room name. Search term can be contained in any
part of the room name. Defaults to no filtering.
The following fields are possible in the JSON response body:
* `rooms` - An array of objects, each containing information about a room.
- Room objects contain the following fields:
- `room_id` - The ID of the room.
- `name` - The name of the room.
- `canonical_alias` - The canonical (main) alias address of the room.
- `joined_members` - How many users are currently in the room.
* `offset` - The current pagination offset in rooms. This field should be
used instead of `next_batch` for room offset, as `next_batch` is
not intended to be parsed.
* `total_rooms` - The total number of rooms this query can return. Using this
and `offset`, you have enough information to know the current
progression through the list.
* `next_batch` - If this field is present, we know that there are potentially
more rooms on the server that did not all fit into this response.
We can use `next_batch` to get the "next page" of results. To do
so, simply repeat your request, setting the `from` parameter to
the value of `next_batch`.
* `prev_batch` - If this field is present, it is possible to paginate backwards.
Use `prev_batch` for the `from` value in the next request to
get the "previous page" of results.
## Usage
A standard request with no filtering:
```
GET /_synapse/admin/v1/rooms
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"name": "Matrix HQ",
"canonical_alias": "#matrix:matrix.org",
"joined_members": 8326
},
... (8 hidden items) ...
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
}
],
"offset": 0,
"total_rooms": 10
}
```
Filtering by room name:
```
GET /_synapse/admin/v1/rooms?search_term=TWIM
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
}
],
"offset": 0,
"total_rooms": 1
}
```
Paginating through a list of rooms:
```
GET /_synapse/admin/v1/rooms?order_by=size
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"name": "Matrix HQ",
"canonical_alias": "#matrix:matrix.org",
"joined_members": 8326
},
... (98 hidden items) ...
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
}
],
"offset": 0,
"total_rooms": 150
"next_token": 100
}
```
The presence of the `next_batch` field tells us that there are more rooms
than were returned in this request, and we need to make another request to get them.
To get the next batch of room results, we repeat our request, setting the `from`
parameter to the value of `next_batch`.
```
GET /_synapse/admin/v1/rooms?order_by=size&from=100
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!mscvqgqpHYjBGDxNym:matrix.org",
"name": "Music Theory",
"canonical_alias": "#musictheory:matrix.org",
"joined_members": 127
},
... (48 hidden items) ...
{
"room_id": "!twcBhHVdZlQWuuxBhN:termina.org.uk",
"name": "weechat-matrix",
"canonical_alias": "#weechat-matrix:termina.org.uk",
"joined_members": 137
}
],
"offset": 100,
"prev_batch": 0,
"total_rooms": 150
}
```
Once the `next_batch` field is no longer present, we know we've reached the
end of the list.
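
Putting the pagination fields together, a minimal client sketch (not part of this change) can walk the whole room list by following `next_batch` until it disappears. The homeserver URL and access token below are placeholders, and `requests` is assumed to be available:

```python
# Minimal sketch: page through the List Room API by following next_batch.
# BASE and TOKEN are placeholders, not values from this changeset.
import requests

BASE = "https://homeserver.example.com"
TOKEN = "MDAx..."  # an admin user's access token

def list_all_rooms(order_by="size"):
    rooms, offset = [], 0
    while True:
        resp = requests.get(
            BASE + "/_synapse/admin/v1/rooms",
            params={"order_by": order_by, "from": offset},
            headers={"Authorization": "Bearer " + TOKEN},
        )
        resp.raise_for_status()
        body = resp.json()
        rooms.extend(body["rooms"])
        if "next_batch" not in body:  # absent on the last page
            return rooms
        offset = body["next_batch"]
```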

View File

@@ -254,6 +254,11 @@ and the key to invalidate. For example:
> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:
> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
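
On the worker side, handling such a row comes down to a null-check on the key, as the `replication/slave/storage` hunk later in this diff shows. A minimal sketch of that logic, with a hypothetical `caches` dict standing in for Synapse's real cache objects:

```python
import json

# Hypothetical stand-in for the worker's named caches.
caches = {"get_user_by_id": {("@bob:example.com",): "cached row"}}

def handle_caches_rdata(row_json):
    # A caches row is [cache_func, keys-or-null, invalidation_ts].
    cache_func, keys, _ts = json.loads(row_json)
    cache = caches.get(cache_func)
    if cache is None:
        return  # unknown cache name: nothing to do
    if keys is None:
        cache.clear()  # null key: invalidate the entire cache
    else:
        cache.pop(tuple(keys), None)  # invalidate a single entry

handle_caches_rdata('["get_user_by_id", ["@bob:example.com"], 1550574873251]')
handle_caches_rdata('["get_user_by_id", null, 1550574873252]')
```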
However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
invalidations into a single poke by defining a special cache name that

View File

@@ -7,6 +7,9 @@ show_error_codes = True
show_traceback = True
mypy_path = stubs
[mypy-pymacaroons.*]
ignore_missing_imports = True
[mypy-zope]
ignore_missing_imports = True
@@ -63,3 +66,12 @@ ignore_missing_imports = True
[mypy-sentry_sdk]
ignore_missing_imports = True
[mypy-PIL.*]
ignore_missing_imports = True
[mypy-lxml]
ignore_missing_imports = True
[mypy-jwt.*]
ignore_missing_imports = True

View File

@@ -22,10 +22,12 @@ import yaml
from twisted.internet import defer, reactor
import synapse
from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("update_database")
@@ -38,6 +40,8 @@ class MockHomeserver(HomeServer):
config.server_name, reactor=reactor, config=config, **kwargs
)
self.version_string = "Synapse/" + get_version_string(synapse)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -81,15 +85,17 @@ if __name__ == "__main__":
hs.setup()
store = hs.get_datastore()
@defer.inlineCallbacks
def run_background_updates():
yield store.db.updates.run_background_updates(sleep=False)
async def run_background_updates():
await store.db.updates.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()
# Apply all background updates on the database.
reactor.callWhenRunning(
lambda: run_as_background_process("background_updates", run_background_updates)
)
def run():
# Apply all background updates on the database.
defer.ensureDeferred(
run_as_background_process("background_updates", run_background_updates)
)
reactor.callWhenRunning(run)
reactor.run()
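
The hunk above converts the script's `@defer.inlineCallbacks` generator into a native coroutine; since the Twisted reactor cannot run a coroutine directly, it is wrapped in a Deferred first. A standalone sketch of that pattern, with `task.deferLater` standing in for the real background-update work and omitting Synapse's `run_as_background_process` wrapper:

```python
# Standalone sketch: drive a native coroutine under the Twisted reactor.
from twisted.internet import defer, reactor, task

async def run_background_updates():
    await task.deferLater(reactor, 0.1, lambda: None)  # stand-in for real work
    print("background updates done")
    reactor.stop()  # exit the script once the work completes

def run():
    # ensureDeferred turns the coroutine into a Deferred the reactor can track.
    defer.ensureDeferred(run_background_updates())

reactor.callWhenRunning(run)
reactor.run()
```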

View File

@@ -52,7 +52,7 @@ if __name__ == "__main__":
if "config" in args and args.config:
config = yaml.safe_load(args.config)
bcrypt_rounds = config.get("bcrypt_rounds", bcrypt_rounds)
password_config = config.get("password_config", {})
password_config = config.get("password_config", None) or {}
password_pepper = password_config.get("pepper", password_pepper)
password = args.password
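
The reason `config.get("password_config", {})` was not enough: a YAML file containing a bare `password_config:` section parses the key to `None`, and `dict.get` only falls back to its default when the key is absent. A small demonstration of both behaviours:

```python
import yaml

# A config file with a bare "password_config:" section parses to None...
config = yaml.safe_load("password_config:\n")
assert config == {"password_config": None}

# ...so the .get() default is ignored (the key *is* present):
assert config.get("password_config", {}) is None

# The "or {}" form used above handles both a missing and an empty section:
assert (config.get("password_config", None) or {}) == {}
```

The `account_validity` fix later in this diff applies the same `or {}` idiom.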

View File

@@ -27,13 +27,16 @@ from six import string_types
import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
import synapse
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.data_stores.main.deviceinbox import (
DeviceInboxBackgroundUpdateStore,
@@ -61,6 +64,7 @@ from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse_port_db")
@@ -125,6 +129,13 @@ APPEND_ONLY_TABLES = [
]
# Error returned by the run function. Used at the top-level part of the script to
# handle errors and return codes.
end_error = None
# The exec_info for the error, if any. If error is defined but not exec_info, the script
# will show only the error message without the stacktrace; if exec_info is defined but
# not the error, then the script will show nothing outside of what's printed in the run
# function. If both are defined, the script will print both the error and the stacktrace.
end_error_exec_info = None
@@ -177,6 +188,7 @@ class MockHomeserver:
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name
self.version_string = "Synapse/" + get_version_string(synapse)
def get_clock(self):
return self.clock
@@ -189,11 +201,10 @@ class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
@defer.inlineCallbacks
def setup_table(self, table):
async def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
row = yield self.postgres_store.db.simple_select_one(
row = await self.postgres_store.db.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
@@ -207,10 +218,10 @@ class Porter(object):
forward_chunk,
already_ported,
total_to_port,
) = yield self._setup_sent_transactions()
) = await self._setup_sent_transactions()
backward_chunk = 0
else:
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
@@ -227,7 +238,7 @@ class Porter(object):
backward_chunk = row["backward_rowid"]
if total_to_port is None:
already_ported, total_to_port = yield self._get_total_count_to_port(
already_ported, total_to_port = await self._get_total_count_to_port(
table, forward_chunk, backward_chunk
)
else:
@@ -238,9 +249,9 @@ class Porter(object):
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
yield self.postgres_store.execute(delete_all)
await self.postgres_store.execute(delete_all)
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
@@ -248,16 +259,13 @@ class Porter(object):
forward_chunk = 1
backward_chunk = 0
already_ported, total_to_port = yield self._get_total_count_to_port(
already_ported, total_to_port = await self._get_total_count_to_port(
table, forward_chunk, backward_chunk
)
defer.returnValue(
(table, already_ported, total_to_port, forward_chunk, backward_chunk)
)
return table, already_ported, total_to_port, forward_chunk, backward_chunk
@defer.inlineCallbacks
def handle_table(
async def handle_table(
self, table, postgres_size, table_size, forward_chunk, backward_chunk
):
logger.info(
@@ -275,7 +283,7 @@ class Porter(object):
self.progress.add_table(table, postgres_size, table_size)
if table == "event_search":
yield self.handle_search_table(
await self.handle_search_table(
postgres_size, table_size, forward_chunk, backward_chunk
)
return
@@ -294,7 +302,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null)`, as that is
# what synapse expects to be there.
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
@@ -335,7 +343,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
headers, frows, brows = yield self.sqlite_store.db.runInteraction(
headers, frows, brows = await self.sqlite_store.db.runInteraction(
"select", r
)
@@ -361,7 +369,7 @@ class Porter(object):
},
)
yield self.postgres_store.execute(insert)
await self.postgres_store.execute(insert)
postgres_size += len(rows)
@@ -369,8 +377,7 @@ class Porter(object):
else:
return
@defer.inlineCallbacks
def handle_search_table(
async def handle_search_table(
self, postgres_size, table_size, forward_chunk, backward_chunk
):
select = (
@@ -390,7 +397,7 @@ class Porter(object):
return headers, rows
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
if rows:
forward_chunk = rows[-1][0] + 1
@@ -438,7 +445,7 @@ class Porter(object):
},
)
yield self.postgres_store.execute(insert)
await self.postgres_store.execute(insert)
postgres_size += len(rows)
@@ -476,11 +483,10 @@ class Porter(object):
return store
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
async def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = (
yield self.postgres_store.db.updates.has_completed_background_updates()
await self.postgres_store.db.updates.has_completed_background_updates()
)
if not postgres_ready:
@@ -489,13 +495,20 @@ class Porter(object):
self.progress.set_state("Running background updates on PostgreSQL")
while not postgres_ready:
yield self.postgres_store.db.updates.do_next_background_update(100)
postgres_ready = yield (
await self.postgres_store.db.updates.do_next_background_update(100)
postgres_ready = await (
self.postgres_store.db.updates.has_completed_background_updates()
)
@defer.inlineCallbacks
def run(self):
async def run(self):
"""Ports the SQLite database to a PostgreSQL database.
When a fatal error is met, its message is assigned to the global "end_error"
variable. When this error comes with a stacktrace, its exec_info is assigned to
the global "end_error_exec_info" variable.
"""
global end_error
try:
# we allow people to port away from outdated versions of sqlite.
self.sqlite_store = self.build_db_store(
@@ -505,21 +518,21 @@ class Porter(object):
# Check if all background updates are done, abort if not.
updates_complete = (
yield self.sqlite_store.db.updates.has_completed_background_updates()
await self.sqlite_store.db.updates.has_completed_background_updates()
)
if not updates_complete:
sys.stderr.write(
end_error = (
"Pending background updates exist in the SQLite3 database."
" Please start Synapse again and wait until every update has finished"
" before running this script.\n"
)
defer.returnValue(None)
return
self.postgres_store = self.build_db_store(
self.hs_config.get_single_database()
)
yield self.run_background_updates_on_postgres()
await self.run_background_updates_on_postgres()
self.progress.set_state("Creating port tables")
@@ -547,22 +560,22 @@ class Porter(object):
)
try:
yield self.postgres_store.db.runInteraction("alter_table", alter_table)
await self.postgres_store.db.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass
yield self.postgres_store.db.runInteraction(
await self.postgres_store.db.runInteraction(
"create_port_table", create_port_table
)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
postgres_tables = yield self.postgres_store.db.simple_select_onecol(
postgres_tables = await self.postgres_store.db.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
@@ -573,28 +586,34 @@ class Porter(object):
# Step 3. Figure out what still needs copying
self.progress.set_state("Checking on port progress")
setup_res = yield defer.gatherResults(
[
self.setup_table(table)
for table in tables
if table not in ["schema_version", "applied_schema_deltas"]
and not table.startswith("sqlite_")
],
consumeErrors=True,
setup_res = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(self.setup_table, table)
for table in tables
if table not in ["schema_version", "applied_schema_deltas"]
and not table.startswith("sqlite_")
],
consumeErrors=True,
)
)
# Step 4. Do the copying.
self.progress.set_state("Copying to postgres")
yield defer.gatherResults(
[self.handle_table(*res) for res in setup_res], consumeErrors=True
await make_deferred_yieldable(
defer.gatherResults(
[run_in_background(self.handle_table, *res) for res in setup_res],
consumeErrors=True,
)
)
# Step 5. Do final post-processing
yield self._setup_state_group_id_seq()
await self._setup_state_group_id_seq()
self.progress.done()
except Exception:
except Exception as e:
global end_error_exec_info
end_error = e
end_error_exec_info = sys.exc_info()
logger.exception("")
finally:
@@ -634,8 +653,7 @@ class Porter(object):
return outrows
@defer.inlineCallbacks
def _setup_sent_transactions(self):
async def _setup_sent_transactions(self):
# Only save things from the last day
yesterday = int(time.time() * 1000) - 86400000
@@ -656,7 +674,7 @@ class Porter(object):
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@@ -669,7 +687,7 @@ class Porter(object):
txn, "sent_transactions", headers[1:], rows
)
yield self.postgres_store.execute(insert)
await self.postgres_store.execute(insert)
else:
max_inserted_rowid = 0
@@ -686,10 +704,10 @@ class Porter(object):
else:
return 1
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = await self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
@@ -705,46 +723,49 @@ class Porter(object):
(size,) = txn.fetchone()
return int(size)
remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
remaining_count = await self.sqlite_store.execute(get_sent_table_size)
total_count = remaining_count + inserted_rows
defer.returnValue((next_chunk, inserted_rows, total_count))
return next_chunk, inserted_rows, total_count
@defer.inlineCallbacks
def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
frows = yield self.sqlite_store.execute_sql(
async def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
frows = await self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
)
brows = yield self.sqlite_store.execute_sql(
brows = await self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
)
defer.returnValue(frows[0][0] + brows[0][0])
return frows[0][0] + brows[0][0]
@defer.inlineCallbacks
def _get_already_ported_count(self, table):
rows = yield self.postgres_store.execute_sql(
async def _get_already_ported_count(self, table):
rows = await self.postgres_store.execute_sql(
"SELECT count(*) FROM %s" % (table,)
)
defer.returnValue(rows[0][0])
return rows[0][0]
@defer.inlineCallbacks
def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
remaining, done = yield defer.gatherResults(
[
self._get_remaining_count_to_port(table, forward_chunk, backward_chunk),
self._get_already_ported_count(table),
],
consumeErrors=True,
async def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
remaining, done = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._get_remaining_count_to_port,
table,
forward_chunk,
backward_chunk,
),
run_in_background(self._get_already_ported_count, table),
],
)
)
remaining = int(remaining) if remaining else 0
done = int(done) if done else 0
defer.returnValue((done, remaining + done))
return done, remaining + done
def _setup_state_group_id_seq(self):
def r(txn):
@@ -1010,7 +1031,12 @@ if __name__ == "__main__":
hs_config=config,
)
reactor.callWhenRunning(porter.run)
@defer.inlineCallbacks
def run():
with LoggingContext("synapse_port_db_run"):
yield defer.ensureDeferred(porter.run())
reactor.callWhenRunning(run)
reactor.run()
@@ -1019,7 +1045,11 @@ if __name__ == "__main__":
else:
start()
if end_error_exec_info:
exc_type, exc_value, exc_traceback = end_error_exec_info
traceback.print_exception(exc_type, exc_value, exc_traceback)
if end_error:
if end_error_exec_info:
exc_type, exc_value, exc_traceback = end_error_exec_info
traceback.print_exception(exc_type, exc_value, exc_traceback)
sys.stderr.write(end_error)
sys.exit(5)
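
A pattern that recurs throughout this file is replacing bare `defer.gatherResults([...])` calls with `make_deferred_yieldable(defer.gatherResults([run_in_background(...)]))`. This fans the work out so each task runs in its own logging context and restores the caller's context once all of them finish. A condensed sketch of the shape, assuming a Synapse checkout on the path and a `porter` object as above:

```python
# Condensed sketch of the logcontext-safe fan-out used in this file.
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background

async def setup_all_tables(porter, tables):
    # run_in_background gives each task its own logging context;
    # make_deferred_yieldable restores ours when they have all finished.
    return await make_deferred_yieldable(
        defer.gatherResults(
            [run_in_background(porter.setup_table, t) for t in tables],
            consumeErrors=True,
        )
    )
```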

View File

@@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.9.0.dev2"
__version__ = "1.9.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -15,6 +15,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from six import text_type
import jsonschema
@@ -293,7 +295,7 @@ class Filter(object):
room_id = None
ev_type = "m.presence"
contains_url = False
labels = []
labels = [] # type: List[str]
else:
sender = event.get("sender", None)
if not sender:

View File

@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from collections import OrderedDict
from typing import Any, Optional, Tuple
from synapse.api.errors import LimitExceededError
@@ -23,7 +24,9 @@ class Ratelimiter(object):
"""
def __init__(self):
self.message_counts = collections.OrderedDict()
self.message_counts = (
OrderedDict()
) # type: OrderedDict[Any, Tuple[float, int, Optional[float]]]
def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True):
"""Can the entity (e.g. user or IP address) perform the action?

View File

@@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
RoomStateEventRestServlet,
)
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
RoomStore,
DirectoryStore,
SlavedTransactionStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
SlavedEventStore,
SlavedClientIpStore,
RoomStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -29,6 +29,7 @@ class AccountValidityConfig(Config):
def __init__(self, config, synapse_config):
if config is None:
return
super(AccountValidityConfig, self).__init__()
self.enabled = config.get("enabled", False)
self.renew_by_email_enabled = "renew_at" in config
@@ -93,7 +94,7 @@ class RegistrationConfig(Config):
)
self.account_validity = AccountValidityConfig(
config.get("account_validity", {}), config
config.get("account_validity") or {}, config
)
self.registrations_require_3pid = config.get("registrations_require_3pid", [])

View File

@@ -294,6 +294,14 @@ class ServerConfig(Config):
self.retention_default_min_lifetime = None
self.retention_default_max_lifetime = None
if self.retention_enabled:
logger.info(
"Message retention policies support enabled with the following default"
" policy: min_lifetime = %s ; max_lifetime = %s",
self.retention_default_min_lifetime,
self.retention_default_max_lifetime,
)
self.retention_allowed_lifetime_min = retention_config.get(
"allowed_lifetime_min"
)

View File

@@ -634,7 +634,7 @@ def get_public_keys(invite_event):
return public_keys
def auth_types_for_event(event) -> Set[Tuple[str]]:
def auth_types_for_event(event) -> Set[Tuple[str, str]]:
"""Given an event, return a list of (EventType, StateKey) that may be
needed to auth the event. The returned list may be a superset of what
would actually be required depending on the full state of the room.

View File

@@ -88,6 +88,8 @@ class PaginationHandler(object):
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)
self.clock.looping_call(
run_as_background_process,
job["interval"],
@@ -130,11 +132,22 @@ class PaginationHandler(object):
else:
include_null = False
logger.info(
"[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)",
min_ms,
max_ms,
include_null,
)
rooms = yield self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
logger.debug("[purge] Rooms to purge: %s", rooms)
for room_id, retention_policy in iteritems(rooms):
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"

View File

@@ -66,11 +66,16 @@ class BaseSlavedStore(SQLBaseStore):
self._cache_id_gen.advance(token)
for row in rows:
if row.cache_func == CURRENT_STATE_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for current state cache"
)
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)

View File

@@ -17,7 +17,9 @@
import itertools
import logging
from collections import namedtuple
from typing import Any
from typing import Any, List, Optional
import attr
logger = logging.getLogger(__name__)
@@ -65,10 +67,24 @@ PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
CachesStreamRow = namedtuple(
"CachesStreamRow",
("cache_func", "keys", "invalidation_ts"), # str # list(str) # int
)
@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""
cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(

View File

@@ -29,7 +29,7 @@ from synapse.rest.admin._base import (
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.rooms import ShutdownRoomRestServlet
from synapse.rest.admin.rooms import ListRoomRestServlet, ShutdownRoomRestServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.users import (
AccountValidityRenewServlet,
@@ -188,6 +188,7 @@ def register_servlets(hs, http_server):
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
ListRoomRestServlet(hs).register(http_server)
PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)

View File

@@ -40,6 +40,21 @@ def historical_admin_path_patterns(path_regex):
)
def admin_patterns(path_regex: str):
"""Returns the list of patterns for an admin endpoint
Args:
path_regex: The regex string to match. This should NOT have a ^
as this will be prefixed.
Returns:
A list of regex patterns.
"""
admin_prefix = "^/_synapse/admin/v1"
patterns = [re.compile(admin_prefix + path_regex)]
return patterns
async def assert_requester_is_admin(auth, request):
"""Verify that the requester is an admin user

View File

@@ -36,7 +36,7 @@ class QuarantineMediaInRoom(RestServlet):
historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media/quarantine")
+
# This path kept around for legacy reasons
historical_admin_path_patterns("/quarantine_media/(?P<room_id>![^/]+)")
historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
)
def __init__(self, hs):

View File

@@ -15,15 +15,20 @@
import logging
from synapse.api.constants import Membership
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_integer,
parse_json_object_from_request,
parse_string,
)
from synapse.rest.admin._base import (
admin_patterns,
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.storage.data_stores.main.room import RoomSortOrder
from synapse.types import create_requester
from synapse.util.async_helpers import maybe_awaitable
@@ -155,3 +160,80 @@ class ShutdownRoomRestServlet(RestServlet):
"new_room_id": new_room_id,
},
)
class ListRoomRestServlet(RestServlet):
"""
List all rooms that are known to the homeserver. Results are returned
in a dictionary containing room information. Supports pagination.
"""
PATTERNS = admin_patterns("/rooms")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
# Extract query parameters
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
order_by = parse_string(request, "order_by", default="alphabetical")
if order_by not in (
RoomSortOrder.ALPHABETICAL.value,
RoomSortOrder.SIZE.value,
):
raise SynapseError(
400,
"Unknown value for order_by: %s" % (order_by,),
errcode=Codes.INVALID_PARAM,
)
search_term = parse_string(request, "search_term")
if search_term == "":
raise SynapseError(
400,
"search_term cannot be an empty string",
errcode=Codes.INVALID_PARAM,
)
direction = parse_string(request, "dir", default="f")
if direction not in ("f", "b"):
raise SynapseError(
400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM
)
reverse_order = True if direction == "b" else False
# Return list of rooms according to parameters
rooms, total_rooms = await self.store.get_rooms_paginate(
start, limit, order_by, reverse_order, search_term
)
response = {
# next_token should be opaque, so return a value the client can parse
"offset": start,
"rooms": rooms,
"total_rooms": total_rooms,
}
# Are there more rooms to paginate through after this?
if (start + limit) < total_rooms:
# There are. Calculate where the query should start from next time
# to get the next part of the list
response["next_batch"] = start + limit
# Is it possible to paginate backwards? Check if we currently have an
# offset
if start > 0:
if start > limit:
# Going back one iteration won't take us to the start.
# Calculate new offset
response["prev_batch"] = start - limit
else:
response["prev_batch"] = 0
return 200, response

View File

@@ -193,8 +193,8 @@ class UserRestServletV2(RestServlet):
raise SynapseError(400, "Invalid password")
else:
new_password = body["password"]
await self._set_password_handler.set_password(
target_user, new_password, requester
await self.set_password_handler.set_password(
target_user.to_string(), new_password, requester
)
if "deactivated" in body:
@@ -338,21 +338,22 @@ class UserRegisterServlet(RestServlet):
got_mac = body["mac"]
want_mac = hmac.new(
want_mac_builder = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce.encode("utf8"))
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac_builder.update(nonce.encode("utf8"))
want_mac_builder.update(b"\x00")
want_mac_builder.update(username)
want_mac_builder.update(b"\x00")
want_mac_builder.update(password)
want_mac_builder.update(b"\x00")
want_mac_builder.update(b"admin" if admin else b"notadmin")
if user_type:
want_mac.update(b"\x00")
want_mac.update(user_type.encode("utf8"))
want_mac = want_mac.hexdigest()
want_mac_builder.update(b"\x00")
want_mac_builder.update(user_type.encode("utf8"))
want_mac = want_mac_builder.hexdigest()
if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
raise SynapseError(403, "HMAC incorrect")
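
The rename above keeps the `hmac` object and its hex digest in separate variables, so the MAC construction is easier to follow. A hedged client-side sketch that mirrors it, for registering a user via the shared-secret admin API (the secret and field values are placeholders; the optional `user_type` field is omitted):

```python
import hashlib
import hmac

def build_registration_mac(shared_secret, nonce, username, password, admin=False):
    # Mirrors the server-side construction above: NUL-separated fields,
    # HMAC-SHA1 keyed on the registration shared secret.
    mac = hmac.new(key=shared_secret.encode(), digestmod=hashlib.sha1)
    mac.update(nonce.encode("utf8"))
    mac.update(b"\x00")
    mac.update(username.encode("utf8"))
    mac.update(b"\x00")
    mac.update(password.encode("utf8"))
    mac.update(b"\x00")
    mac.update(b"admin" if admin else b"notadmin")
    # The server also appends b"\x00" + user_type when one is supplied.
    return mac.hexdigest()

print(build_registration_mac("secret", "abcd1234", "alice", "hunter2"))
```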

View File

@@ -70,7 +70,6 @@ class EventStreamRestServlet(RestServlet):
return 200, {}
# TODO: Unit test gets, with and without auth, with different kinds of events.
class EventRestServlet(RestServlet):
PATTERNS = client_patterns("/events/(?P<event_id>[^/]*)$", v1=True)
@@ -78,6 +77,7 @@ class EventRestServlet(RestServlet):
super(EventRestServlet, self).__init__()
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
self.auth = hs.get_auth()
self._event_serializer = hs.get_event_client_serializer()
async def on_GET(self, request, event_id):

View File

@@ -514,7 +514,7 @@ class CasTicketServlet(RestServlet):
if user is None:
raise Exception("CAS response does not contain user")
except Exception:
logger.error("Error parsing CAS response", exc_info=1)
logger.exception("Error parsing CAS response")
raise LoginError(401, "Invalid CAS response", errcode=Codes.UNAUTHORIZED)
if not success:
raise LoginError(

View File

@@ -16,6 +16,7 @@
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
import logging
from typing import List, Optional
from six.moves.urllib import parse as urlparse
@@ -207,7 +208,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
requester, event_dict, txn_id=txn_id
)
ret = {}
ret = {} # type: dict
if event:
set_tag("event_id", event.event_id)
ret = {"event_id": event.event_id}
@@ -285,7 +286,7 @@ class JoinRoomAliasServlet(TransactionRestServlet):
try:
remote_room_hosts = [
x.decode("ascii") for x in request.args[b"server_name"]
]
] # type: Optional[List[str]]
except Exception:
remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
@@ -375,7 +376,7 @@ class PublicRoomListRestServlet(TransactionRestServlet):
server = parse_string(request, "server", default=None)
content = parse_json_object_from_request(request)
limit = int(content.get("limit", 100))
limit = int(content.get("limit", 100)) # type: Optional[int]
since_token = content.get("since", None)
search_filter = content.get("filter", None)
@@ -504,11 +505,16 @@ class RoomMessageListRestServlet(RestServlet):
filter_bytes = parse_string(request, b"filter", encoding=None)
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes.decode("UTF-8"))
event_filter = Filter(json.loads(filter_json))
if event_filter.filter_json.get("event_format", "client") == "federation":
event_filter = Filter(json.loads(filter_json)) # type: Optional[Filter]
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
== "federation"
):
as_client_event = False
else:
event_filter = None
msgs = await self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
@@ -611,7 +617,7 @@ class RoomEventContextServlet(RestServlet):
filter_bytes = parse_string(request, "filter")
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes)
event_filter = Filter(json.loads(filter_json))
event_filter = Filter(json.loads(filter_json)) # type: Optional[Filter]
else:
event_filter = None

View File

@@ -32,7 +32,7 @@ def client_patterns(path_regex, releases=(0,), unstable=True, v1=False):
Args:
path_regex (str): The regex string to match. This should NOT have a ^
as this will be prefixed.
as this will be prefixed.
Returns:
SRE_Pattern
"""

View File

@@ -21,6 +21,7 @@ from typing import List, Union
from six import string_types
import synapse
import synapse.api.auth
import synapse.types
from synapse.api.constants import LoginType
from synapse.api.errors import (
@@ -405,7 +406,7 @@ class RegisterRestServlet(RestServlet):
return ret
elif kind != b"user":
raise UnrecognizedRequestError(
"Do not understand membership kind: %s" % (kind,)
"Do not understand membership kind: %s" % (kind.decode("utf8"),)
)
# we do basic sanity checks here because the auth layer will store these

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Tuple
from synapse.http import servlet
from synapse.http.servlet import parse_json_object_from_request
@@ -60,7 +61,7 @@ class SendToDeviceRestServlet(servlet.RestServlet):
sender_user_id, message_type, content["messages"]
)
response = (200, {})
response = (200, {}) # type: Tuple[int, dict]
return response

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import logging
from typing import Dict, Set
from canonicaljson import encode_canonical_json, json
from signedjson.sign import sign_json
@@ -103,7 +104,7 @@ class RemoteKey(DirectServeResource):
async def _async_render_GET(self, request):
if len(request.postpath) == 1:
(server,) = request.postpath
query = {server.decode("ascii"): {}}
query = {server.decode("ascii"): {}} # type: dict
elif len(request.postpath) == 2:
server, key_id = request.postpath
minimum_valid_until_ts = parse_integer(request, "minimum_valid_until_ts")
@@ -148,7 +149,7 @@ class RemoteKey(DirectServeResource):
time_now_ms = self.clock.time_msec()
cache_misses = dict()
cache_misses = dict() # type: Dict[str, Set[str]]
for (server_name, key_id, from_server), results in cached.items():
results = [(result["ts_added_ms"], result) for result in results]

View File

@@ -18,6 +18,7 @@ import errno
import logging
import os
import shutil
from typing import Dict, Tuple
from six import iteritems
@@ -605,7 +606,7 @@ class MediaRepository(object):
# We deduplicate the thumbnail sizes by ignoring the cropped versions if
# they have the same dimensions of a scaled one.
thumbnails = {}
thumbnails = {} # type: Dict[Tuple[int, int, str], str]
for r_width, r_height, r_method, r_type in requirements:
if r_method == "crop":
thumbnails.setdefault((r_width, r_height, r_type), r_method)

View File

@@ -23,6 +23,7 @@ import re
import shutil
import sys
import traceback
from typing import Dict, Optional
import six
from six import string_types
@@ -237,8 +238,8 @@ class PreviewUrlResource(DirectServeResource):
# If we don't find a match, we'll look at the HTTP Content-Type, and
# if that doesn't exist, we'll fall back to UTF-8.
if not encoding:
match = _content_type_match.match(media_info["media_type"])
encoding = match.group(1) if match else "utf-8"
content_match = _content_type_match.match(media_info["media_type"])
encoding = content_match.group(1) if content_match else "utf-8"
og = decode_and_calc_og(body, media_info["uri"], encoding)
@@ -518,7 +519,7 @@ def _calc_og(tree, media_uri):
# "og:video:height" : "720",
# "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
og = {}
og = {} # type: Dict[str, Optional[str]]
for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
if "content" in tag.attrib:
# if we've got more than 50 tags, someone is taking the piss
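The match -> content_match rename above avoids reusing one variable name for two different regex results in the same scope. The surrounding charset-sniffing logic reduces to this standalone sketch; the regex here is an approximation written for illustration, not necessarily Synapse's exact _content_type_match:

import re

_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)

def pick_encoding(media_type: str) -> str:
    # Fall back to UTF-8 when the Content-Type carries no charset parameter.
    content_match = _content_type_match.match(media_type)
    return content_match.group(1) if content_match else "utf-8"

assert pick_encoding("text/html; charset=ISO-8859-1") == "ISO-8859-1"
assert pick_encoding("text/html") == "utf-8"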

View File

@@ -296,8 +296,8 @@ class ThumbnailResource(DirectServeResource):
d_h = desired_height
if desired_method.lower() == "crop":
info_list = []
info_list2 = []
crop_info_list = []
crop_info_list2 = []
for info in thumbnail_infos:
t_w = info["thumbnail_width"]
t_h = info["thumbnail_height"]
@@ -309,7 +309,7 @@ class ThumbnailResource(DirectServeResource):
type_quality = desired_type != info["thumbnail_type"]
length_quality = info["thumbnail_length"]
if t_w >= d_w or t_h >= d_h:
info_list.append(
crop_info_list.append(
(
aspect_quality,
min_quality,
@@ -320,7 +320,7 @@ class ThumbnailResource(DirectServeResource):
)
)
else:
info_list2.append(
crop_info_list2.append(
(
aspect_quality,
min_quality,
@@ -330,10 +330,10 @@ class ThumbnailResource(DirectServeResource):
info,
)
)
if info_list:
return min(info_list)[-1]
if crop_info_list:
return min(crop_info_list)[-1]
else:
return min(info_list2)[-1]
return min(crop_info_list2)[-1]
else:
info_list = []
info_list2 = []

View File

@@ -17,6 +17,7 @@
import logging
import random
from abc import ABCMeta
from typing import Any, Optional
from six import PY2
from six.moves import builtins
@@ -26,7 +27,7 @@ from canonicaljson import json
from synapse.storage.database import LoggingTransaction # noqa: F401
from synapse.storage.database import make_in_list_sql_clause # noqa: F401
from synapse.storage.database import Database
from synapse.types import get_domain_from_id
from synapse.types import Collection, get_domain_from_id
logger = logging.getLogger(__name__)
@@ -63,17 +64,24 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
def _attempt_to_invalidate_cache(self, cache_name, key):
def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
):
"""Attempts to invalidate the cache of the given name, ignoring if the
cache doesn't exist. Mainly used for invalidating caches on workers,
where they may not have the cache.
Args:
cache_name (str)
key (tuple)
cache_name
key: Entry to invalidate. If None then invalidates the entire
cache.
"""
try:
getattr(self, cache_name).invalidate(key)
if key is None:
getattr(self, cache_name).invalidate_all()
else:
getattr(self, cache_name).invalidate(tuple(key))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
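A toy illustration of the contract introduced here: key=None means drop the whole cache, any other key is normalised to a tuple, and a cache that was never pulled in on this worker is skipped via the AttributeError. Everything below is stand-in code, not Synapse's cache machinery:

class FakeCache:
    def __init__(self):
        self.entries = {("!room:a",): "summary"}

    def invalidate(self, key):
        self.entries.pop(key, None)

    def invalidate_all(self):
        self.entries.clear()

class Store:
    get_room_summary = FakeCache()

    def _attempt_to_invalidate_cache(self, cache_name, key):
        try:
            if key is None:
                getattr(self, cache_name).invalidate_all()
            else:
                getattr(self, cache_name).invalidate(tuple(key))
        except AttributeError:
            pass  # cache not present on this worker; nothing to do

store = Store()
store._attempt_to_invalidate_cache("get_room_summary", ["!room:a"])
store._attempt_to_invalidate_cache("missing_cache", None)  # silently ignored
assert store.get_room_summary.entries == {}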

View File

@@ -16,6 +16,7 @@
import itertools
import logging
from typing import Any, Iterable, Optional
from twisted.internet import defer
@@ -43,6 +44,14 @@ class CacheInvalidationStore(SQLBaseStore):
txn.call_after(cache_func.invalidate, keys)
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
def _invalidate_all_cache_and_stream(self, txn, cache_func):
"""Invalidates the entire cache and adds it to the cache stream so slaves
will know to invalidate their caches.
"""
txn.call_after(cache_func.invalidate_all)
self._send_invalidation_to_replication(txn, cache_func.__name__, None)
def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
"""Special case invalidation of caches based on current state.
@@ -73,17 +82,24 @@ class CacheInvalidationStore(SQLBaseStore):
txn, CURRENT_STATE_CACHE_NAME, [room_id]
)
def _send_invalidation_to_replication(self, txn, cache_name, keys):
def _send_invalidation_to_replication(
self, txn, cache_name: str, keys: Optional[Iterable[Any]]
):
"""Notifies replication that given cache has been invalidated.
Note that this does *not* invalidate the cache locally.
Args:
txn
cache_name (str)
keys (iterable[str])
cache_name
keys: Entry to invalidate. If None will invalidate all.
"""
if cache_name == CURRENT_STATE_CACHE_NAME and keys is None:
raise Exception(
"Can't stream invalidate all with magic current state cache"
)
if isinstance(self.database_engine, PostgresEngine):
# get_next() returns a context manager which is designed to wrap
# the transaction. However, we want to only get an ID when we want
@@ -95,13 +111,16 @@ class CacheInvalidationStore(SQLBaseStore):
txn.call_after(ctx.__exit__, None, None, None)
txn.call_after(self.hs.get_notifier().on_new_replication_data)
if keys is not None:
keys = list(keys)
self.db.simple_insert_txn(
txn,
table="cache_invalidation_stream",
values={
"stream_id": stream_id,
"cache_func": cache_name,
"keys": list(keys),
"keys": keys,
"invalidation_ts": self.clock.time_msec(),
},
)
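With this change, a row in cache_invalidation_stream whose keys column is NULL means "invalidate the entire cache" (except for the guarded current-state cache). A sketch of how a consumer might apply such a row; the function signature is an assumption for illustration, not the actual replication wire format:

def on_cache_invalidation_row(store, cache_func_name, keys):
    # keys is None when the writer used _invalidate_all_cache_and_stream.
    cache = getattr(store, cache_func_name, None)
    if cache is None:
        return  # this worker never instantiated the cache
    if keys is None:
        cache.invalidate_all()
    else:
        cache.invalidate(tuple(keys))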

View File

@@ -19,6 +19,7 @@ import itertools
import logging
from collections import Counter as c_counter, OrderedDict, namedtuple
from functools import wraps
from typing import Dict, List, Tuple
from six import iteritems, text_type
from six.moves import range
@@ -41,8 +42,9 @@ from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.data_stores.main.event_federation import EventFederationStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
from synapse.storage.database import Database
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.storage.database import Database, LoggingTransaction
from synapse.storage.persist_events import DeltaState
from synapse.types import RoomStreamToken, StateMap, get_domain_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter
@@ -148,30 +150,26 @@ class EventsStore(
@defer.inlineCallbacks
def _persist_events_and_state_updates(
self,
events_and_contexts,
current_state_for_room,
state_delta_for_room,
new_forward_extremeties,
backfilled=False,
delete_existing=False,
events_and_contexts: List[Tuple[EventBase, EventContext]],
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
delete_existing: bool = False,
):
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Args:
events_and_contexts (list[(EventBase, EventContext)]):
current_state_for_room (dict[str, dict]): Map from room_id to the
current state of the room based on forward extremities
state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
of `(to_delete, to_insert)` where to_delete is a list
of type/state keys to remove from current state, and to_insert
is a map (type,key)->event_id giving the state delta in each
room.
new_forward_extremities (dict[str, list[str]]): Map from room_id
to list of event IDs that are the new forward extremities of
the room.
backfilled (bool)
delete_existing (bool):
events_and_contexts:
current_state_for_room: Map from room_id to the current state of
the room based on forward extremities
state_delta_for_room: Map from room_id to the delta to apply to
room state
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
delete_existing
Returns:
Deferred: resolves when the events have been persisted
@@ -352,12 +350,12 @@ class EventsStore(
@log_function
def _persist_events_txn(
self,
txn,
events_and_contexts,
backfilled,
delete_existing=False,
state_delta_for_room={},
new_forward_extremeties={},
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
delete_existing: bool = False,
state_delta_for_room: Dict[str, DeltaState] = {},
new_forward_extremeties: Dict[str, List[str]] = {},
):
"""Insert some number of room events into the necessary database tables.
@@ -366,21 +364,16 @@ class EventsStore(
whether the event was rejected.
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]):
events to persist
backfilled (bool): True if the events were backfilled
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
txn
events_and_contexts: events to persist
backfilled: True if the events were backfilled
delete_existing: True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of type/state keys to be
removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
list of the event ids which are the forward extremities.
state_delta_for_room: The current-state delta for each room.
new_forward_extremeties: The new forward extremities for each room.
For each room, a list of the event ids which are the forward
extremities.
"""
all_events_and_contexts = events_and_contexts
@@ -465,9 +458,15 @@ class EventsStore(
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
def _update_current_state_txn(
self,
txn: LoggingTransaction,
state_delta_by_room: Dict[str, DeltaState],
stream_id: int,
):
for room_id, delta_state in iteritems(state_delta_by_room):
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
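state_delta_for_room now carries DeltaState objects (the attr.s class defined in synapse/storage/persist_events.py, shown further down) rather than bare (to_delete, to_insert) tuples. A minimal usage sketch, with StateMap[str] expanded to its underlying Dict[Tuple[str, str], str]:

from typing import Dict, List, Tuple

import attr

@attr.s(slots=True, frozen=True)
class DeltaState:
    to_delete = attr.ib(type=List[Tuple[str, str]])
    to_insert = attr.ib(type=Dict[Tuple[str, str], str])

delta = DeltaState(
    to_delete=[("m.room.topic", "")],
    to_insert={("m.room.name", ""): "$new_name_event:example.com"},
)
assert ("m.room.topic", "") in delta.to_delete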

View File

@@ -27,12 +27,76 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
class MonthlyActiveUsersStore(SQLBaseStore):
class MonthlyActiveUsersWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
super(MonthlyActiveUsersWorkerStore, self).__init__(database, db_conn, hs)
self._clock = hs.get_clock()
self.hs = hs
@cached(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users
Returns:
Deferred[int]: Number of current monthly active users
"""
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
(count,) = txn.fetchone()
return count
return self.db.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
def get_registered_reserved_users(self):
"""Of the reserved threepids defined in config, which are associated
with registered users?
Returns:
Deferred[list]: Real reserved users
"""
users = []
for tp in self.hs.config.mau_limits_reserved_threepids[
: self.hs.config.max_mau_value
]:
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
users.append(user_id)
return users
@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Arguments:
user_id (str): user to add/update
Return:
Deferred[int]: timestamp since last seen, None if never seen
"""
return self.db.simple_select_one_onecol(
table="monthly_active_users",
keyvalues={"user_id": user_id},
retcol="timestamp",
allow_none=True,
desc="user_last_seen_monthly_active",
)
class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
def __init__(self, database: Database, db_conn, hs):
super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
# Do not add more reserved users than the total allowable number
# cur = LoggingTransaction(
self.db.new_transaction(
db_conn,
"initialise_mau_threepids",
@@ -57,7 +121,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if user_id:
is_support = self.is_support_user_txn(txn, user_id)
if not is_support:
self.upsert_monthly_active_user_txn(txn, user_id)
# We do this manually here to avoid hitting #6791
self.db.simple_upsert_txn(
txn,
table="monthly_active_users",
keyvalues={"user_id": user_id},
values={"timestamp": int(self._clock.time_msec())},
)
else:
logger.warning("mau limit reserved threepid %s not found in db" % tp)
@@ -146,57 +216,22 @@ class MonthlyActiveUsersStore(SQLBaseStore):
txn.execute(sql, query_args)
# It seems poor to invalidate the whole cache, Postgres supports
# 'Returning' which would allow me to invalidate only the
# specific users, but sqlite has no way to do this and instead
# I would need to SELECT and the DELETE which without locking
# is racy.
# Have resolved to invalidate the whole cache for now and do
# something about it if and when the perf becomes significant
self._invalidate_all_cache_and_stream(
txn, self.user_last_seen_monthly_active
)
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
reserved_users = yield self.get_registered_reserved_users()
yield self.db.runInteraction(
"reap_monthly_active_users", _reap_users, reserved_users
)
# It seems poor to invalidate the whole cache, Postgres supports
# 'Returning' which would allow me to invalidate only the
# specific users, but sqlite has no way to do this and instead
# I would need to SELECT and the DELETE which without locking
# is racy.
# Have resolved to invalidate the whole cache for now and do
# something about it if and when the perf becomes significant
self.user_last_seen_monthly_active.invalidate_all()
self.get_monthly_active_count.invalidate_all()
@cached(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users
Returns:
Deferred[int]: Number of current monthly active users
"""
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
(count,) = txn.fetchone()
return count
return self.db.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
def get_registered_reserved_users(self):
"""Of the reserved threepids defined in config, which are associated
with registered users?
Returns:
Deferred[list]: Real reserved users
"""
users = []
for tp in self.hs.config.mau_limits_reserved_threepids[
: self.hs.config.max_mau_value
]:
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
users.append(user_id)
return users
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
@@ -222,23 +257,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
)
user_in_mau = self.user_last_seen_monthly_active.cache.get(
(user_id,), None, update_metrics=False
)
if user_in_mau is None:
self.get_monthly_active_count.invalidate(())
self.user_last_seen_monthly_active.invalidate((user_id,))
def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member
Note that, after calling this method, it will generally be necessary
to invalidate the caches on user_last_seen_monthly_active and
get_monthly_active_count. We can't do that here, because we are running
in a database thread rather than the main thread, and we can't call
txn.call_after because txn may not be a LoggingTransaction.
We consciously do not call is_support_txn from this method because it
is not possible to cache the response. is_support_txn will be false in
almost all cases, so it seems reasonable to call it only for
@@ -269,27 +290,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={"timestamp": int(self._clock.time_msec())},
)
return is_insert
@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Arguments:
user_id (str): user to add/update
Return:
Deferred[int]: timestamp since last seen, None if never seen
"""
return self.db.simple_select_one_onecol(
table="monthly_active_users",
keyvalues={"user_id": user_id},
retcol="timestamp",
allow_none=True,
desc="user_last_seen_monthly_active",
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
self._invalidate_cache_and_stream(
txn, self.user_last_seen_monthly_active, (user_id,)
)
return is_insert
@defer.inlineCallbacks
def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally

View File

@@ -18,7 +18,8 @@ import collections
import logging
import re
from abc import abstractmethod
from typing import List, Optional, Tuple
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from six import integer_types
@@ -46,6 +47,18 @@ RatelimitOverride = collections.namedtuple(
)
class RoomSortOrder(Enum):
"""
Enum to define the sorting method used when returning rooms with get_rooms_paginate
ALPHABETICAL = sort rooms alphabetically by name
SIZE = sort rooms by membership size, highest to lowest
"""
ALPHABETICAL = "alphabetical"
SIZE = "size"
class RoomWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
super(RoomWorkerStore, self).__init__(database, db_conn, hs)
@@ -281,6 +294,116 @@ class RoomWorkerStore(SQLBaseStore):
desc="is_room_blocked",
)
async def get_rooms_paginate(
self,
start: int,
limit: int,
order_by: RoomSortOrder,
reverse_order: bool,
search_term: Optional[str],
) -> Tuple[List[Dict[str, Any]], int]:
"""Function to retrieve a paginated list of rooms as json.
Args:
start: offset in the list
limit: maximum amount of rooms to retrieve
order_by: the sort order of the returned list
reverse_order: whether to reverse the room list
search_term: a string to filter room names by
Returns:
A list of room dicts and an integer representing the total number of
rooms that exist given this query
"""
# Filter room names by a string
where_statement = ""
if search_term:
where_statement = "WHERE state.name LIKE ?"
# Our postgres db driver converts ? -> %s in SQL strings as that's the
# placeholder for postgres.
# HOWEVER, if you put a % into your SQL then everything goes wibbly.
# To get around this, we're going to surround search_term with %'s
# before giving it to the database in python instead
search_term = "%" + search_term + "%"
# Set ordering
if RoomSortOrder(order_by) == RoomSortOrder.SIZE:
order_by_column = "curr.joined_members"
order_by_asc = False
elif RoomSortOrder(order_by) == RoomSortOrder.ALPHABETICAL:
# Sort alphabetically
order_by_column = "state.name"
order_by_asc = True
else:
raise StoreError(
500, "Incorrect value for order_by provided: %s" % order_by
)
# Whether to return the list in reverse order
if reverse_order:
# Flip the boolean
order_by_asc = not order_by_asc
# Create one query for getting the limited number of events that the user asked
# for, and another query for getting the total number of events that could be
# returned. Thus allowing us to see if there are more events to paginate through
info_sql = """
SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members
FROM room_stats_state state
INNER JOIN room_stats_current curr USING (room_id)
%s
ORDER BY %s %s
LIMIT ?
OFFSET ?
""" % (
where_statement,
order_by_column,
"ASC" if order_by_asc else "DESC",
)
# Use a nested SELECT statement as SQL can't count(*) with an OFFSET
count_sql = """
SELECT count(*) FROM (
SELECT room_id FROM room_stats_state state
%s
) AS get_room_ids
""" % (
where_statement,
)
def _get_rooms_paginate_txn(txn):
# Execute the data query
sql_values = (limit, start)
if search_term:
# Add the search term into the WHERE clause
sql_values = (search_term,) + sql_values
txn.execute(info_sql, sql_values)
# Refactor room query data into a structured dictionary
rooms = []
for room in txn:
rooms.append(
{
"room_id": room[0],
"name": room[1],
"canonical_alias": room[2],
"joined_members": room[3],
}
)
# Execute the count query
# Add the search term into the WHERE clause if present
sql_values = (search_term,) if search_term else ()
txn.execute(count_sql, sql_values)
room_count = txn.fetchone()
return rooms, room_count[0]
return await self.db.runInteraction(
"get_rooms_paginate", _get_rooms_paginate_txn,
)
@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given

View File

@@ -17,19 +17,24 @@
import logging
from collections import deque, namedtuple
from typing import Iterable, List, Optional, Tuple
from six import iteritems
from six.moves import range
import attr
from prometheus_client import Counter, Histogram
from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.data_stores import DataStores
from synapse.types import StateMap
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.metrics import Measure
@@ -67,6 +72,19 @@ stale_forward_extremities_counter = Histogram(
)
@attr.s(slots=True, frozen=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
Attributes:
to_delete: List of type/state_keys to delete from current state
to_insert: Map of state to upsert into current state
"""
to_delete = attr.ib(type=List[Tuple[str, str]])
to_insert = attr.ib(type=StateMap[str])
class _EventPeristenceQueue(object):
"""Queues up events so that they can be persisted in bulk with only one
concurrent transaction per room.
@@ -138,13 +156,12 @@ class _EventPeristenceQueue(object):
self._currently_persisting_rooms.add(room_id)
@defer.inlineCallbacks
def handle_queue_loop():
async def handle_queue_loop():
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
ret = yield per_item_callback(item)
ret = await per_item_callback(item)
except Exception:
with PreserveLoggingContext():
item.deferred.errback()
@@ -191,12 +208,16 @@ class EventsPersistenceStorage(object):
self._state_resolution_handler = hs.get_state_resolution_handler()
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
def persist_events(
self,
events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
backfilled: bool = False,
):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled (bool): Whether the results are retrieved from federation
backfilled: Whether the results are retrieved from federation
via backfill or not. Used to determine if they're "new" events
which might update the current state etc.
@@ -226,16 +247,12 @@ class EventsPersistenceStorage(object):
return max_persisted_id
@defer.inlineCallbacks
def persist_event(self, event, context, backfilled=False):
def persist_event(
self, event: FrozenEvent, context: EventContext, backfilled: bool = False
):
"""
Args:
event (EventBase):
context (EventContext):
backfilled (bool):
Returns:
Deferred: resolves to (int, int): the stream ordering of ``event``,
Deferred[Tuple[int, int]]: the stream ordering of ``event``,
and the stream ordering of the latest persisted event
"""
deferred = self._event_persist_queue.add_to_queue(
@@ -249,28 +266,22 @@ class EventsPersistenceStorage(object):
max_persisted_id = yield self.main_store.get_current_events_token()
return (event.internal_metadata.stream_ordering, max_persisted_id)
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
def persisting_queue(item):
def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item):
with Measure(self._clock, "persist_events"):
yield self._persist_events(
await self._persist_events(
item.events_and_contexts, backfilled=item.backfilled
)
self._event_persist_queue.handle_queue(room_id, persisting_queue)
@defer.inlineCallbacks
def _persist_events(self, events_and_contexts, backfilled=False):
async def _persist_events(
self,
events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
backfilled: bool = False,
):
"""Calculates the change to current state and forward extremities, and
persists the given events along with those updates.
Args:
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
delete_existing (bool):
Returns:
Deferred: resolves when the events have been persisted
"""
if not events_and_contexts:
return
@@ -315,10 +326,10 @@ class EventsPersistenceStorage(object):
)
for room_id, ev_ctx_rm in iteritems(events_by_room):
latest_event_ids = yield self.main_store.get_latest_event_ids_in_room(
latest_event_ids = await self.main_store.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremities(
new_latest_event_ids = await self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)
@@ -374,7 +385,7 @@ class EventsPersistenceStorage(object):
with Measure(
self._clock, "persist_events.get_new_state_after_events"
):
res = yield self._get_new_state_after_events(
res = await self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
@@ -389,12 +400,12 @@ class EventsPersistenceStorage(object):
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
state_delta_for_room[room_id] = ([], delta_ids)
state_delta_for_room[room_id] = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(
self._clock, "persist_events.calculate_state_delta"
):
delta = yield self._calculate_state_delta(
delta = await self._calculate_state_delta(
room_id, current_state
)
state_delta_for_room[room_id] = delta
@@ -404,7 +415,7 @@ class EventsPersistenceStorage(object):
if current_state is not None:
current_state_for_room[room_id] = current_state
yield self.main_store._persist_events_and_state_updates(
await self.main_store._persist_events_and_state_updates(
chunk,
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
@@ -412,8 +423,12 @@ class EventsPersistenceStorage(object):
backfilled=backfilled,
)
@defer.inlineCallbacks
def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
async def _calculate_new_extremities(
self,
room_id: str,
event_contexts: List[Tuple[FrozenEvent, EventContext]],
latest_event_ids: List[str],
):
"""Calculates the new forward extremities for a room given events to
persist.
@@ -444,13 +459,13 @@ class EventsPersistenceStorage(object):
)
# Remove any events which are prev_events of any existing events.
existing_prevs = yield self.main_store._get_events_which_are_prevs(result)
existing_prevs = await self.main_store._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev
# events. If they do we need to remove them and their prev events,
# otherwise we end up with dangling extremities.
existing_prevs = yield self.main_store._get_prevs_before_rejected(
existing_prevs = await self.main_store._get_prevs_before_rejected(
e_id for event in new_events for e_id in event.prev_event_ids()
)
result.difference_update(existing_prevs)
@@ -464,10 +479,13 @@ class EventsPersistenceStorage(object):
return result
@defer.inlineCallbacks
def _get_new_state_after_events(
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
):
async def _get_new_state_after_events(
self,
room_id: str,
events_context: List[Tuple[FrozenEvent, EventContext]],
old_latest_event_ids: Iterable[str],
new_latest_event_ids: Iterable[str],
) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]:
"""Calculate the current state dict after adding some new events to
a room
@@ -485,7 +503,6 @@ class EventsPersistenceStorage(object):
the new forward extremities for the room.
Returns:
Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
Returns a tuple of two state maps, the first being the full new current
state and the second being the delta to the existing current state.
If both are None then there has been no change.
@@ -547,7 +564,7 @@ class EventsPersistenceStorage(object):
if missing_event_ids:
# Now pull out the state groups for any missing events from DB
event_to_groups = yield self.main_store._get_state_group_for_events(
event_to_groups = await self.main_store._get_state_group_for_events(
missing_event_ids
)
event_id_to_state_group.update(event_to_groups)
@@ -588,7 +605,7 @@ class EventsPersistenceStorage(object):
# their state IDs so we can resolve to a single state set.
missing_state = new_state_groups - set(state_groups_map)
if missing_state:
group_to_state = yield self.state_store._get_state_for_groups(missing_state)
group_to_state = await self.state_store._get_state_for_groups(missing_state)
state_groups_map.update(group_to_state)
if len(new_state_groups) == 1:
@@ -612,10 +629,10 @@ class EventsPersistenceStorage(object):
break
if not room_version:
room_version = yield self.main_store.get_room_version(room_id)
room_version = await self.main_store.get_room_version(room_id)
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
res = await self._state_resolution_handler.resolve_state_groups(
room_id,
room_version,
state_groups,
@@ -625,18 +642,14 @@ class EventsPersistenceStorage(object):
return res.state, None
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
async def _calculate_state_delta(
self, room_id: str, current_state: StateMap[str]
) -> DeltaState:
"""Calculate the new state deltas for a room.
Assumes that we are only persisting events for one room at a time.
Returns:
tuple[list, dict] (to_delete, to_insert): where to_delete are the
type/state_keys to remove from current_state_events and `to_insert`
are the updates to current_state_events.
"""
existing_state = yield self.main_store.get_current_state_ids(room_id)
existing_state = await self.main_store.get_current_state_ids(room_id)
to_delete = [key for key in existing_state if key not in current_state]
@@ -646,4 +659,4 @@ class EventsPersistenceStorage(object):
if ev_id != existing_state.get(key)
}
return to_delete, to_insert
return DeltaState(to_delete=to_delete, to_insert=to_insert)
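The mechanical shape of the inlineCallbacks-to-coroutine conversion running through this file, in miniature (requires Twisted; store is any object whose method returns a Deferred or coroutine):

from twisted.internet import defer

@defer.inlineCallbacks
def old_style(store):
    # Before: a generator driven by inlineCallbacks.
    ids = yield store.get_latest_event_ids_in_room("!room:example.com")
    return ids

async def new_style(store):
    # After: a native coroutine; Deferreds can be awaited directly.
    ids = await store.get_latest_event_ids_in_room("!room:example.com")
    return ids

# A Deferred-based caller bridges back with:
#   d = defer.ensureDeferred(new_style(store))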

View File

@@ -17,6 +17,7 @@ import json
import os
import urllib.parse
from binascii import unhexlify
from typing import List, Optional
from mock import Mock
@@ -26,7 +27,7 @@ import synapse.rest.admin
from synapse.http.server import JsonResource
from synapse.logging.context import make_deferred_yieldable
from synapse.rest.admin import VersionServlet
from synapse.rest.client.v1 import events, login, room
from synapse.rest.client.v1 import directory, events, login, room
from synapse.rest.client.v2_alpha import groups
from tests import unittest
@@ -468,9 +469,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
)
# Extract media ID from the response
server_name_and_media_id = response["content_uri"][
6:
] # Cut off the 'mxc://' bit
server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
server_name, media_id = server_name_and_media_id.split("/")
# Attempt to access the media
@@ -516,7 +515,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
),
)
def test_quarantine_all_media_in_room(self):
def test_quarantine_all_media_in_room(self, override_url_template=None):
self.register_user("room_admin", "pass", admin=True)
admin_user_tok = self.login("room_admin", "pass")
@@ -555,9 +554,12 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
)
# Quarantine all media in the room
url = "/_synapse/admin/v1/room/%s/media/quarantine" % urllib.parse.quote(
room_id
)
if override_url_template:
url = override_url_template % urllib.parse.quote(room_id)
else:
url = "/_synapse/admin/v1/room/%s/media/quarantine" % urllib.parse.quote(
room_id
)
request, channel = self.make_request("POST", url, access_token=admin_user_tok,)
self.render(request)
self.pump(1.0)
@@ -611,6 +613,10 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
),
)
def test_quaraantine_all_media_in_room_deprecated_api_path(self):
# Perform the above test with the deprecated API path
self.test_quarantine_all_media_in_room("/_synapse/admin/v1/quarantine_media/%s")
def test_quarantine_all_media_by_user(self):
self.register_user("user_admin", "pass", admin=True)
admin_user_tok = self.login("user_admin", "pass")
@@ -685,3 +691,389 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
% server_and_media_id_2
),
)
class RoomTestCase(unittest.HomeserverTestCase):
"""Test /room admin API.
"""
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
directory.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
# Create user
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
def test_list_rooms(self):
"""Test that we can list rooms"""
# Create 3 test rooms
total_rooms = 3
room_ids = []
for x in range(total_rooms):
room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok
)
room_ids.append(room_id)
# Request the list of rooms
url = "/_synapse/admin/v1/rooms"
request, channel = self.make_request(
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
)
self.render(request)
# Check request completed successfully
self.assertEqual(200, int(channel.code), msg=channel.json_body)
# Check that response json body contains a "rooms" key
self.assertTrue(
"rooms" in channel.json_body,
msg="Response body does not " "contain a 'rooms' key",
)
# Check that 3 rooms were returned
self.assertEqual(3, len(channel.json_body["rooms"]), msg=channel.json_body)
# Check their room_ids match
returned_room_ids = [room["room_id"] for room in channel.json_body["rooms"]]
self.assertEqual(room_ids, returned_room_ids)
# Check that all fields are available
for r in channel.json_body["rooms"]:
self.assertIn("name", r)
self.assertIn("canonical_alias", r)
self.assertIn("joined_members", r)
# Check that the correct number of total rooms was returned
self.assertEqual(channel.json_body["total_rooms"], total_rooms)
# Check that the offset is correct
# Should be 0 as we aren't paginating
self.assertEqual(channel.json_body["offset"], 0)
# Check that the prev_batch parameter is not present
self.assertNotIn("prev_batch", channel.json_body)
# We shouldn't receive a next token here as there's no further rooms to show
self.assertNotIn("next_batch", channel.json_body)
def test_list_rooms_pagination(self):
"""Test that we can get a full list of rooms through pagination"""
# Create 5 test rooms
total_rooms = 5
room_ids = []
for x in range(total_rooms):
room_id = self.helper.create_room_as(
self.admin_user, tok=self.admin_user_tok
)
room_ids.append(room_id)
# Set the name of the rooms so we get a consistent returned ordering
for idx, room_id in enumerate(room_ids):
self.helper.send_state(
room_id, "m.room.name", {"name": str(idx)}, tok=self.admin_user_tok,
)
# Request the list of rooms
returned_room_ids = []
start = 0
limit = 2
run_count = 0
should_repeat = True
while should_repeat:
run_count += 1
url = "/_synapse/admin/v1/rooms?from=%d&limit=%d&order_by=%s" % (
start,
limit,
"alphabetical",
)
request, channel = self.make_request(
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(
200, int(channel.result["code"]), msg=channel.result["body"]
)
self.assertTrue("rooms" in channel.json_body)
for r in channel.json_body["rooms"]:
returned_room_ids.append(r["room_id"])
# Check that the correct number of total rooms was returned
self.assertEqual(channel.json_body["total_rooms"], total_rooms)
# Check that the offset is correct
# We're only getting 2 rooms each page, so should be 2 * last run_count
self.assertEqual(channel.json_body["offset"], 2 * (run_count - 1))
if run_count > 1:
# Check the value of prev_batch is correct
self.assertEqual(channel.json_body["prev_batch"], 2 * (run_count - 2))
if "next_batch" not in channel.json_body:
# We have reached the end of the list
should_repeat = False
else:
# Make another query with an updated start value
start = channel.json_body["next_batch"]
# We should've queried the endpoint 3 times
self.assertEqual(
run_count,
3,
msg="Should've queried 3 times for 5 rooms with limit 2 per query",
)
# Check that we received all of the room ids
self.assertEqual(room_ids, returned_room_ids)
url = "/_synapse/admin/v1/rooms?from=%d&limit=%d" % (start, limit)
request, channel = self.make_request(
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
def test_correct_room_attributes(self):
"""Test the correct attributes for a room are returned"""
# Create a test room
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
test_alias = "#test:test"
test_room_name = "something"
# Have another user join the room
user_2 = self.register_user("user4", "pass")
user_tok_2 = self.login("user4", "pass")
self.helper.join(room_id, user_2, tok=user_tok_2)
# Create a new alias to this room
url = "/_matrix/client/r0/directory/room/%s" % (urllib.parse.quote(test_alias),)
request, channel = self.make_request(
"PUT",
url.encode("ascii"),
{"room_id": room_id},
access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
# Set this new alias as the canonical alias for this room
self.helper.send_state(
room_id,
"m.room.aliases",
{"aliases": [test_alias]},
tok=self.admin_user_tok,
state_key="test",
)
self.helper.send_state(
room_id,
"m.room.canonical_alias",
{"alias": test_alias},
tok=self.admin_user_tok,
)
# Set a name for the room
self.helper.send_state(
room_id, "m.room.name", {"name": test_room_name}, tok=self.admin_user_tok,
)
# Request the list of rooms
url = "/_synapse/admin/v1/rooms"
request, channel = self.make_request(
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
# Check that rooms were returned
self.assertTrue("rooms" in channel.json_body)
rooms = channel.json_body["rooms"]
# Check that only one room was returned
self.assertEqual(len(rooms), 1)
# And that the value of the total_rooms key was correct
self.assertEqual(channel.json_body["total_rooms"], 1)
# Check that the offset is correct
# We're not paginating, so should be 0
self.assertEqual(channel.json_body["offset"], 0)
# Check that there is no `prev_batch`
self.assertNotIn("prev_batch", channel.json_body)
# Check that there is no `next_batch`
self.assertNotIn("next_batch", channel.json_body)
# Check that all provided attributes are set
r = rooms[0]
self.assertEqual(room_id, r["room_id"])
self.assertEqual(test_room_name, r["name"])
self.assertEqual(test_alias, r["canonical_alias"])
def test_room_list_sort_order(self):
"""Test room list sort ordering. alphabetical versus number of members,
reversing the order, etc.
"""
# Create 3 test rooms
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
room_id_3 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
# Set room names in alphabetical order. room 1 -> A, 2 -> B, 3 -> C
self.helper.send_state(
room_id_1, "m.room.name", {"name": "A"}, tok=self.admin_user_tok,
)
self.helper.send_state(
room_id_2, "m.room.name", {"name": "B"}, tok=self.admin_user_tok,
)
self.helper.send_state(
room_id_3, "m.room.name", {"name": "C"}, tok=self.admin_user_tok,
)
# Set room member size in the reverse order. room 1 -> 1 member, 2 -> 2, 3 -> 3
user_1 = self.register_user("bob1", "pass")
user_1_tok = self.login("bob1", "pass")
self.helper.join(room_id_2, user_1, tok=user_1_tok)
user_2 = self.register_user("bob2", "pass")
user_2_tok = self.login("bob2", "pass")
self.helper.join(room_id_3, user_2, tok=user_2_tok)
user_3 = self.register_user("bob3", "pass")
user_3_tok = self.login("bob3", "pass")
self.helper.join(room_id_3, user_3, tok=user_3_tok)
def _order_test(
order_type: str, expected_room_list: List[str], reverse: bool = False,
):
"""Request the list of rooms in a certain order. Assert that order is what
we expect
Args:
order_type: The type of ordering to give the server
expected_room_list: The list of room_ids in the order we expect to get
back from the server
"""
# Request the list of rooms in the given order
url = "/_synapse/admin/v1/rooms?order_by=%s" % (order_type,)
if reverse:
url += "&dir=b"
request, channel = self.make_request(
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, channel.code, msg=channel.json_body)
# Check that rooms were returned
self.assertTrue("rooms" in channel.json_body)
rooms = channel.json_body["rooms"]
# Check for the correct total_rooms value
self.assertEqual(channel.json_body["total_rooms"], 3)
# Check that the offset is correct
# We're not paginating, so should be 0
self.assertEqual(channel.json_body["offset"], 0)
# Check that there is no `prev_batch`
self.assertNotIn("prev_batch", channel.json_body)
# Check that there is no `next_batch`
self.assertNotIn("next_batch", channel.json_body)
# Check that rooms were returned in alphabetical order
returned_order = [r["room_id"] for r in rooms]
self.assertListEqual(expected_room_list, returned_order) # order is checked
# Test different sort orders, with forward and reverse directions
_order_test("alphabetical", [room_id_1, room_id_2, room_id_3])
_order_test("alphabetical", [room_id_3, room_id_2, room_id_1], reverse=True)
_order_test("size", [room_id_3, room_id_2, room_id_1])
_order_test("size", [room_id_1, room_id_2, room_id_3], reverse=True)
def test_search_term(self):
"""Test that searching for a room works correctly"""
# Create two test rooms
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
room_name_1 = "something"
room_name_2 = "else"
# Set the name for each room
self.helper.send_state(
room_id_1, "m.room.name", {"name": room_name_1}, tok=self.admin_user_tok,
)
self.helper.send_state(
room_id_2, "m.room.name", {"name": room_name_2}, tok=self.admin_user_tok,
)
def _search_test(
expected_room_id: Optional[str],
search_term: str,
expected_http_code: int = 200,
):
"""Search for a room and check that the returned room's id is a match
Args:
expected_room_id: The room_id expected to be returned by the API. Set
to None to expect zero results for the search
search_term: The term to search for room names with
expected_http_code: The expected http code for the request
"""
url = "/_synapse/admin/v1/rooms?search_term=%s" % (search_term,)
request, channel = self.make_request(
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(expected_http_code, channel.code, msg=channel.json_body)
if expected_http_code != 200:
return
# Check that rooms were returned
self.assertTrue("rooms" in channel.json_body)
rooms = channel.json_body["rooms"]
# Check that the expected number of rooms were returned
expected_room_count = 1 if expected_room_id else 0
self.assertEqual(len(rooms), expected_room_count)
self.assertEqual(channel.json_body["total_rooms"], expected_room_count)
# Check that the offset is correct
# We're not paginating, so should be 0
self.assertEqual(channel.json_body["offset"], 0)
# Check that there is no `prev_batch`
self.assertNotIn("prev_batch", channel.json_body)
# Check that there is no `next_batch`
self.assertNotIn("next_batch", channel.json_body)
if expected_room_id:
# Check that the first returned room id is correct
r = rooms[0]
self.assertEqual(expected_room_id, r["room_id"])
# Perform search tests
_search_test(room_id_1, "something")
_search_test(room_id_1, "thing")
_search_test(room_id_2, "else")
_search_test(room_id_2, "se")
_search_test(None, "foo")
_search_test(None, "bar")
_search_test(None, "", expected_http_code=400)

View File

@@ -435,6 +435,19 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["is_guest"])
self.assertEqual(0, channel.json_body["deactivated"])
# Change password
body = json.dumps({"password": "hahaha"})
request, channel = self.make_request(
"PUT",
self.url,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
# Modify user
body = json.dumps({"displayname": "foobar", "deactivated": True})

View File

@@ -134,3 +134,30 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase):
# someone else set topic, expect 6 (join,send,topic,join,send,topic)
pass
class GetEventsTestCase(unittest.HomeserverTestCase):
servlets = [
events.register_servlets,
room.register_servlets,
synapse.rest.admin.register_servlets_for_client_rest_resource,
login.register_servlets,
]
def prepare(self, hs, reactor, clock):
# register an account
self.user_id = self.register_user("sid1", "pass")
self.token = self.login(self.user_id, "pass")
self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
def test_get_event_via_events(self):
resp = self.helper.send(self.room_id, tok=self.token)
event_id = resp["event_id"]
request, channel = self.make_request(
"GET", "/events/" + event_id, access_token=self.token,
)
self.render(request)
self.assertEquals(channel.code, 200, msg=channel.result)

View File

@@ -149,6 +149,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
self.media_repo = hs.get_media_repository_resource()
self.download_resource = self.media_repo.children[b"download"]
self.thumbnail_resource = self.media_repo.children[b"thumbnail"]
# smol png
self.end_content = unhexlify(
@@ -157,11 +158,11 @@ class MediaRepoTests(unittest.HomeserverTestCase):
b"0a2db40000000049454e44ae426082"
)
self.media_id = "example.com/12345"
def _req(self, content_disposition):
request, channel = self.make_request(
"GET", "example.com/12345", shorthand=False
)
request, channel = self.make_request("GET", self.media_id, shorthand=False)
request.render(self.download_resource)
self.pump()
@@ -170,7 +171,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
self.assertEqual(len(self.fetches), 1)
self.assertEqual(self.fetches[0][1], "example.com")
self.assertEqual(
self.fetches[0][2], "/_matrix/media/v1/download/example.com/12345"
self.fetches[0][2], "/_matrix/media/v1/download/" + self.media_id
)
self.assertEqual(self.fetches[0][3], {"allow_remote": "false"})
@@ -229,3 +230,42 @@ class MediaRepoTests(unittest.HomeserverTestCase):
headers = channel.headers
self.assertEqual(headers.getRawHeaders(b"Content-Type"), [b"image/png"])
self.assertEqual(headers.getRawHeaders(b"Content-Disposition"), None)
def test_thumbnail_crop(self):
expected_body = unhexlify(
b"89504e470d0a1a0a0000000d4948445200000020000000200806"
b"000000737a7af40000001a49444154789cedc101010000008220"
b"ffaf6e484001000000ef0610200001194334ee0000000049454e"
b"44ae426082"
)
self._test_thumbnail("crop", expected_body)
def test_thumbnail_scale(self):
expected_body = unhexlify(
b"89504e470d0a1a0a0000000d4948445200000001000000010806"
b"0000001f15c4890000000d49444154789c636060606000000005"
b"0001a5f645400000000049454e44ae426082"
)
self._test_thumbnail("scale", expected_body)
def _test_thumbnail(self, method, expected_body):
params = "?width=32&height=32&method=" + method
request, channel = self.make_request(
"GET", self.media_id + params, shorthand=False
)
request.render(self.thumbnail_resource)
self.pump()
headers = {
b"Content-Length": [b"%d" % (len(self.end_content))],
b"Content-Type": [b"image/png"],
}
self.fetches[0][0].callback(
(self.end_content, (len(self.end_content), headers))
)
self.pump()
self.assertEqual(channel.code, 200)
self.assertEqual(channel.result["body"], expected_body, channel.result["body"])

View File

@@ -463,7 +463,7 @@ class HomeserverTestCase(TestCase):
# Create the user
request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
self.render(request)
self.assertEqual(channel.code, 200)
self.assertEqual(channel.code, 200, msg=channel.result)
nonce = channel.json_body["nonce"]
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)

View File

@@ -177,13 +177,13 @@ env =
MYPYPATH = stubs/
extras = all
commands = mypy \
synapse/api \
synapse/config/ \
synapse/handlers/ui_auth \
synapse/logging/ \
synapse/module_api \
synapse/replication \
synapse/rest/consent \
synapse/rest/saml2 \
synapse/rest \
synapse/spam_checker_api \
synapse/storage/engines \
synapse/streams
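With synapse/api, synapse/rest and the storage/streams targets folded in above, the same checks can be reproduced locally; assuming this command block lives in a tox environment named mypy, either of the following should match CI:

tox -e mypy
# or, mirroring the command block directly (MYPYPATH per the env block above):
MYPYPATH=stubs/ mypy synapse/api synapse/config/ synapse/handlers/ui_auth synapse/logging/ synapse/module_api synapse/replication synapse/rest synapse/spam_checker_api synapse/storage/engines synapse/streams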