Compare commits
37 Commits
v1.9.0.dev
...
v1.9.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77d9357226 | ||
|
|
bdbeeb94ec | ||
|
|
9bae740527 | ||
|
|
1755326d8a | ||
|
|
1dc5a791cf | ||
|
|
ba64c3b615 | ||
|
|
f3eac2b3e9 | ||
|
|
d31f5f4d89 | ||
|
|
33f7e5ce2a | ||
|
|
91085ef49e | ||
|
|
ffa637050d | ||
|
|
0d0f32bc53 | ||
|
|
90a28fb475 | ||
|
|
ae6cf586b0 | ||
|
|
6ae0c8db33 | ||
|
|
d9a8728b11 | ||
|
|
67aa18e8dc | ||
|
|
ed83c3a018 | ||
|
|
aa9b00fb2f | ||
|
|
5e52d8563b | ||
|
|
5d7a6ad223 | ||
|
|
2093f83ea0 | ||
|
|
837f62266b | ||
|
|
07124d028d | ||
|
|
0e68760078 | ||
|
|
b0a66ab83c | ||
|
|
74b74462f1 | ||
|
|
0f6e525be3 | ||
|
|
ceecedc68b | ||
|
|
e9e066055f | ||
|
|
351fdfede6 | ||
|
|
2f23eb27b3 | ||
|
|
11c23af465 | ||
|
|
026f4bdf3c | ||
|
|
198d52da3a | ||
|
|
a17f64361c | ||
|
|
5909751936 |
@@ -1,22 +0,0 @@
|
||||
version: '3.1'
|
||||
|
||||
services:
|
||||
|
||||
postgres:
|
||||
image: postgres:9.5
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
command: -c fsync=off
|
||||
|
||||
testenv:
|
||||
image: python:3.5
|
||||
depends_on:
|
||||
- postgres
|
||||
env_file: .env
|
||||
environment:
|
||||
SYNAPSE_POSTGRES_HOST: postgres
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
working_dir: /src
|
||||
volumes:
|
||||
- ..:/src
|
||||
@@ -1,22 +0,0 @@
|
||||
version: '3.1'
|
||||
|
||||
services:
|
||||
|
||||
postgres:
|
||||
image: postgres:11
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
command: -c fsync=off
|
||||
|
||||
testenv:
|
||||
image: python:3.7
|
||||
depends_on:
|
||||
- postgres
|
||||
env_file: .env
|
||||
environment:
|
||||
SYNAPSE_POSTGRES_HOST: postgres
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
working_dir: /src
|
||||
volumes:
|
||||
- ..:/src
|
||||
@@ -1,22 +0,0 @@
|
||||
version: '3.1'
|
||||
|
||||
services:
|
||||
|
||||
postgres:
|
||||
image: postgres:9.5
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
command: -c fsync=off
|
||||
|
||||
testenv:
|
||||
image: python:3.7
|
||||
depends_on:
|
||||
- postgres
|
||||
env_file: .env
|
||||
environment:
|
||||
SYNAPSE_POSTGRES_HOST: postgres
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
working_dir: /src
|
||||
volumes:
|
||||
- ..:/src
|
||||
93
CHANGES.md
93
CHANGES.md
@@ -1,3 +1,96 @@
|
||||
Synapse 1.9.1 (2020-01-28)
|
||||
==========================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix bug where setting `mau_limit_reserved_threepids` config would cause Synapse to refuse to start. ([\#6793](https://github.com/matrix-org/synapse/issues/6793))
|
||||
|
||||
|
||||
Synapse 1.9.0 (2020-01-23)
|
||||
==========================
|
||||
|
||||
**WARNING**: As of this release, Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)).
|
||||
|
||||
If your Synapse deployment uses workers, note that the reverse-proxy configurations for the `synapse.app.media_repository`, `synapse.app.federation_reader` and `synapse.app.event_creator` workers have changed, with the addition of a few paths (see the updated configurations [here](docs/workers.md#available-worker-applications)). Existing configurations will continue to work.
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Fix endpoint documentation for the List Rooms admin API. ([\#6770](https://github.com/matrix-org/synapse/issues/6770))
|
||||
|
||||
|
||||
Synapse 1.9.0rc1 (2020-01-22)
|
||||
=============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Allow admin to create or modify a user. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5742](https://github.com/matrix-org/synapse/issues/5742))
|
||||
- Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media. ([\#6681](https://github.com/matrix-org/synapse/issues/6681), [\#6756](https://github.com/matrix-org/synapse/issues/6756))
|
||||
- Add `org.matrix.e2e_cross_signing` to `unstable_features` in `/versions` as per [MSC1756](https://github.com/matrix-org/matrix-doc/pull/1756). ([\#6712](https://github.com/matrix-org/synapse/issues/6712))
|
||||
- Add a new admin API to list and filter rooms on the server. ([\#6720](https://github.com/matrix-org/synapse/issues/6720))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Correctly proxy HTTP errors due to API calls to remote group servers. ([\#6654](https://github.com/matrix-org/synapse/issues/6654))
|
||||
- Fix media repo admin APIs when using a media worker. ([\#6664](https://github.com/matrix-org/synapse/issues/6664))
|
||||
- Fix "CRITICAL" errors being logged when a request is received for a uri containing non-ascii characters. ([\#6682](https://github.com/matrix-org/synapse/issues/6682))
|
||||
- Fix a bug where we would assign a numeric user ID if somebody tried registering with an empty username. ([\#6690](https://github.com/matrix-org/synapse/issues/6690))
|
||||
- Fix `purge_room` admin API. ([\#6711](https://github.com/matrix-org/synapse/issues/6711))
|
||||
- Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs. ([\#6714](https://github.com/matrix-org/synapse/issues/6714))
|
||||
- Fix the `synapse_port_db` not correctly running background updates. Thanks @tadzik for reporting. ([\#6718](https://github.com/matrix-org/synapse/issues/6718))
|
||||
- Fix changing password via user admin API. ([\#6730](https://github.com/matrix-org/synapse/issues/6730))
|
||||
- Fix `/events/:event_id` deprecated API. ([\#6731](https://github.com/matrix-org/synapse/issues/6731))
|
||||
- Fix monthly active user limiting support for worker mode, fixes [#4639](https://github.com/matrix-org/synapse/issues/4639). ([\#6742](https://github.com/matrix-org/synapse/issues/6742))
|
||||
- Fix bug when setting `account_validity` to an empty block in the config. Thanks to @Sorunome for reporting. ([\#6747](https://github.com/matrix-org/synapse/issues/6747))
|
||||
- Fix `AttributeError: 'NoneType' object has no attribute 'get'` in `hash_password` when configuration has an empty `password_config`. Contributed by @ivilata. ([\#6753](https://github.com/matrix-org/synapse/issues/6753))
|
||||
- Fix the `docker-compose.yaml` overriding the entire `/etc` folder of the container. Contributed by Fabian Meyer. ([\#6656](https://github.com/matrix-org/synapse/issues/6656))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Fix a typo in the configuration example for purge jobs in the sample configuration file. ([\#6621](https://github.com/matrix-org/synapse/issues/6621))
|
||||
- Add complete documentation of the message retention policies support. ([\#6624](https://github.com/matrix-org/synapse/issues/6624), [\#6665](https://github.com/matrix-org/synapse/issues/6665))
|
||||
- Add some helpful tips about changelog entries to the GitHub pull request template. ([\#6663](https://github.com/matrix-org/synapse/issues/6663))
|
||||
- Clarify the `account_validity` and `email` sections of the sample configuration. ([\#6685](https://github.com/matrix-org/synapse/issues/6685))
|
||||
- Add more endpoints to the documentation for Synapse workers. ([\#6698](https://github.com/matrix-org/synapse/issues/6698))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)). ([\#6675](https://github.com/matrix-org/synapse/issues/6675))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Add `local_current_membership` table for tracking local user membership state in rooms. ([\#6655](https://github.com/matrix-org/synapse/issues/6655), [\#6728](https://github.com/matrix-org/synapse/issues/6728))
|
||||
- Port `synapse.replication.tcp` to async/await. ([\#6666](https://github.com/matrix-org/synapse/issues/6666))
|
||||
- Fixup `synapse.replication` to pass mypy checks. ([\#6667](https://github.com/matrix-org/synapse/issues/6667))
|
||||
- Allow `additional_resources` to implement `IResource` directly. ([\#6686](https://github.com/matrix-org/synapse/issues/6686))
|
||||
- Allow REST endpoint implementations to raise a `RedirectException`, which will redirect the user's browser to a given location. ([\#6687](https://github.com/matrix-org/synapse/issues/6687))
|
||||
- Updates and extensions to the module API. ([\#6688](https://github.com/matrix-org/synapse/issues/6688))
|
||||
- Updates to the SAML mapping provider API. ([\#6689](https://github.com/matrix-org/synapse/issues/6689), [\#6723](https://github.com/matrix-org/synapse/issues/6723))
|
||||
- Remove redundant `RegistrationError` class. ([\#6691](https://github.com/matrix-org/synapse/issues/6691))
|
||||
- Don't block processing of incoming EDUs behind processing PDUs in the same transaction. ([\#6697](https://github.com/matrix-org/synapse/issues/6697))
|
||||
- Remove duplicate check for the `session` query parameter on the `/auth/xxx/fallback/web` Client-Server endpoint. ([\#6702](https://github.com/matrix-org/synapse/issues/6702))
|
||||
- Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data. ([\#6706](https://github.com/matrix-org/synapse/issues/6706))
|
||||
- Add `StateMap` type alias to simplify types. ([\#6715](https://github.com/matrix-org/synapse/issues/6715))
|
||||
- Add a `DeltaState` to track changes to be made to current state during event persistence. ([\#6716](https://github.com/matrix-org/synapse/issues/6716))
|
||||
- Add more logging around message retention policies support. ([\#6717](https://github.com/matrix-org/synapse/issues/6717))
|
||||
- When processing a SAML response, log the assertions for easier configuration. ([\#6724](https://github.com/matrix-org/synapse/issues/6724))
|
||||
- Fixup `synapse.rest` to pass mypy. ([\#6732](https://github.com/matrix-org/synapse/issues/6732), [\#6764](https://github.com/matrix-org/synapse/issues/6764))
|
||||
- Fixup `synapse.api` to pass mypy. ([\#6733](https://github.com/matrix-org/synapse/issues/6733))
|
||||
- Allow streaming cache 'invalidate all' to workers. ([\#6749](https://github.com/matrix-org/synapse/issues/6749))
|
||||
- Remove unused CI docker compose files. ([\#6754](https://github.com/matrix-org/synapse/issues/6754))
|
||||
|
||||
|
||||
Synapse 1.8.0 (2020-01-09)
|
||||
==========================
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Allow admin to create or modify a user. Contributed by Awesome Technologies Innovationslabor GmbH.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a typo in the configuration example for purge jobs in the sample configuration file.
|
||||
@@ -1 +0,0 @@
|
||||
Add complete documentation of the message retention policies support.
|
||||
@@ -1 +0,0 @@
|
||||
Correctly proxy HTTP errors due to API calls to remote group servers.
|
||||
@@ -1 +0,0 @@
|
||||
Add `local_current_membership` table for tracking local user membership state in rooms.
|
||||
@@ -1 +0,0 @@
|
||||
No more overriding the entire /etc folder of the container in docker-compose.yaml. Contributed by Fabian Meyer.
|
||||
@@ -1 +0,0 @@
|
||||
Add some helpful tips about changelog entries to the github pull request template.
|
||||
@@ -1 +0,0 @@
|
||||
Fix media repo admin APIs when using a media worker.
|
||||
@@ -1 +0,0 @@
|
||||
Add complete documentation of the message retention policies support.
|
||||
@@ -1 +0,0 @@
|
||||
Port `synapse.replication.tcp` to async/await.
|
||||
@@ -1 +0,0 @@
|
||||
Fixup `synapse.replication` to pass mypy checks.
|
||||
@@ -1 +0,0 @@
|
||||
Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)).
|
||||
@@ -1 +0,0 @@
|
||||
Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media.
|
||||
@@ -1,2 +0,0 @@
|
||||
Fix "CRITICAL" errors being logged when a request is received for a uri containing non-ascii characters.
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Clarify the `account_validity` and `email` sections of the sample configuration.
|
||||
@@ -1 +0,0 @@
|
||||
Allow additional_resources to implement IResource directly.
|
||||
@@ -1 +0,0 @@
|
||||
Allow REST endpoint implementations to raise a RedirectException, which will redirect the user's browser to a given location.
|
||||
@@ -1 +0,0 @@
|
||||
Updates and extensions to the module API.
|
||||
@@ -1 +0,0 @@
|
||||
Updates to the SAML mapping provider API.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug where we would assign a numeric userid if somebody tried registering with an empty username.
|
||||
@@ -1 +0,0 @@
|
||||
Remove redundant RegistrationError class.
|
||||
@@ -1 +0,0 @@
|
||||
Don't block processing of incoming EDUs behind processing PDUs in the same transaction.
|
||||
@@ -1 +0,0 @@
|
||||
Add more endpoints to the documentation for Synapse workers.
|
||||
@@ -1 +0,0 @@
|
||||
Remove duplicate check for the `session` query parameter on the `/auth/xxx/fallback/web` Client-Server endpoint.
|
||||
@@ -1 +0,0 @@
|
||||
Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data.
|
||||
@@ -1 +0,0 @@
|
||||
Fix `purge_room` admin API.
|
||||
@@ -1 +0,0 @@
|
||||
Add org.matrix.e2e_cross_signing to unstable_features in /versions as per [MSC1756](https://github.com/matrix-org/matrix-doc/pull/1756).
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs.
|
||||
@@ -1 +0,0 @@
|
||||
Add StateMap type alias to simplify types.
|
||||
@@ -1 +0,0 @@
|
||||
Updates to the SAML mapping provider API.
|
||||
@@ -1 +0,0 @@
|
||||
When processing a SAML response, log the assertions for easier configuration.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug causing `ValueError: unsupported format character ''' (0x27) at index 312` error when running the schema 57 upgrade script.
|
||||
12
debian/changelog
vendored
12
debian/changelog
vendored
@@ -1,3 +1,15 @@
|
||||
matrix-synapse-py3 (1.9.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.9.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 28 Jan 2020 13:09:23 +0000
|
||||
|
||||
matrix-synapse-py3 (1.9.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.9.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 23 Jan 2020 12:56:31 +0000
|
||||
|
||||
matrix-synapse-py3 (1.8.0) stable; urgency=medium
|
||||
|
||||
[ Richard van der Hoff ]
|
||||
|
||||
173
docs/admin_api/rooms.md
Normal file
173
docs/admin_api/rooms.md
Normal file
@@ -0,0 +1,173 @@
|
||||
# List Room API
|
||||
|
||||
The List Room admin API allows server admins to get a list of rooms on their
|
||||
server. There are various parameters available that allow for filtering and
|
||||
sorting the returned list. This API supports pagination.
|
||||
|
||||
## Parameters
|
||||
|
||||
The following query parameters are available:
|
||||
|
||||
* `from` - Offset in the returned list. Defaults to `0`.
|
||||
* `limit` - Maximum amount of rooms to return. Defaults to `100`.
|
||||
* `order_by` - The method in which to sort the returned list of rooms. Valid values are:
|
||||
- `alphabetical` - Rooms are ordered alphabetically by room name. This is the default.
|
||||
- `size` - Rooms are ordered by the number of members. Largest to smallest.
|
||||
* `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
|
||||
this value to `b` will reverse the above sort order. Defaults to `f`.
|
||||
* `search_term` - Filter rooms by their room name. Search term can be contained in any
|
||||
part of the room name. Defaults to no filtering.
|
||||
|
||||
The following fields are possible in the JSON response body:
|
||||
|
||||
* `rooms` - An array of objects, each containing information about a room.
|
||||
- Room objects contain the following fields:
|
||||
- `room_id` - The ID of the room.
|
||||
- `name` - The name of the room.
|
||||
- `canonical_alias` - The canonical (main) alias address of the room.
|
||||
- `joined_members` - How many users are currently in the room.
|
||||
* `offset` - The current pagination offset in rooms. This parameter should be
|
||||
used instead of `next_token` for room offset as `next_token` is
|
||||
not intended to be parsed.
|
||||
* `total_rooms` - The total number of rooms this query can return. Using this
|
||||
and `offset`, you have enough information to know the current
|
||||
progression through the list.
|
||||
* `next_batch` - If this field is present, we know that there are potentially
|
||||
more rooms on the server that did not all fit into this response.
|
||||
We can use `next_batch` to get the "next page" of results. To do
|
||||
so, simply repeat your request, setting the `from` parameter to
|
||||
the value of `next_batch`.
|
||||
* `prev_batch` - If this field is present, it is possible to paginate backwards.
|
||||
Use `prev_batch` for the `from` value in the next request to
|
||||
get the "previous page" of results.
|
||||
|
||||
## Usage
|
||||
|
||||
A standard request with no filtering:
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/rooms
|
||||
|
||||
{}
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```
|
||||
{
|
||||
"rooms": [
|
||||
{
|
||||
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
|
||||
"name": "Matrix HQ",
|
||||
"canonical_alias": "#matrix:matrix.org",
|
||||
"joined_members": 8326
|
||||
},
|
||||
... (8 hidden items) ...
|
||||
{
|
||||
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
|
||||
"name": "This Week In Matrix (TWIM)",
|
||||
"canonical_alias": "#twim:matrix.org",
|
||||
"joined_members": 314
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
"total_rooms": 10
|
||||
}
|
||||
```
|
||||
|
||||
Filtering by room name:
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/rooms?search_term=TWIM
|
||||
|
||||
{}
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```
|
||||
{
|
||||
"rooms": [
|
||||
{
|
||||
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
|
||||
"name": "This Week In Matrix (TWIM)",
|
||||
"canonical_alias": "#twim:matrix.org",
|
||||
"joined_members": 314
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
"total_rooms": 1
|
||||
}
|
||||
```
|
||||
|
||||
Paginating through a list of rooms:
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/rooms?order_by=size
|
||||
|
||||
{}
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```
|
||||
{
|
||||
"rooms": [
|
||||
{
|
||||
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
|
||||
"name": "Matrix HQ",
|
||||
"canonical_alias": "#matrix:matrix.org",
|
||||
"joined_members": 8326
|
||||
},
|
||||
... (98 hidden items) ...
|
||||
{
|
||||
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
|
||||
"name": "This Week In Matrix (TWIM)",
|
||||
"canonical_alias": "#twim:matrix.org",
|
||||
"joined_members": 314
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
"total_rooms": 150
|
||||
"next_token": 100
|
||||
}
|
||||
```
|
||||
|
||||
The presence of the `next_token` parameter tells us that there are more rooms
|
||||
than returned in this request, and we need to make another request to get them.
|
||||
To get the next batch of room results, we repeat our request, setting the `from`
|
||||
parameter to the value of `next_token`.
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/rooms?order_by=size&from=100
|
||||
|
||||
{}
|
||||
```
|
||||
|
||||
Response:
|
||||
|
||||
```
|
||||
{
|
||||
"rooms": [
|
||||
{
|
||||
"room_id": "!mscvqgqpHYjBGDxNym:matrix.org",
|
||||
"name": "Music Theory",
|
||||
"canonical_alias": "#musictheory:matrix.org",
|
||||
"joined_members": 127
|
||||
},
|
||||
... (48 hidden items) ...
|
||||
{
|
||||
"room_id": "!twcBhHVdZlQWuuxBhN:termina.org.uk",
|
||||
"name": "weechat-matrix",
|
||||
"canonical_alias": "#weechat-matrix:termina.org.uk",
|
||||
"joined_members": 137
|
||||
}
|
||||
],
|
||||
"offset": 100,
|
||||
"prev_batch": 0,
|
||||
"total_rooms": 150
|
||||
}
|
||||
```
|
||||
|
||||
Once the `next_token` parameter is no longer present, we know we've reached the
|
||||
end of the list.
|
||||
@@ -254,6 +254,11 @@ and they key to invalidate. For example:
|
||||
|
||||
> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
|
||||
|
||||
Alternatively, an entire cache can be invalidated by sending down a `null`
|
||||
instead of the key. For example:
|
||||
|
||||
> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
|
||||
|
||||
However, there are times when a number of caches need to be invalidated
|
||||
at the same time with the same key. To reduce traffic we batch those
|
||||
invalidations into a single poke by defining a special cache name that
|
||||
|
||||
12
mypy.ini
12
mypy.ini
@@ -7,6 +7,9 @@ show_error_codes = True
|
||||
show_traceback = True
|
||||
mypy_path = stubs
|
||||
|
||||
[mypy-pymacaroons.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-zope]
|
||||
ignore_missing_imports = True
|
||||
|
||||
@@ -63,3 +66,12 @@ ignore_missing_imports = True
|
||||
|
||||
[mypy-sentry_sdk]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-PIL.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-lxml]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-jwt.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
@@ -22,10 +22,12 @@ import yaml
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
import synapse
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage import DataStore
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("update_database")
|
||||
|
||||
@@ -38,6 +40,8 @@ class MockHomeserver(HomeServer):
|
||||
config.server_name, reactor=reactor, config=config, **kwargs
|
||||
)
|
||||
|
||||
self.version_string = "Synapse/"+get_version_string(synapse)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
@@ -81,15 +85,17 @@ if __name__ == "__main__":
|
||||
hs.setup()
|
||||
store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def run_background_updates():
|
||||
yield store.db.updates.run_background_updates(sleep=False)
|
||||
async def run_background_updates():
|
||||
await store.db.updates.run_background_updates(sleep=False)
|
||||
# Stop the reactor to exit the script once every background update is run.
|
||||
reactor.stop()
|
||||
|
||||
# Apply all background updates on the database.
|
||||
reactor.callWhenRunning(
|
||||
lambda: run_as_background_process("background_updates", run_background_updates)
|
||||
)
|
||||
def run():
|
||||
# Apply all background updates on the database.
|
||||
defer.ensureDeferred(
|
||||
run_as_background_process("background_updates", run_background_updates)
|
||||
)
|
||||
|
||||
reactor.callWhenRunning(run)
|
||||
|
||||
reactor.run()
|
||||
|
||||
@@ -52,7 +52,7 @@ if __name__ == "__main__":
|
||||
if "config" in args and args.config:
|
||||
config = yaml.safe_load(args.config)
|
||||
bcrypt_rounds = config.get("bcrypt_rounds", bcrypt_rounds)
|
||||
password_config = config.get("password_config", {})
|
||||
password_config = config.get("password_config", None) or {}
|
||||
password_pepper = password_config.get("pepper", password_pepper)
|
||||
password = args.password
|
||||
|
||||
|
||||
@@ -27,13 +27,16 @@ from six import string_types
|
||||
|
||||
import yaml
|
||||
|
||||
from twisted.enterprise import adbapi
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
import synapse
|
||||
from synapse.config.database import DatabaseConnectionConfig
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.storage._base import LoggingTransaction
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.deviceinbox import (
|
||||
DeviceInboxBackgroundUpdateStore,
|
||||
@@ -61,6 +64,7 @@ from synapse.storage.database import Database, make_conn
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
from synapse.util import Clock
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse_port_db")
|
||||
|
||||
@@ -125,6 +129,13 @@ APPEND_ONLY_TABLES = [
|
||||
]
|
||||
|
||||
|
||||
# Error returned by the run function. Used at the top-level part of the script to
|
||||
# handle errors and return codes.
|
||||
end_error = None
|
||||
# The exec_info for the error, if any. If error is defined but not exec_info the script
|
||||
# will show only the error message without the stacktrace, if exec_info is defined but
|
||||
# not the error then the script will show nothing outside of what's printed in the run
|
||||
# function. If both are defined, the script will print both the error and the stacktrace.
|
||||
end_error_exec_info = None
|
||||
|
||||
|
||||
@@ -177,6 +188,7 @@ class MockHomeserver:
|
||||
self.clock = Clock(reactor)
|
||||
self.config = config
|
||||
self.hostname = config.server_name
|
||||
self.version_string = "Synapse/"+get_version_string(synapse)
|
||||
|
||||
def get_clock(self):
|
||||
return self.clock
|
||||
@@ -189,11 +201,10 @@ class Porter(object):
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setup_table(self, table):
|
||||
async def setup_table(self, table):
|
||||
if table in APPEND_ONLY_TABLES:
|
||||
# It's safe to just carry on inserting.
|
||||
row = yield self.postgres_store.db.simple_select_one(
|
||||
row = await self.postgres_store.db.simple_select_one(
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
retcols=("forward_rowid", "backward_rowid"),
|
||||
@@ -207,10 +218,10 @@ class Porter(object):
|
||||
forward_chunk,
|
||||
already_ported,
|
||||
total_to_port,
|
||||
) = yield self._setup_sent_transactions()
|
||||
) = await self._setup_sent_transactions()
|
||||
backward_chunk = 0
|
||||
else:
|
||||
yield self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={
|
||||
"table_name": table,
|
||||
@@ -227,7 +238,7 @@ class Porter(object):
|
||||
backward_chunk = row["backward_rowid"]
|
||||
|
||||
if total_to_port is None:
|
||||
already_ported, total_to_port = yield self._get_total_count_to_port(
|
||||
already_ported, total_to_port = await self._get_total_count_to_port(
|
||||
table, forward_chunk, backward_chunk
|
||||
)
|
||||
else:
|
||||
@@ -238,9 +249,9 @@ class Porter(object):
|
||||
)
|
||||
txn.execute("TRUNCATE %s CASCADE" % (table,))
|
||||
|
||||
yield self.postgres_store.execute(delete_all)
|
||||
await self.postgres_store.execute(delete_all)
|
||||
|
||||
yield self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
|
||||
)
|
||||
@@ -248,16 +259,13 @@ class Porter(object):
|
||||
forward_chunk = 1
|
||||
backward_chunk = 0
|
||||
|
||||
already_ported, total_to_port = yield self._get_total_count_to_port(
|
||||
already_ported, total_to_port = await self._get_total_count_to_port(
|
||||
table, forward_chunk, backward_chunk
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
(table, already_ported, total_to_port, forward_chunk, backward_chunk)
|
||||
)
|
||||
return table, already_ported, total_to_port, forward_chunk, backward_chunk
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_table(
|
||||
async def handle_table(
|
||||
self, table, postgres_size, table_size, forward_chunk, backward_chunk
|
||||
):
|
||||
logger.info(
|
||||
@@ -275,7 +283,7 @@ class Porter(object):
|
||||
self.progress.add_table(table, postgres_size, table_size)
|
||||
|
||||
if table == "event_search":
|
||||
yield self.handle_search_table(
|
||||
await self.handle_search_table(
|
||||
postgres_size, table_size, forward_chunk, backward_chunk
|
||||
)
|
||||
return
|
||||
@@ -294,7 +302,7 @@ class Porter(object):
|
||||
if table == "user_directory_stream_pos":
|
||||
# We need to make sure there is a single row, `(X, null), as that is
|
||||
# what synapse expects to be there.
|
||||
yield self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db.simple_insert(
|
||||
table=table, values={"stream_id": None}
|
||||
)
|
||||
self.progress.update(table, table_size) # Mark table as done
|
||||
@@ -335,7 +343,7 @@ class Porter(object):
|
||||
|
||||
return headers, forward_rows, backward_rows
|
||||
|
||||
headers, frows, brows = yield self.sqlite_store.db.runInteraction(
|
||||
headers, frows, brows = await self.sqlite_store.db.runInteraction(
|
||||
"select", r
|
||||
)
|
||||
|
||||
@@ -361,7 +369,7 @@ class Porter(object):
|
||||
},
|
||||
)
|
||||
|
||||
yield self.postgres_store.execute(insert)
|
||||
await self.postgres_store.execute(insert)
|
||||
|
||||
postgres_size += len(rows)
|
||||
|
||||
@@ -369,8 +377,7 @@ class Porter(object):
|
||||
else:
|
||||
return
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_search_table(
|
||||
async def handle_search_table(
|
||||
self, postgres_size, table_size, forward_chunk, backward_chunk
|
||||
):
|
||||
select = (
|
||||
@@ -390,7 +397,7 @@ class Porter(object):
|
||||
|
||||
return headers, rows
|
||||
|
||||
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
|
||||
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
|
||||
|
||||
if rows:
|
||||
forward_chunk = rows[-1][0] + 1
|
||||
@@ -438,7 +445,7 @@ class Porter(object):
|
||||
},
|
||||
)
|
||||
|
||||
yield self.postgres_store.execute(insert)
|
||||
await self.postgres_store.execute(insert)
|
||||
|
||||
postgres_size += len(rows)
|
||||
|
||||
@@ -476,11 +483,10 @@ class Porter(object):
|
||||
|
||||
return store
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def run_background_updates_on_postgres(self):
|
||||
async def run_background_updates_on_postgres(self):
|
||||
# Manually apply all background updates on the PostgreSQL database.
|
||||
postgres_ready = (
|
||||
yield self.postgres_store.db.updates.has_completed_background_updates()
|
||||
await self.postgres_store.db.updates.has_completed_background_updates()
|
||||
)
|
||||
|
||||
if not postgres_ready:
|
||||
@@ -489,13 +495,20 @@ class Porter(object):
|
||||
self.progress.set_state("Running background updates on PostgreSQL")
|
||||
|
||||
while not postgres_ready:
|
||||
yield self.postgres_store.db.updates.do_next_background_update(100)
|
||||
postgres_ready = yield (
|
||||
await self.postgres_store.db.updates.do_next_background_update(100)
|
||||
postgres_ready = await (
|
||||
self.postgres_store.db.updates.has_completed_background_updates()
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def run(self):
|
||||
async def run(self):
|
||||
"""Ports the SQLite database to a PostgreSQL database.
|
||||
|
||||
When a fatal error is met, its message is assigned to the global "end_error"
|
||||
variable. When this error comes with a stacktrace, its exec_info is assigned to
|
||||
the global "end_error_exec_info" variable.
|
||||
"""
|
||||
global end_error
|
||||
|
||||
try:
|
||||
# we allow people to port away from outdated versions of sqlite.
|
||||
self.sqlite_store = self.build_db_store(
|
||||
@@ -505,21 +518,21 @@ class Porter(object):
|
||||
|
||||
# Check if all background updates are done, abort if not.
|
||||
updates_complete = (
|
||||
yield self.sqlite_store.db.updates.has_completed_background_updates()
|
||||
await self.sqlite_store.db.updates.has_completed_background_updates()
|
||||
)
|
||||
if not updates_complete:
|
||||
sys.stderr.write(
|
||||
end_error = (
|
||||
"Pending background updates exist in the SQLite3 database."
|
||||
" Please start Synapse again and wait until every update has finished"
|
||||
" before running this script.\n"
|
||||
)
|
||||
defer.returnValue(None)
|
||||
return
|
||||
|
||||
self.postgres_store = self.build_db_store(
|
||||
self.hs_config.get_single_database()
|
||||
)
|
||||
|
||||
yield self.run_background_updates_on_postgres()
|
||||
await self.run_background_updates_on_postgres()
|
||||
|
||||
self.progress.set_state("Creating port tables")
|
||||
|
||||
@@ -547,22 +560,22 @@ class Porter(object):
|
||||
)
|
||||
|
||||
try:
|
||||
yield self.postgres_store.db.runInteraction("alter_table", alter_table)
|
||||
await self.postgres_store.db.runInteraction("alter_table", alter_table)
|
||||
except Exception:
|
||||
# On Error Resume Next
|
||||
pass
|
||||
|
||||
yield self.postgres_store.db.runInteraction(
|
||||
await self.postgres_store.db.runInteraction(
|
||||
"create_port_table", create_port_table
|
||||
)
|
||||
|
||||
# Step 2. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
|
||||
sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
|
||||
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
|
||||
)
|
||||
|
||||
postgres_tables = yield self.postgres_store.db.simple_select_onecol(
|
||||
postgres_tables = await self.postgres_store.db.simple_select_onecol(
|
||||
table="information_schema.tables",
|
||||
keyvalues={},
|
||||
retcol="distinct table_name",
|
||||
@@ -573,28 +586,34 @@ class Porter(object):
|
||||
|
||||
# Step 3. Figure out what still needs copying
|
||||
self.progress.set_state("Checking on port progress")
|
||||
setup_res = yield defer.gatherResults(
|
||||
[
|
||||
self.setup_table(table)
|
||||
for table in tables
|
||||
if table not in ["schema_version", "applied_schema_deltas"]
|
||||
and not table.startswith("sqlite_")
|
||||
],
|
||||
consumeErrors=True,
|
||||
setup_res = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(self.setup_table, table)
|
||||
for table in tables
|
||||
if table not in ["schema_version", "applied_schema_deltas"]
|
||||
and not table.startswith("sqlite_")
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
)
|
||||
|
||||
# Step 4. Do the copying.
|
||||
self.progress.set_state("Copying to postgres")
|
||||
yield defer.gatherResults(
|
||||
[self.handle_table(*res) for res in setup_res], consumeErrors=True
|
||||
await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[run_in_background(self.handle_table, *res) for res in setup_res],
|
||||
consumeErrors=True,
|
||||
)
|
||||
)
|
||||
|
||||
# Step 5. Do final post-processing
|
||||
yield self._setup_state_group_id_seq()
|
||||
await self._setup_state_group_id_seq()
|
||||
|
||||
self.progress.done()
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
global end_error_exec_info
|
||||
end_error = e
|
||||
end_error_exec_info = sys.exc_info()
|
||||
logger.exception("")
|
||||
finally:
|
||||
@@ -634,8 +653,7 @@ class Porter(object):
|
||||
|
||||
return outrows
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _setup_sent_transactions(self):
|
||||
async def _setup_sent_transactions(self):
|
||||
# Only save things from the last day
|
||||
yesterday = int(time.time() * 1000) - 86400000
|
||||
|
||||
@@ -656,7 +674,7 @@ class Porter(object):
|
||||
|
||||
return headers, [r for r in rows if r[ts_ind] < yesterday]
|
||||
|
||||
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
|
||||
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
|
||||
|
||||
rows = self._convert_rows("sent_transactions", headers, rows)
|
||||
|
||||
@@ -669,7 +687,7 @@ class Porter(object):
|
||||
txn, "sent_transactions", headers[1:], rows
|
||||
)
|
||||
|
||||
yield self.postgres_store.execute(insert)
|
||||
await self.postgres_store.execute(insert)
|
||||
else:
|
||||
max_inserted_rowid = 0
|
||||
|
||||
@@ -686,10 +704,10 @@ class Porter(object):
|
||||
else:
|
||||
return 1
|
||||
|
||||
next_chunk = yield self.sqlite_store.execute(get_start_id)
|
||||
next_chunk = await self.sqlite_store.execute(get_start_id)
|
||||
next_chunk = max(max_inserted_rowid + 1, next_chunk)
|
||||
|
||||
yield self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={
|
||||
"table_name": "sent_transactions",
|
||||
@@ -705,46 +723,49 @@ class Porter(object):
|
||||
(size,) = txn.fetchone()
|
||||
return int(size)
|
||||
|
||||
remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
|
||||
remaining_count = await self.sqlite_store.execute(get_sent_table_size)
|
||||
|
||||
total_count = remaining_count + inserted_rows
|
||||
|
||||
defer.returnValue((next_chunk, inserted_rows, total_count))
|
||||
return next_chunk, inserted_rows, total_count
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
|
||||
frows = yield self.sqlite_store.execute_sql(
|
||||
async def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
|
||||
frows = await self.sqlite_store.execute_sql(
|
||||
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
|
||||
)
|
||||
|
||||
brows = yield self.sqlite_store.execute_sql(
|
||||
brows = await self.sqlite_store.execute_sql(
|
||||
"SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
|
||||
)
|
||||
|
||||
defer.returnValue(frows[0][0] + brows[0][0])
|
||||
return frows[0][0] + brows[0][0]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_already_ported_count(self, table):
|
||||
rows = yield self.postgres_store.execute_sql(
|
||||
async def _get_already_ported_count(self, table):
|
||||
rows = await self.postgres_store.execute_sql(
|
||||
"SELECT count(*) FROM %s" % (table,)
|
||||
)
|
||||
|
||||
defer.returnValue(rows[0][0])
|
||||
return rows[0][0]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
|
||||
remaining, done = yield defer.gatherResults(
|
||||
[
|
||||
self._get_remaining_count_to_port(table, forward_chunk, backward_chunk),
|
||||
self._get_already_ported_count(table),
|
||||
],
|
||||
consumeErrors=True,
|
||||
async def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
|
||||
remaining, done = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
self._get_remaining_count_to_port,
|
||||
table,
|
||||
forward_chunk,
|
||||
backward_chunk,
|
||||
),
|
||||
run_in_background(self._get_already_ported_count, table),
|
||||
],
|
||||
)
|
||||
)
|
||||
|
||||
remaining = int(remaining) if remaining else 0
|
||||
done = int(done) if done else 0
|
||||
|
||||
defer.returnValue((done, remaining + done))
|
||||
return done, remaining + done
|
||||
|
||||
def _setup_state_group_id_seq(self):
|
||||
def r(txn):
|
||||
@@ -1010,7 +1031,12 @@ if __name__ == "__main__":
|
||||
hs_config=config,
|
||||
)
|
||||
|
||||
reactor.callWhenRunning(porter.run)
|
||||
@defer.inlineCallbacks
|
||||
def run():
|
||||
with LoggingContext("synapse_port_db_run"):
|
||||
yield defer.ensureDeferred(porter.run())
|
||||
|
||||
reactor.callWhenRunning(run)
|
||||
|
||||
reactor.run()
|
||||
|
||||
@@ -1019,7 +1045,11 @@ if __name__ == "__main__":
|
||||
else:
|
||||
start()
|
||||
|
||||
if end_error_exec_info:
|
||||
exc_type, exc_value, exc_traceback = end_error_exec_info
|
||||
traceback.print_exception(exc_type, exc_value, exc_traceback)
|
||||
if end_error:
|
||||
if end_error_exec_info:
|
||||
exc_type, exc_value, exc_traceback = end_error_exec_info
|
||||
traceback.print_exception(exc_type, exc_value, exc_traceback)
|
||||
|
||||
sys.stderr.write(end_error)
|
||||
|
||||
sys.exit(5)
|
||||
|
||||
@@ -36,7 +36,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.9.0.dev2"
|
||||
__version__ = "1.9.1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import List
|
||||
|
||||
from six import text_type
|
||||
|
||||
import jsonschema
|
||||
@@ -293,7 +295,7 @@ class Filter(object):
|
||||
room_id = None
|
||||
ev_type = "m.presence"
|
||||
contains_url = False
|
||||
labels = []
|
||||
labels = [] # type: List[str]
|
||||
else:
|
||||
sender = event.get("sender", None)
|
||||
if not sender:
|
||||
|
||||
@@ -12,7 +12,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
from collections import OrderedDict
|
||||
from typing import Any, Optional, Tuple
|
||||
|
||||
from synapse.api.errors import LimitExceededError
|
||||
|
||||
@@ -23,7 +24,9 @@ class Ratelimiter(object):
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.message_counts = collections.OrderedDict()
|
||||
self.message_counts = (
|
||||
OrderedDict()
|
||||
) # type: OrderedDict[Any, Tuple[float, int, Optional[float]]]
|
||||
|
||||
def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True):
|
||||
"""Can the entity (e.g. user or IP address) perform the action?
|
||||
|
||||
@@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
|
||||
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
|
||||
from synapse.rest.client.versions import VersionsRestServlet
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
@@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
|
||||
SlavedTransactionStore,
|
||||
SlavedProfileStore,
|
||||
SlavedClientIpStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
@@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
|
||||
RoomStateEventRestServlet,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
|
||||
SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
RoomStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
@@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
@@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedTransactionStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
@@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
||||
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
|
||||
from synapse.rest.client.v2_alpha import sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.presence import UserPresenceState
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
|
||||
SlavedEventStore,
|
||||
SlavedClientIpStore,
|
||||
RoomStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
|
||||
@@ -29,6 +29,7 @@ class AccountValidityConfig(Config):
|
||||
def __init__(self, config, synapse_config):
|
||||
if config is None:
|
||||
return
|
||||
super(AccountValidityConfig, self).__init__()
|
||||
self.enabled = config.get("enabled", False)
|
||||
self.renew_by_email_enabled = "renew_at" in config
|
||||
|
||||
@@ -93,7 +94,7 @@ class RegistrationConfig(Config):
|
||||
)
|
||||
|
||||
self.account_validity = AccountValidityConfig(
|
||||
config.get("account_validity", {}), config
|
||||
config.get("account_validity") or {}, config
|
||||
)
|
||||
|
||||
self.registrations_require_3pid = config.get("registrations_require_3pid", [])
|
||||
|
||||
@@ -294,6 +294,14 @@ class ServerConfig(Config):
|
||||
self.retention_default_min_lifetime = None
|
||||
self.retention_default_max_lifetime = None
|
||||
|
||||
if self.retention_enabled:
|
||||
logger.info(
|
||||
"Message retention policies support enabled with the following default"
|
||||
" policy: min_lifetime = %s ; max_lifetime = %s",
|
||||
self.retention_default_min_lifetime,
|
||||
self.retention_default_max_lifetime,
|
||||
)
|
||||
|
||||
self.retention_allowed_lifetime_min = retention_config.get(
|
||||
"allowed_lifetime_min"
|
||||
)
|
||||
|
||||
@@ -634,7 +634,7 @@ def get_public_keys(invite_event):
|
||||
return public_keys
|
||||
|
||||
|
||||
def auth_types_for_event(event) -> Set[Tuple[str]]:
|
||||
def auth_types_for_event(event) -> Set[Tuple[str, str]]:
|
||||
"""Given an event, return a list of (EventType, StateKey) that may be
|
||||
needed to auth the event. The returned list may be a superset of what
|
||||
would actually be required depending on the full state of the room.
|
||||
|
||||
@@ -88,6 +88,8 @@ class PaginationHandler(object):
|
||||
if hs.config.retention_enabled:
|
||||
# Run the purge jobs described in the configuration file.
|
||||
for job in hs.config.retention_purge_jobs:
|
||||
logger.info("Setting up purge job with config: %s", job)
|
||||
|
||||
self.clock.looping_call(
|
||||
run_as_background_process,
|
||||
job["interval"],
|
||||
@@ -130,11 +132,22 @@ class PaginationHandler(object):
|
||||
else:
|
||||
include_null = False
|
||||
|
||||
logger.info(
|
||||
"[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)",
|
||||
min_ms,
|
||||
max_ms,
|
||||
include_null,
|
||||
)
|
||||
|
||||
rooms = yield self.store.get_rooms_for_retention_period_in_range(
|
||||
min_ms, max_ms, include_null
|
||||
)
|
||||
|
||||
logger.debug("[purge] Rooms to purge: %s", rooms)
|
||||
|
||||
for room_id, retention_policy in iteritems(rooms):
|
||||
logger.info("[purge] Attempting to purge messages in room %s", room_id)
|
||||
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
logger.warning(
|
||||
"[purge] not purging room %s as there's an ongoing purge running"
|
||||
|
||||
@@ -66,11 +66,16 @@ class BaseSlavedStore(SQLBaseStore):
|
||||
self._cache_id_gen.advance(token)
|
||||
for row in rows:
|
||||
if row.cache_func == CURRENT_STATE_CACHE_NAME:
|
||||
if row.keys is None:
|
||||
raise Exception(
|
||||
"Can't send an 'invalidate all' for current state cache"
|
||||
)
|
||||
|
||||
room_id = row.keys[0]
|
||||
members_changed = set(row.keys[1:])
|
||||
self._invalidate_state_caches(room_id, members_changed)
|
||||
else:
|
||||
self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
|
||||
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
|
||||
|
||||
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
|
||||
txn.call_after(cache_func.invalidate, keys)
|
||||
|
||||
@@ -17,7 +17,9 @@
|
||||
import itertools
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import Any
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import attr
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -65,10 +67,24 @@ PushersStreamRow = namedtuple(
|
||||
"PushersStreamRow",
|
||||
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
|
||||
)
|
||||
CachesStreamRow = namedtuple(
|
||||
"CachesStreamRow",
|
||||
("cache_func", "keys", "invalidation_ts"), # str # list(str) # int
|
||||
)
|
||||
|
||||
|
||||
@attr.s
|
||||
class CachesStreamRow:
|
||||
"""Stream to inform workers they should invalidate their cache.
|
||||
|
||||
Attributes:
|
||||
cache_func: Name of the cached function.
|
||||
keys: The entry in the cache to invalidate. If None then will
|
||||
invalidate all.
|
||||
invalidation_ts: Timestamp of when the invalidation took place.
|
||||
"""
|
||||
|
||||
cache_func = attr.ib(type=str)
|
||||
keys = attr.ib(type=Optional[List[Any]])
|
||||
invalidation_ts = attr.ib(type=int)
|
||||
|
||||
|
||||
PublicRoomsStreamRow = namedtuple(
|
||||
"PublicRoomsStreamRow",
|
||||
(
|
||||
|
||||
@@ -29,7 +29,7 @@ from synapse.rest.admin._base import (
|
||||
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
|
||||
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
|
||||
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
|
||||
from synapse.rest.admin.rooms import ShutdownRoomRestServlet
|
||||
from synapse.rest.admin.rooms import ListRoomRestServlet, ShutdownRoomRestServlet
|
||||
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
|
||||
from synapse.rest.admin.users import (
|
||||
AccountValidityRenewServlet,
|
||||
@@ -188,6 +188,7 @@ def register_servlets(hs, http_server):
|
||||
Register all the admin servlets.
|
||||
"""
|
||||
register_servlets_for_client_rest_resource(hs, http_server)
|
||||
ListRoomRestServlet(hs).register(http_server)
|
||||
PurgeRoomServlet(hs).register(http_server)
|
||||
SendServerNoticeServlet(hs).register(http_server)
|
||||
VersionServlet(hs).register(http_server)
|
||||
|
||||
@@ -40,6 +40,21 @@ def historical_admin_path_patterns(path_regex):
|
||||
)
|
||||
|
||||
|
||||
def admin_patterns(path_regex: str):
|
||||
"""Returns the list of patterns for an admin endpoint
|
||||
|
||||
Args:
|
||||
path_regex: The regex string to match. This should NOT have a ^
|
||||
as this will be prefixed.
|
||||
|
||||
Returns:
|
||||
A list of regex patterns.
|
||||
"""
|
||||
admin_prefix = "^/_synapse/admin/v1"
|
||||
patterns = [re.compile(admin_prefix + path_regex)]
|
||||
return patterns
|
||||
|
||||
|
||||
async def assert_requester_is_admin(auth, request):
|
||||
"""Verify that the requester is an admin user
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ class QuarantineMediaInRoom(RestServlet):
|
||||
historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media/quarantine")
|
||||
+
|
||||
# This path kept around for legacy reasons
|
||||
historical_admin_path_patterns("/quarantine_media/(?P<room_id>![^/]+)")
|
||||
historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
|
||||
)
|
||||
|
||||
def __init__(self, hs):
|
||||
|
||||
@@ -15,15 +15,20 @@
|
||||
import logging
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
assert_params_in_dict,
|
||||
parse_integer,
|
||||
parse_json_object_from_request,
|
||||
parse_string,
|
||||
)
|
||||
from synapse.rest.admin._base import (
|
||||
admin_patterns,
|
||||
assert_user_is_admin,
|
||||
historical_admin_path_patterns,
|
||||
)
|
||||
from synapse.storage.data_stores.main.room import RoomSortOrder
|
||||
from synapse.types import create_requester
|
||||
from synapse.util.async_helpers import maybe_awaitable
|
||||
|
||||
@@ -155,3 +160,80 @@ class ShutdownRoomRestServlet(RestServlet):
|
||||
"new_room_id": new_room_id,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class ListRoomRestServlet(RestServlet):
|
||||
"""
|
||||
List all rooms that are known to the homeserver. Results are returned
|
||||
in a dictionary containing room information. Supports pagination.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms")
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.auth = hs.get_auth()
|
||||
self.admin_handler = hs.get_handlers().admin_handler
|
||||
|
||||
async def on_GET(self, request):
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
|
||||
# Extract query parameters
|
||||
start = parse_integer(request, "from", default=0)
|
||||
limit = parse_integer(request, "limit", default=100)
|
||||
order_by = parse_string(request, "order_by", default="alphabetical")
|
||||
if order_by not in (
|
||||
RoomSortOrder.ALPHABETICAL.value,
|
||||
RoomSortOrder.SIZE.value,
|
||||
):
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Unknown value for order_by: %s" % (order_by,),
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
search_term = parse_string(request, "search_term")
|
||||
if search_term == "":
|
||||
raise SynapseError(
|
||||
400,
|
||||
"search_term cannot be an empty string",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
direction = parse_string(request, "dir", default="f")
|
||||
if direction not in ("f", "b"):
|
||||
raise SynapseError(
|
||||
400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM
|
||||
)
|
||||
|
||||
reverse_order = True if direction == "b" else False
|
||||
|
||||
# Return list of rooms according to parameters
|
||||
rooms, total_rooms = await self.store.get_rooms_paginate(
|
||||
start, limit, order_by, reverse_order, search_term
|
||||
)
|
||||
response = {
|
||||
# next_token should be opaque, so return a value the client can parse
|
||||
"offset": start,
|
||||
"rooms": rooms,
|
||||
"total_rooms": total_rooms,
|
||||
}
|
||||
|
||||
# Are there more rooms to paginate through after this?
|
||||
if (start + limit) < total_rooms:
|
||||
# There are. Calculate where the query should start from next time
|
||||
# to get the next part of the list
|
||||
response["next_batch"] = start + limit
|
||||
|
||||
# Is it possible to paginate backwards? Check if we currently have an
|
||||
# offset
|
||||
if start > 0:
|
||||
if start > limit:
|
||||
# Going back one iteration won't take us to the start.
|
||||
# Calculate new offset
|
||||
response["prev_batch"] = start - limit
|
||||
else:
|
||||
response["prev_batch"] = 0
|
||||
|
||||
return 200, response
|
||||
|
||||
@@ -193,8 +193,8 @@ class UserRestServletV2(RestServlet):
|
||||
raise SynapseError(400, "Invalid password")
|
||||
else:
|
||||
new_password = body["password"]
|
||||
await self._set_password_handler.set_password(
|
||||
target_user, new_password, requester
|
||||
await self.set_password_handler.set_password(
|
||||
target_user.to_string(), new_password, requester
|
||||
)
|
||||
|
||||
if "deactivated" in body:
|
||||
@@ -338,21 +338,22 @@ class UserRegisterServlet(RestServlet):
|
||||
|
||||
got_mac = body["mac"]
|
||||
|
||||
want_mac = hmac.new(
|
||||
want_mac_builder = hmac.new(
|
||||
key=self.hs.config.registration_shared_secret.encode(),
|
||||
digestmod=hashlib.sha1,
|
||||
)
|
||||
want_mac.update(nonce.encode("utf8"))
|
||||
want_mac.update(b"\x00")
|
||||
want_mac.update(username)
|
||||
want_mac.update(b"\x00")
|
||||
want_mac.update(password)
|
||||
want_mac.update(b"\x00")
|
||||
want_mac.update(b"admin" if admin else b"notadmin")
|
||||
want_mac_builder.update(nonce.encode("utf8"))
|
||||
want_mac_builder.update(b"\x00")
|
||||
want_mac_builder.update(username)
|
||||
want_mac_builder.update(b"\x00")
|
||||
want_mac_builder.update(password)
|
||||
want_mac_builder.update(b"\x00")
|
||||
want_mac_builder.update(b"admin" if admin else b"notadmin")
|
||||
if user_type:
|
||||
want_mac.update(b"\x00")
|
||||
want_mac.update(user_type.encode("utf8"))
|
||||
want_mac = want_mac.hexdigest()
|
||||
want_mac_builder.update(b"\x00")
|
||||
want_mac_builder.update(user_type.encode("utf8"))
|
||||
|
||||
want_mac = want_mac_builder.hexdigest()
|
||||
|
||||
if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
|
||||
raise SynapseError(403, "HMAC incorrect")
|
||||
|
||||
@@ -70,7 +70,6 @@ class EventStreamRestServlet(RestServlet):
|
||||
return 200, {}
|
||||
|
||||
|
||||
# TODO: Unit test gets, with and without auth, with different kinds of events.
|
||||
class EventRestServlet(RestServlet):
|
||||
PATTERNS = client_patterns("/events/(?P<event_id>[^/]*)$", v1=True)
|
||||
|
||||
@@ -78,6 +77,7 @@ class EventRestServlet(RestServlet):
|
||||
super(EventRestServlet, self).__init__()
|
||||
self.clock = hs.get_clock()
|
||||
self.event_handler = hs.get_event_handler()
|
||||
self.auth = hs.get_auth()
|
||||
self._event_serializer = hs.get_event_client_serializer()
|
||||
|
||||
async def on_GET(self, request, event_id):
|
||||
|
||||
@@ -514,7 +514,7 @@ class CasTicketServlet(RestServlet):
|
||||
if user is None:
|
||||
raise Exception("CAS response does not contain user")
|
||||
except Exception:
|
||||
logger.error("Error parsing CAS response", exc_info=1)
|
||||
logger.exception("Error parsing CAS response")
|
||||
raise LoginError(401, "Invalid CAS response", errcode=Codes.UNAUTHORIZED)
|
||||
if not success:
|
||||
raise LoginError(
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
@@ -207,7 +208,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
|
||||
requester, event_dict, txn_id=txn_id
|
||||
)
|
||||
|
||||
ret = {}
|
||||
ret = {} # type: dict
|
||||
if event:
|
||||
set_tag("event_id", event.event_id)
|
||||
ret = {"event_id": event.event_id}
|
||||
@@ -285,7 +286,7 @@ class JoinRoomAliasServlet(TransactionRestServlet):
|
||||
try:
|
||||
remote_room_hosts = [
|
||||
x.decode("ascii") for x in request.args[b"server_name"]
|
||||
]
|
||||
] # type: Optional[List[str]]
|
||||
except Exception:
|
||||
remote_room_hosts = None
|
||||
elif RoomAlias.is_valid(room_identifier):
|
||||
@@ -375,7 +376,7 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
server = parse_string(request, "server", default=None)
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
limit = int(content.get("limit", 100))
|
||||
limit = int(content.get("limit", 100)) # type: Optional[int]
|
||||
since_token = content.get("since", None)
|
||||
search_filter = content.get("filter", None)
|
||||
|
||||
@@ -504,11 +505,16 @@ class RoomMessageListRestServlet(RestServlet):
|
||||
filter_bytes = parse_string(request, b"filter", encoding=None)
|
||||
if filter_bytes:
|
||||
filter_json = urlparse.unquote(filter_bytes.decode("UTF-8"))
|
||||
event_filter = Filter(json.loads(filter_json))
|
||||
if event_filter.filter_json.get("event_format", "client") == "federation":
|
||||
event_filter = Filter(json.loads(filter_json)) # type: Optional[Filter]
|
||||
if (
|
||||
event_filter
|
||||
and event_filter.filter_json.get("event_format", "client")
|
||||
== "federation"
|
||||
):
|
||||
as_client_event = False
|
||||
else:
|
||||
event_filter = None
|
||||
|
||||
msgs = await self.pagination_handler.get_messages(
|
||||
room_id=room_id,
|
||||
requester=requester,
|
||||
@@ -611,7 +617,7 @@ class RoomEventContextServlet(RestServlet):
|
||||
filter_bytes = parse_string(request, "filter")
|
||||
if filter_bytes:
|
||||
filter_json = urlparse.unquote(filter_bytes)
|
||||
event_filter = Filter(json.loads(filter_json))
|
||||
event_filter = Filter(json.loads(filter_json)) # type: Optional[Filter]
|
||||
else:
|
||||
event_filter = None
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ def client_patterns(path_regex, releases=(0,), unstable=True, v1=False):
|
||||
|
||||
Args:
|
||||
path_regex (str): The regex string to match. This should NOT have a ^
|
||||
as this will be prefixed.
|
||||
as this will be prefixed.
|
||||
Returns:
|
||||
SRE_Pattern
|
||||
"""
|
||||
|
||||
@@ -21,6 +21,7 @@ from typing import List, Union
|
||||
from six import string_types
|
||||
|
||||
import synapse
|
||||
import synapse.api.auth
|
||||
import synapse.types
|
||||
from synapse.api.constants import LoginType
|
||||
from synapse.api.errors import (
|
||||
@@ -405,7 +406,7 @@ class RegisterRestServlet(RestServlet):
|
||||
return ret
|
||||
elif kind != b"user":
|
||||
raise UnrecognizedRequestError(
|
||||
"Do not understand membership kind: %s" % (kind,)
|
||||
"Do not understand membership kind: %s" % (kind.decode("utf8"),)
|
||||
)
|
||||
|
||||
# we do basic sanity checks here because the auth layer will store these
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Tuple
|
||||
|
||||
from synapse.http import servlet
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
@@ -60,7 +61,7 @@ class SendToDeviceRestServlet(servlet.RestServlet):
|
||||
sender_user_id, message_type, content["messages"]
|
||||
)
|
||||
|
||||
response = (200, {})
|
||||
response = (200, {}) # type: Tuple[int, dict]
|
||||
return response
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Dict, Set
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
from signedjson.sign import sign_json
|
||||
@@ -103,7 +104,7 @@ class RemoteKey(DirectServeResource):
|
||||
async def _async_render_GET(self, request):
|
||||
if len(request.postpath) == 1:
|
||||
(server,) = request.postpath
|
||||
query = {server.decode("ascii"): {}}
|
||||
query = {server.decode("ascii"): {}} # type: dict
|
||||
elif len(request.postpath) == 2:
|
||||
server, key_id = request.postpath
|
||||
minimum_valid_until_ts = parse_integer(request, "minimum_valid_until_ts")
|
||||
@@ -148,7 +149,7 @@ class RemoteKey(DirectServeResource):
|
||||
|
||||
time_now_ms = self.clock.time_msec()
|
||||
|
||||
cache_misses = dict()
|
||||
cache_misses = dict() # type: Dict[str, Set[str]]
|
||||
for (server_name, key_id, from_server), results in cached.items():
|
||||
results = [(result["ts_added_ms"], result) for result in results]
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import errno
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from typing import Dict, Tuple
|
||||
|
||||
from six import iteritems
|
||||
|
||||
@@ -605,7 +606,7 @@ class MediaRepository(object):
|
||||
|
||||
# We deduplicate the thumbnail sizes by ignoring the cropped versions if
|
||||
# they have the same dimensions of a scaled one.
|
||||
thumbnails = {}
|
||||
thumbnails = {} # type: Dict[Tuple[int, int, str], str]
|
||||
for r_width, r_height, r_method, r_type in requirements:
|
||||
if r_method == "crop":
|
||||
thumbnails.setdefault((r_width, r_height, r_type), r_method)
|
||||
|
||||
@@ -23,6 +23,7 @@ import re
|
||||
import shutil
|
||||
import sys
|
||||
import traceback
|
||||
from typing import Dict, Optional
|
||||
|
||||
import six
|
||||
from six import string_types
|
||||
@@ -237,8 +238,8 @@ class PreviewUrlResource(DirectServeResource):
|
||||
# If we don't find a match, we'll look at the HTTP Content-Type, and
|
||||
# if that doesn't exist, we'll fall back to UTF-8.
|
||||
if not encoding:
|
||||
match = _content_type_match.match(media_info["media_type"])
|
||||
encoding = match.group(1) if match else "utf-8"
|
||||
content_match = _content_type_match.match(media_info["media_type"])
|
||||
encoding = content_match.group(1) if content_match else "utf-8"
|
||||
|
||||
og = decode_and_calc_og(body, media_info["uri"], encoding)
|
||||
|
||||
@@ -518,7 +519,7 @@ def _calc_og(tree, media_uri):
|
||||
# "og:video:height" : "720",
|
||||
# "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
|
||||
|
||||
og = {}
|
||||
og = {} # type: Dict[str, Optional[str]]
|
||||
for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
|
||||
if "content" in tag.attrib:
|
||||
# if we've got more than 50 tags, someone is taking the piss
|
||||
|
||||
@@ -296,8 +296,8 @@ class ThumbnailResource(DirectServeResource):
|
||||
d_h = desired_height
|
||||
|
||||
if desired_method.lower() == "crop":
|
||||
info_list = []
|
||||
info_list2 = []
|
||||
crop_info_list = []
|
||||
crop_info_list2 = []
|
||||
for info in thumbnail_infos:
|
||||
t_w = info["thumbnail_width"]
|
||||
t_h = info["thumbnail_height"]
|
||||
@@ -309,7 +309,7 @@ class ThumbnailResource(DirectServeResource):
|
||||
type_quality = desired_type != info["thumbnail_type"]
|
||||
length_quality = info["thumbnail_length"]
|
||||
if t_w >= d_w or t_h >= d_h:
|
||||
info_list.append(
|
||||
crop_info_list.append(
|
||||
(
|
||||
aspect_quality,
|
||||
min_quality,
|
||||
@@ -320,7 +320,7 @@ class ThumbnailResource(DirectServeResource):
|
||||
)
|
||||
)
|
||||
else:
|
||||
info_list2.append(
|
||||
crop_info_list2.append(
|
||||
(
|
||||
aspect_quality,
|
||||
min_quality,
|
||||
@@ -330,10 +330,10 @@ class ThumbnailResource(DirectServeResource):
|
||||
info,
|
||||
)
|
||||
)
|
||||
if info_list:
|
||||
return min(info_list)[-1]
|
||||
if crop_info_list:
|
||||
return min(crop_info_list)[-1]
|
||||
else:
|
||||
return min(info_list2)[-1]
|
||||
return min(crop_info_list2)[-1]
|
||||
else:
|
||||
info_list = []
|
||||
info_list2 = []
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
import logging
|
||||
import random
|
||||
from abc import ABCMeta
|
||||
from typing import Any, Optional
|
||||
|
||||
from six import PY2
|
||||
from six.moves import builtins
|
||||
@@ -26,7 +27,7 @@ from canonicaljson import json
|
||||
from synapse.storage.database import LoggingTransaction # noqa: F401
|
||||
from synapse.storage.database import make_in_list_sql_clause # noqa: F401
|
||||
from synapse.storage.database import Database
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.types import Collection, get_domain_from_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -63,17 +64,24 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(self, cache_name, key):
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
):
|
||||
"""Attempts to invalidate the cache of the given name, ignoring if the
|
||||
cache doesn't exist. Mainly used for invalidating caches on workers,
|
||||
where they may not have the cache.
|
||||
|
||||
Args:
|
||||
cache_name (str)
|
||||
key (tuple)
|
||||
cache_name
|
||||
key: Entry to invalidate. If None then invalidates the entire
|
||||
cache.
|
||||
"""
|
||||
|
||||
try:
|
||||
getattr(self, cache_name).invalidate(key)
|
||||
if key is None:
|
||||
getattr(self, cache_name).invalidate_all()
|
||||
else:
|
||||
getattr(self, cache_name).invalidate(tuple(key))
|
||||
except AttributeError:
|
||||
# We probably haven't pulled in the cache in this worker,
|
||||
# which is fine.
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
from typing import Any, Iterable, Optional
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -43,6 +44,14 @@ class CacheInvalidationStore(SQLBaseStore):
|
||||
txn.call_after(cache_func.invalidate, keys)
|
||||
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
|
||||
|
||||
def _invalidate_all_cache_and_stream(self, txn, cache_func):
|
||||
"""Invalidates the entire cache and adds it to the cache stream so slaves
|
||||
will know to invalidate their caches.
|
||||
"""
|
||||
|
||||
txn.call_after(cache_func.invalidate_all)
|
||||
self._send_invalidation_to_replication(txn, cache_func.__name__, None)
|
||||
|
||||
def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
|
||||
"""Special case invalidation of caches based on current state.
|
||||
|
||||
@@ -73,17 +82,24 @@ class CacheInvalidationStore(SQLBaseStore):
|
||||
txn, CURRENT_STATE_CACHE_NAME, [room_id]
|
||||
)
|
||||
|
||||
def _send_invalidation_to_replication(self, txn, cache_name, keys):
|
||||
def _send_invalidation_to_replication(
|
||||
self, txn, cache_name: str, keys: Optional[Iterable[Any]]
|
||||
):
|
||||
"""Notifies replication that given cache has been invalidated.
|
||||
|
||||
Note that this does *not* invalidate the cache locally.
|
||||
|
||||
Args:
|
||||
txn
|
||||
cache_name (str)
|
||||
keys (iterable[str])
|
||||
cache_name
|
||||
keys: Entry to invalidate. If None will invalidate all.
|
||||
"""
|
||||
|
||||
if cache_name == CURRENT_STATE_CACHE_NAME and keys is None:
|
||||
raise Exception(
|
||||
"Can't stream invalidate all with magic current state cache"
|
||||
)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
# get_next() returns a context manager which is designed to wrap
|
||||
# the transaction. However, we want to only get an ID when we want
|
||||
@@ -95,13 +111,16 @@ class CacheInvalidationStore(SQLBaseStore):
|
||||
txn.call_after(ctx.__exit__, None, None, None)
|
||||
txn.call_after(self.hs.get_notifier().on_new_replication_data)
|
||||
|
||||
if keys is not None:
|
||||
keys = list(keys)
|
||||
|
||||
self.db.simple_insert_txn(
|
||||
txn,
|
||||
table="cache_invalidation_stream",
|
||||
values={
|
||||
"stream_id": stream_id,
|
||||
"cache_func": cache_name,
|
||||
"keys": list(keys),
|
||||
"keys": keys,
|
||||
"invalidation_ts": self.clock.time_msec(),
|
||||
},
|
||||
)
|
||||
|
||||
@@ -19,6 +19,7 @@ import itertools
|
||||
import logging
|
||||
from collections import Counter as c_counter, OrderedDict, namedtuple
|
||||
from functools import wraps
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from six import iteritems, text_type
|
||||
from six.moves import range
|
||||
@@ -41,8 +42,9 @@ from synapse.storage._base import make_in_list_sql_clause
|
||||
from synapse.storage.data_stores.main.event_federation import EventFederationStore
|
||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
|
||||
from synapse.storage.database import Database
|
||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||
from synapse.storage.database import Database, LoggingTransaction
|
||||
from synapse.storage.persist_events import DeltaState
|
||||
from synapse.types import RoomStreamToken, StateMap, get_domain_from_id
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.iterutils import batch_iter
|
||||
@@ -148,30 +150,26 @@ class EventsStore(
|
||||
@defer.inlineCallbacks
|
||||
def _persist_events_and_state_updates(
|
||||
self,
|
||||
events_and_contexts,
|
||||
current_state_for_room,
|
||||
state_delta_for_room,
|
||||
new_forward_extremeties,
|
||||
backfilled=False,
|
||||
delete_existing=False,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
current_state_for_room: Dict[str, StateMap[str]],
|
||||
state_delta_for_room: Dict[str, DeltaState],
|
||||
new_forward_extremeties: Dict[str, List[str]],
|
||||
backfilled: bool = False,
|
||||
delete_existing: bool = False,
|
||||
):
|
||||
"""Persist a set of events alongside updates to the current state and
|
||||
forward extremities tables.
|
||||
|
||||
Args:
|
||||
events_and_contexts (list[(EventBase, EventContext)]):
|
||||
current_state_for_room (dict[str, dict]): Map from room_id to the
|
||||
current state of the room based on forward extremities
|
||||
state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
|
||||
of `(to_delete, to_insert)` where to_delete is a list
|
||||
of type/state keys to remove from current state, and to_insert
|
||||
is a map (type,key)->event_id giving the state delta in each
|
||||
room.
|
||||
new_forward_extremities (dict[str, list[str]]): Map from room_id
|
||||
to list of event IDs that are the new forward extremities of
|
||||
the room.
|
||||
backfilled (bool)
|
||||
delete_existing (bool):
|
||||
events_and_contexts:
|
||||
current_state_for_room: Map from room_id to the current state of
|
||||
the room based on forward extremities
|
||||
state_delta_for_room: Map from room_id to the delta to apply to
|
||||
room state
|
||||
new_forward_extremities: Map from room_id to list of event IDs
|
||||
that are the new forward extremities of the room.
|
||||
backfilled
|
||||
delete_existing
|
||||
|
||||
Returns:
|
||||
Deferred: resolves when the events have been persisted
|
||||
@@ -352,12 +350,12 @@ class EventsStore(
|
||||
@log_function
|
||||
def _persist_events_txn(
|
||||
self,
|
||||
txn,
|
||||
events_and_contexts,
|
||||
backfilled,
|
||||
delete_existing=False,
|
||||
state_delta_for_room={},
|
||||
new_forward_extremeties={},
|
||||
txn: LoggingTransaction,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool,
|
||||
delete_existing: bool = False,
|
||||
state_delta_for_room: Dict[str, DeltaState] = {},
|
||||
new_forward_extremeties: Dict[str, List[str]] = {},
|
||||
):
|
||||
"""Insert some number of room events into the necessary database tables.
|
||||
|
||||
@@ -366,21 +364,16 @@ class EventsStore(
|
||||
whether the event was rejected.
|
||||
|
||||
Args:
|
||||
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||
events_and_contexts (list[(EventBase, EventContext)]):
|
||||
events to persist
|
||||
backfilled (bool): True if the events were backfilled
|
||||
delete_existing (bool): True to purge existing table rows for the
|
||||
events from the database. This is useful when retrying due to
|
||||
txn
|
||||
events_and_contexts: events to persist
|
||||
backfilled: True if the events were backfilled
|
||||
delete_existing True to purge existing table rows for the events
|
||||
from the database. This is useful when retrying due to
|
||||
IntegrityError.
|
||||
state_delta_for_room (dict[str, (list, dict)]):
|
||||
The current-state delta for each room. For each room, a tuple
|
||||
(to_delete, to_insert), being a list of type/state keys to be
|
||||
removed from the current state, and a state set to be added to
|
||||
the current state.
|
||||
new_forward_extremeties (dict[str, list[str]]):
|
||||
The new forward extremities for each room. For each room, a
|
||||
list of the event ids which are the forward extremities.
|
||||
state_delta_for_room: The current-state delta for each room.
|
||||
new_forward_extremetie: The new forward extremities for each room.
|
||||
For each room, a list of the event ids which are the forward
|
||||
extremities.
|
||||
|
||||
"""
|
||||
all_events_and_contexts = events_and_contexts
|
||||
@@ -465,9 +458,15 @@ class EventsStore(
|
||||
# room_memberships, where applicable.
|
||||
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
|
||||
|
||||
def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
|
||||
for room_id, current_state_tuple in iteritems(state_delta_by_room):
|
||||
to_delete, to_insert = current_state_tuple
|
||||
def _update_current_state_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
state_delta_by_room: Dict[str, DeltaState],
|
||||
stream_id: int,
|
||||
):
|
||||
for room_id, delta_state in iteritems(state_delta_by_room):
|
||||
to_delete = delta_state.to_delete
|
||||
to_insert = delta_state.to_insert
|
||||
|
||||
# First we add entries to the current_state_delta_stream. We
|
||||
# do this before updating the current_state_events table so
|
||||
|
||||
@@ -27,12 +27,76 @@ logger = logging.getLogger(__name__)
|
||||
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
|
||||
|
||||
|
||||
class MonthlyActiveUsersStore(SQLBaseStore):
|
||||
class MonthlyActiveUsersWorkerStore(SQLBaseStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
|
||||
super(MonthlyActiveUsersWorkerStore, self).__init__(database, db_conn, hs)
|
||||
self._clock = hs.get_clock()
|
||||
self.hs = hs
|
||||
|
||||
@cached(num_args=0)
|
||||
def get_monthly_active_count(self):
|
||||
"""Generates current count of monthly active users
|
||||
|
||||
Returns:
|
||||
Defered[int]: Number of current monthly active users
|
||||
"""
|
||||
|
||||
def _count_users(txn):
|
||||
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
|
||||
|
||||
txn.execute(sql)
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
return self.db.runInteraction("count_users", _count_users)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_registered_reserved_users(self):
|
||||
"""Of the reserved threepids defined in config, which are associated
|
||||
with registered users?
|
||||
|
||||
Returns:
|
||||
Defered[list]: Real reserved users
|
||||
"""
|
||||
users = []
|
||||
|
||||
for tp in self.hs.config.mau_limits_reserved_threepids[
|
||||
: self.hs.config.max_mau_value
|
||||
]:
|
||||
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
|
||||
tp["medium"], tp["address"]
|
||||
)
|
||||
if user_id:
|
||||
users.append(user_id)
|
||||
|
||||
return users
|
||||
|
||||
@cached(num_args=1)
|
||||
def user_last_seen_monthly_active(self, user_id):
|
||||
"""
|
||||
Checks if a given user is part of the monthly active user group
|
||||
Arguments:
|
||||
user_id (str): user to add/update
|
||||
Return:
|
||||
Deferred[int] : timestamp since last seen, None if never seen
|
||||
|
||||
"""
|
||||
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="monthly_active_users",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcol="timestamp",
|
||||
allow_none=True,
|
||||
desc="user_last_seen_monthly_active",
|
||||
)
|
||||
|
||||
|
||||
class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
|
||||
|
||||
# Do not add more reserved users than the total allowable number
|
||||
# cur = LoggingTransaction(
|
||||
self.db.new_transaction(
|
||||
db_conn,
|
||||
"initialise_mau_threepids",
|
||||
@@ -57,7 +121,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||
if user_id:
|
||||
is_support = self.is_support_user_txn(txn, user_id)
|
||||
if not is_support:
|
||||
self.upsert_monthly_active_user_txn(txn, user_id)
|
||||
# We do this manually here to avoid hitting #6791
|
||||
self.db.simple_upsert_txn(
|
||||
txn,
|
||||
table="monthly_active_users",
|
||||
keyvalues={"user_id": user_id},
|
||||
values={"timestamp": int(self._clock.time_msec())},
|
||||
)
|
||||
else:
|
||||
logger.warning("mau limit reserved threepid %s not found in db" % tp)
|
||||
|
||||
@@ -146,57 +216,22 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||
|
||||
txn.execute(sql, query_args)
|
||||
|
||||
# It seems poor to invalidate the whole cache, Postgres supports
|
||||
# 'Returning' which would allow me to invalidate only the
|
||||
# specific users, but sqlite has no way to do this and instead
|
||||
# I would need to SELECT and the DELETE which without locking
|
||||
# is racy.
|
||||
# Have resolved to invalidate the whole cache for now and do
|
||||
# something about it if and when the perf becomes significant
|
||||
self._invalidate_all_cache_and_stream(
|
||||
txn, self.user_last_seen_monthly_active
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
|
||||
|
||||
reserved_users = yield self.get_registered_reserved_users()
|
||||
yield self.db.runInteraction(
|
||||
"reap_monthly_active_users", _reap_users, reserved_users
|
||||
)
|
||||
# It seems poor to invalidate the whole cache, Postgres supports
|
||||
# 'Returning' which would allow me to invalidate only the
|
||||
# specific users, but sqlite has no way to do this and instead
|
||||
# I would need to SELECT and the DELETE which without locking
|
||||
# is racy.
|
||||
# Have resolved to invalidate the whole cache for now and do
|
||||
# something about it if and when the perf becomes significant
|
||||
self.user_last_seen_monthly_active.invalidate_all()
|
||||
self.get_monthly_active_count.invalidate_all()
|
||||
|
||||
@cached(num_args=0)
|
||||
def get_monthly_active_count(self):
|
||||
"""Generates current count of monthly active users
|
||||
|
||||
Returns:
|
||||
Defered[int]: Number of current monthly active users
|
||||
"""
|
||||
|
||||
def _count_users(txn):
|
||||
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
|
||||
|
||||
txn.execute(sql)
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
return self.db.runInteraction("count_users", _count_users)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_registered_reserved_users(self):
|
||||
"""Of the reserved threepids defined in config, which are associated
|
||||
with registered users?
|
||||
|
||||
Returns:
|
||||
Defered[list]: Real reserved users
|
||||
"""
|
||||
users = []
|
||||
|
||||
for tp in self.hs.config.mau_limits_reserved_threepids[
|
||||
: self.hs.config.max_mau_value
|
||||
]:
|
||||
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
|
||||
tp["medium"], tp["address"]
|
||||
)
|
||||
if user_id:
|
||||
users.append(user_id)
|
||||
|
||||
return users
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def upsert_monthly_active_user(self, user_id):
|
||||
@@ -222,23 +257,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
|
||||
)
|
||||
|
||||
user_in_mau = self.user_last_seen_monthly_active.cache.get(
|
||||
(user_id,), None, update_metrics=False
|
||||
)
|
||||
if user_in_mau is None:
|
||||
self.get_monthly_active_count.invalidate(())
|
||||
|
||||
self.user_last_seen_monthly_active.invalidate((user_id,))
|
||||
|
||||
def upsert_monthly_active_user_txn(self, txn, user_id):
|
||||
"""Updates or inserts monthly active user member
|
||||
|
||||
Note that, after calling this method, it will generally be necessary
|
||||
to invalidate the caches on user_last_seen_monthly_active and
|
||||
get_monthly_active_count. We can't do that here, because we are running
|
||||
in a database thread rather than the main thread, and we can't call
|
||||
txn.call_after because txn may not be a LoggingTransaction.
|
||||
|
||||
We consciously do not call is_support_txn from this method because it
|
||||
is not possible to cache the response. is_support_txn will be false in
|
||||
almost all cases, so it seems reasonable to call it only for
|
||||
@@ -269,27 +290,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||
values={"timestamp": int(self._clock.time_msec())},
|
||||
)
|
||||
|
||||
return is_insert
|
||||
|
||||
@cached(num_args=1)
|
||||
def user_last_seen_monthly_active(self, user_id):
|
||||
"""
|
||||
Checks if a given user is part of the monthly active user group
|
||||
Arguments:
|
||||
user_id (str): user to add/update
|
||||
Return:
|
||||
Deferred[int] : timestamp since last seen, None if never seen
|
||||
|
||||
"""
|
||||
|
||||
return self.db.simple_select_one_onecol(
|
||||
table="monthly_active_users",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcol="timestamp",
|
||||
allow_none=True,
|
||||
desc="user_last_seen_monthly_active",
|
||||
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.user_last_seen_monthly_active, (user_id,)
|
||||
)
|
||||
|
||||
return is_insert
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def populate_monthly_active_users(self, user_id):
|
||||
"""Checks on the state of monthly active user limits and optionally
|
||||
|
||||
@@ -18,7 +18,8 @@ import collections
|
||||
import logging
|
||||
import re
|
||||
from abc import abstractmethod
|
||||
from typing import List, Optional, Tuple
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from six import integer_types
|
||||
|
||||
@@ -46,6 +47,18 @@ RatelimitOverride = collections.namedtuple(
|
||||
)
|
||||
|
||||
|
||||
class RoomSortOrder(Enum):
|
||||
"""
|
||||
Enum to define the sorting method used when returning rooms with get_rooms_paginate
|
||||
|
||||
ALPHABETICAL = sort rooms alphabetically by name
|
||||
SIZE = sort rooms by membership size, highest to lowest
|
||||
"""
|
||||
|
||||
ALPHABETICAL = "alphabetical"
|
||||
SIZE = "size"
|
||||
|
||||
|
||||
class RoomWorkerStore(SQLBaseStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
super(RoomWorkerStore, self).__init__(database, db_conn, hs)
|
||||
@@ -281,6 +294,116 @@ class RoomWorkerStore(SQLBaseStore):
|
||||
desc="is_room_blocked",
|
||||
)
|
||||
|
||||
async def get_rooms_paginate(
|
||||
self,
|
||||
start: int,
|
||||
limit: int,
|
||||
order_by: RoomSortOrder,
|
||||
reverse_order: bool,
|
||||
search_term: Optional[str],
|
||||
) -> Tuple[List[Dict[str, Any]], int]:
|
||||
"""Function to retrieve a paginated list of rooms as json.
|
||||
|
||||
Args:
|
||||
start: offset in the list
|
||||
limit: maximum amount of rooms to retrieve
|
||||
order_by: the sort order of the returned list
|
||||
reverse_order: whether to reverse the room list
|
||||
search_term: a string to filter room names by
|
||||
Returns:
|
||||
A list of room dicts and an integer representing the total number of
|
||||
rooms that exist given this query
|
||||
"""
|
||||
# Filter room names by a string
|
||||
where_statement = ""
|
||||
if search_term:
|
||||
where_statement = "WHERE state.name LIKE ?"
|
||||
|
||||
# Our postgres db driver converts ? -> %s in SQL strings as that's the
|
||||
# placeholder for postgres.
|
||||
# HOWEVER, if you put a % into your SQL then everything goes wibbly.
|
||||
# To get around this, we're going to surround search_term with %'s
|
||||
# before giving it to the database in python instead
|
||||
search_term = "%" + search_term + "%"
|
||||
|
||||
# Set ordering
|
||||
if RoomSortOrder(order_by) == RoomSortOrder.SIZE:
|
||||
order_by_column = "curr.joined_members"
|
||||
order_by_asc = False
|
||||
elif RoomSortOrder(order_by) == RoomSortOrder.ALPHABETICAL:
|
||||
# Sort alphabetically
|
||||
order_by_column = "state.name"
|
||||
order_by_asc = True
|
||||
else:
|
||||
raise StoreError(
|
||||
500, "Incorrect value for order_by provided: %s" % order_by
|
||||
)
|
||||
|
||||
# Whether to return the list in reverse order
|
||||
if reverse_order:
|
||||
# Flip the boolean
|
||||
order_by_asc = not order_by_asc
|
||||
|
||||
# Create one query for getting the limited number of events that the user asked
|
||||
# for, and another query for getting the total number of events that could be
|
||||
# returned. Thus allowing us to see if there are more events to paginate through
|
||||
info_sql = """
|
||||
SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members
|
||||
FROM room_stats_state state
|
||||
INNER JOIN room_stats_current curr USING (room_id)
|
||||
%s
|
||||
ORDER BY %s %s
|
||||
LIMIT ?
|
||||
OFFSET ?
|
||||
""" % (
|
||||
where_statement,
|
||||
order_by_column,
|
||||
"ASC" if order_by_asc else "DESC",
|
||||
)
|
||||
|
||||
# Use a nested SELECT statement as SQL can't count(*) with an OFFSET
|
||||
count_sql = """
|
||||
SELECT count(*) FROM (
|
||||
SELECT room_id FROM room_stats_state state
|
||||
%s
|
||||
) AS get_room_ids
|
||||
""" % (
|
||||
where_statement,
|
||||
)
|
||||
|
||||
def _get_rooms_paginate_txn(txn):
|
||||
# Execute the data query
|
||||
sql_values = (limit, start)
|
||||
if search_term:
|
||||
# Add the search term into the WHERE clause
|
||||
sql_values = (search_term,) + sql_values
|
||||
txn.execute(info_sql, sql_values)
|
||||
|
||||
# Refactor room query data into a structured dictionary
|
||||
rooms = []
|
||||
for room in txn:
|
||||
rooms.append(
|
||||
{
|
||||
"room_id": room[0],
|
||||
"name": room[1],
|
||||
"canonical_alias": room[2],
|
||||
"joined_members": room[3],
|
||||
}
|
||||
)
|
||||
|
||||
# Execute the count query
|
||||
|
||||
# Add the search term into the WHERE clause if present
|
||||
sql_values = (search_term,) if search_term else ()
|
||||
txn.execute(count_sql, sql_values)
|
||||
|
||||
room_count = txn.fetchone()
|
||||
return rooms, room_count[0]
|
||||
|
||||
return await self.db.runInteraction(
|
||||
"get_rooms_paginate", _get_rooms_paginate_txn,
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(max_entries=10000)
|
||||
def get_ratelimit_for_user(self, user_id):
|
||||
"""Check if there are any overrides for ratelimiting for the given
|
||||
|
||||
@@ -17,19 +17,24 @@
|
||||
|
||||
import logging
|
||||
from collections import deque, namedtuple
|
||||
from typing import Iterable, List, Optional, Tuple
|
||||
|
||||
from six import iteritems
|
||||
from six.moves import range
|
||||
|
||||
import attr
|
||||
from prometheus_client import Counter, Histogram
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage.data_stores import DataStores
|
||||
from synapse.types import StateMap
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -67,6 +72,19 @@ stale_forward_extremities_counter = Histogram(
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class DeltaState:
|
||||
"""Deltas to use to update the `current_state_events` table.
|
||||
|
||||
Attributes:
|
||||
to_delete: List of type/state_keys to delete from current state
|
||||
to_insert: Map of state to upsert into current state
|
||||
"""
|
||||
|
||||
to_delete = attr.ib(type=List[Tuple[str, str]])
|
||||
to_insert = attr.ib(type=StateMap[str])
|
||||
|
||||
|
||||
class _EventPeristenceQueue(object):
|
||||
"""Queues up events so that they can be persisted in bulk with only one
|
||||
concurrent transaction per room.
|
||||
@@ -138,13 +156,12 @@ class _EventPeristenceQueue(object):
|
||||
|
||||
self._currently_persisting_rooms.add(room_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_queue_loop():
|
||||
async def handle_queue_loop():
|
||||
try:
|
||||
queue = self._get_drainining_queue(room_id)
|
||||
for item in queue:
|
||||
try:
|
||||
ret = yield per_item_callback(item)
|
||||
ret = await per_item_callback(item)
|
||||
except Exception:
|
||||
with PreserveLoggingContext():
|
||||
item.deferred.errback()
|
||||
@@ -191,12 +208,16 @@ class EventsPersistenceStorage(object):
|
||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def persist_events(self, events_and_contexts, backfilled=False):
|
||||
def persist_events(
|
||||
self,
|
||||
events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
|
||||
backfilled: bool = False,
|
||||
):
|
||||
"""
|
||||
Write events to the database
|
||||
Args:
|
||||
events_and_contexts: list of tuples of (event, context)
|
||||
backfilled (bool): Whether the results are retrieved from federation
|
||||
backfilled: Whether the results are retrieved from federation
|
||||
via backfill or not. Used to determine if they're "new" events
|
||||
which might update the current state etc.
|
||||
|
||||
@@ -226,16 +247,12 @@ class EventsPersistenceStorage(object):
|
||||
return max_persisted_id
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def persist_event(self, event, context, backfilled=False):
|
||||
def persist_event(
|
||||
self, event: FrozenEvent, context: EventContext, backfilled: bool = False
|
||||
):
|
||||
"""
|
||||
|
||||
Args:
|
||||
event (EventBase):
|
||||
context (EventContext):
|
||||
backfilled (bool):
|
||||
|
||||
Returns:
|
||||
Deferred: resolves to (int, int): the stream ordering of ``event``,
|
||||
Deferred[Tuple[int, int]]: the stream ordering of ``event``,
|
||||
and the stream ordering of the latest persisted event
|
||||
"""
|
||||
deferred = self._event_persist_queue.add_to_queue(
|
||||
@@ -249,28 +266,22 @@ class EventsPersistenceStorage(object):
|
||||
max_persisted_id = yield self.main_store.get_current_events_token()
|
||||
return (event.internal_metadata.stream_ordering, max_persisted_id)
|
||||
|
||||
def _maybe_start_persisting(self, room_id):
|
||||
@defer.inlineCallbacks
|
||||
def persisting_queue(item):
|
||||
def _maybe_start_persisting(self, room_id: str):
|
||||
async def persisting_queue(item):
|
||||
with Measure(self._clock, "persist_events"):
|
||||
yield self._persist_events(
|
||||
await self._persist_events(
|
||||
item.events_and_contexts, backfilled=item.backfilled
|
||||
)
|
||||
|
||||
self._event_persist_queue.handle_queue(room_id, persisting_queue)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _persist_events(self, events_and_contexts, backfilled=False):
|
||||
async def _persist_events(
|
||||
self,
|
||||
events_and_contexts: List[Tuple[FrozenEvent, EventContext]],
|
||||
backfilled: bool = False,
|
||||
):
|
||||
"""Calculates the change to current state and forward extremities, and
|
||||
persists the given events and with those updates.
|
||||
|
||||
Args:
|
||||
events_and_contexts (list[(EventBase, EventContext)]):
|
||||
backfilled (bool):
|
||||
delete_existing (bool):
|
||||
|
||||
Returns:
|
||||
Deferred: resolves when the events have been persisted
|
||||
"""
|
||||
if not events_and_contexts:
|
||||
return
|
||||
@@ -315,10 +326,10 @@ class EventsPersistenceStorage(object):
|
||||
)
|
||||
|
||||
for room_id, ev_ctx_rm in iteritems(events_by_room):
|
||||
latest_event_ids = yield self.main_store.get_latest_event_ids_in_room(
|
||||
latest_event_ids = await self.main_store.get_latest_event_ids_in_room(
|
||||
room_id
|
||||
)
|
||||
new_latest_event_ids = yield self._calculate_new_extremities(
|
||||
new_latest_event_ids = await self._calculate_new_extremities(
|
||||
room_id, ev_ctx_rm, latest_event_ids
|
||||
)
|
||||
|
||||
@@ -374,7 +385,7 @@ class EventsPersistenceStorage(object):
|
||||
with Measure(
|
||||
self._clock, "persist_events.get_new_state_after_events"
|
||||
):
|
||||
res = yield self._get_new_state_after_events(
|
||||
res = await self._get_new_state_after_events(
|
||||
room_id,
|
||||
ev_ctx_rm,
|
||||
latest_event_ids,
|
||||
@@ -389,12 +400,12 @@ class EventsPersistenceStorage(object):
|
||||
# If there is a delta we know that we've
|
||||
# only added or replaced state, never
|
||||
# removed keys entirely.
|
||||
state_delta_for_room[room_id] = ([], delta_ids)
|
||||
state_delta_for_room[room_id] = DeltaState([], delta_ids)
|
||||
elif current_state is not None:
|
||||
with Measure(
|
||||
self._clock, "persist_events.calculate_state_delta"
|
||||
):
|
||||
delta = yield self._calculate_state_delta(
|
||||
delta = await self._calculate_state_delta(
|
||||
room_id, current_state
|
||||
)
|
||||
state_delta_for_room[room_id] = delta
|
||||
@@ -404,7 +415,7 @@ class EventsPersistenceStorage(object):
|
||||
if current_state is not None:
|
||||
current_state_for_room[room_id] = current_state
|
||||
|
||||
yield self.main_store._persist_events_and_state_updates(
|
||||
await self.main_store._persist_events_and_state_updates(
|
||||
chunk,
|
||||
current_state_for_room=current_state_for_room,
|
||||
state_delta_for_room=state_delta_for_room,
|
||||
@@ -412,8 +423,12 @@ class EventsPersistenceStorage(object):
|
||||
backfilled=backfilled,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
|
||||
async def _calculate_new_extremities(
|
||||
self,
|
||||
room_id: str,
|
||||
event_contexts: List[Tuple[FrozenEvent, EventContext]],
|
||||
latest_event_ids: List[str],
|
||||
):
|
||||
"""Calculates the new forward extremities for a room given events to
|
||||
persist.
|
||||
|
||||
@@ -444,13 +459,13 @@ class EventsPersistenceStorage(object):
|
||||
)
|
||||
|
||||
# Remove any events which are prev_events of any existing events.
|
||||
existing_prevs = yield self.main_store._get_events_which_are_prevs(result)
|
||||
existing_prevs = await self.main_store._get_events_which_are_prevs(result)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
# Finally handle the case where the new events have soft-failed prev
|
||||
# events. If they do we need to remove them and their prev events,
|
||||
# otherwise we end up with dangling extremities.
|
||||
existing_prevs = yield self.main_store._get_prevs_before_rejected(
|
||||
existing_prevs = await self.main_store._get_prevs_before_rejected(
|
||||
e_id for event in new_events for e_id in event.prev_event_ids()
|
||||
)
|
||||
result.difference_update(existing_prevs)
|
||||
@@ -464,10 +479,13 @@ class EventsPersistenceStorage(object):
|
||||
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_state_after_events(
|
||||
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
|
||||
):
|
||||
async def _get_new_state_after_events(
|
||||
self,
|
||||
room_id: str,
|
||||
events_context: List[Tuple[FrozenEvent, EventContext]],
|
||||
old_latest_event_ids: Iterable[str],
|
||||
new_latest_event_ids: Iterable[str],
|
||||
) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]:
|
||||
"""Calculate the current state dict after adding some new events to
|
||||
a room
|
||||
|
||||
@@ -485,7 +503,6 @@ class EventsPersistenceStorage(object):
|
||||
the new forward extremities for the room.
|
||||
|
||||
Returns:
|
||||
Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
|
||||
Returns a tuple of two state maps, the first being the full new current
|
||||
state and the second being the delta to the existing current state.
|
||||
If both are None then there has been no change.
|
||||
@@ -547,7 +564,7 @@ class EventsPersistenceStorage(object):
|
||||
|
||||
if missing_event_ids:
|
||||
# Now pull out the state groups for any missing events from DB
|
||||
event_to_groups = yield self.main_store._get_state_group_for_events(
|
||||
event_to_groups = await self.main_store._get_state_group_for_events(
|
||||
missing_event_ids
|
||||
)
|
||||
event_id_to_state_group.update(event_to_groups)
|
||||
@@ -588,7 +605,7 @@ class EventsPersistenceStorage(object):
|
||||
# their state IDs so we can resolve to a single state set.
|
||||
missing_state = new_state_groups - set(state_groups_map)
|
||||
if missing_state:
|
||||
group_to_state = yield self.state_store._get_state_for_groups(missing_state)
|
||||
group_to_state = await self.state_store._get_state_for_groups(missing_state)
|
||||
state_groups_map.update(group_to_state)
|
||||
|
||||
if len(new_state_groups) == 1:
|
||||
@@ -612,10 +629,10 @@ class EventsPersistenceStorage(object):
|
||||
break
|
||||
|
||||
if not room_version:
|
||||
room_version = yield self.main_store.get_room_version(room_id)
|
||||
room_version = await self.main_store.get_room_version(room_id)
|
||||
|
||||
logger.debug("calling resolve_state_groups from preserve_events")
|
||||
res = yield self._state_resolution_handler.resolve_state_groups(
|
||||
res = await self._state_resolution_handler.resolve_state_groups(
|
||||
room_id,
|
||||
room_version,
|
||||
state_groups,
|
||||
@@ -625,18 +642,14 @@ class EventsPersistenceStorage(object):
|
||||
|
||||
return res.state, None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _calculate_state_delta(self, room_id, current_state):
|
||||
async def _calculate_state_delta(
|
||||
self, room_id: str, current_state: StateMap[str]
|
||||
) -> DeltaState:
|
||||
"""Calculate the new state deltas for a room.
|
||||
|
||||
Assumes that we are only persisting events for one room at a time.
|
||||
|
||||
Returns:
|
||||
tuple[list, dict] (to_delete, to_insert): where to_delete are the
|
||||
type/state_keys to remove from current_state_events and `to_insert`
|
||||
are the updates to current_state_events.
|
||||
"""
|
||||
existing_state = yield self.main_store.get_current_state_ids(room_id)
|
||||
existing_state = await self.main_store.get_current_state_ids(room_id)
|
||||
|
||||
to_delete = [key for key in existing_state if key not in current_state]
|
||||
|
||||
@@ -646,4 +659,4 @@ class EventsPersistenceStorage(object):
|
||||
if ev_id != existing_state.get(key)
|
||||
}
|
||||
|
||||
return to_delete, to_insert
|
||||
return DeltaState(to_delete=to_delete, to_insert=to_insert)
|
||||
|
||||
@@ -17,6 +17,7 @@ import json
|
||||
import os
|
||||
import urllib.parse
|
||||
from binascii import unhexlify
|
||||
from typing import List, Optional
|
||||
|
||||
from mock import Mock
|
||||
|
||||
@@ -26,7 +27,7 @@ import synapse.rest.admin
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.rest.admin import VersionServlet
|
||||
from synapse.rest.client.v1 import events, login, room
|
||||
from synapse.rest.client.v1 import directory, events, login, room
|
||||
from synapse.rest.client.v2_alpha import groups
|
||||
|
||||
from tests import unittest
|
||||
@@ -468,9 +469,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
# Extract media ID from the response
|
||||
server_name_and_media_id = response["content_uri"][
|
||||
6:
|
||||
] # Cut off the 'mxc://' bit
|
||||
server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
|
||||
server_name, media_id = server_name_and_media_id.split("/")
|
||||
|
||||
# Attempt to access the media
|
||||
@@ -516,7 +515,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
),
|
||||
)
|
||||
|
||||
def test_quarantine_all_media_in_room(self):
|
||||
def test_quarantine_all_media_in_room(self, override_url_template=None):
|
||||
self.register_user("room_admin", "pass", admin=True)
|
||||
admin_user_tok = self.login("room_admin", "pass")
|
||||
|
||||
@@ -555,9 +554,12 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
# Quarantine all media in the room
|
||||
url = "/_synapse/admin/v1/room/%s/media/quarantine" % urllib.parse.quote(
|
||||
room_id
|
||||
)
|
||||
if override_url_template:
|
||||
url = override_url_template % urllib.parse.quote(room_id)
|
||||
else:
|
||||
url = "/_synapse/admin/v1/room/%s/media/quarantine" % urllib.parse.quote(
|
||||
room_id
|
||||
)
|
||||
request, channel = self.make_request("POST", url, access_token=admin_user_tok,)
|
||||
self.render(request)
|
||||
self.pump(1.0)
|
||||
@@ -611,6 +613,10 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
),
|
||||
)
|
||||
|
||||
def test_quaraantine_all_media_in_room_deprecated_api_path(self):
|
||||
# Perform the above test with the deprecated API path
|
||||
self.test_quarantine_all_media_in_room("/_synapse/admin/v1/quarantine_media/%s")
|
||||
|
||||
def test_quarantine_all_media_by_user(self):
|
||||
self.register_user("user_admin", "pass", admin=True)
|
||||
admin_user_tok = self.login("user_admin", "pass")
|
||||
@@ -685,3 +691,389 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
|
||||
% server_and_media_id_2
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class RoomTestCase(unittest.HomeserverTestCase):
|
||||
"""Test /room admin API.
|
||||
"""
|
||||
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
login.register_servlets,
|
||||
room.register_servlets,
|
||||
directory.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
# Create user
|
||||
self.admin_user = self.register_user("admin", "pass", admin=True)
|
||||
self.admin_user_tok = self.login("admin", "pass")
|
||||
|
||||
def test_list_rooms(self):
|
||||
"""Test that we can list rooms"""
|
||||
# Create 3 test rooms
|
||||
total_rooms = 3
|
||||
room_ids = []
|
||||
for x in range(total_rooms):
|
||||
room_id = self.helper.create_room_as(
|
||||
self.admin_user, tok=self.admin_user_tok
|
||||
)
|
||||
room_ids.append(room_id)
|
||||
|
||||
# Request the list of rooms
|
||||
url = "/_synapse/admin/v1/rooms"
|
||||
request, channel = self.make_request(
|
||||
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
|
||||
# Check request completed successfully
|
||||
self.assertEqual(200, int(channel.code), msg=channel.json_body)
|
||||
|
||||
# Check that response json body contains a "rooms" key
|
||||
self.assertTrue(
|
||||
"rooms" in channel.json_body,
|
||||
msg="Response body does not " "contain a 'rooms' key",
|
||||
)
|
||||
|
||||
# Check that 3 rooms were returned
|
||||
self.assertEqual(3, len(channel.json_body["rooms"]), msg=channel.json_body)
|
||||
|
||||
# Check their room_ids match
|
||||
returned_room_ids = [room["room_id"] for room in channel.json_body["rooms"]]
|
||||
self.assertEqual(room_ids, returned_room_ids)
|
||||
|
||||
# Check that all fields are available
|
||||
for r in channel.json_body["rooms"]:
|
||||
self.assertIn("name", r)
|
||||
self.assertIn("canonical_alias", r)
|
||||
self.assertIn("joined_members", r)
|
||||
|
||||
# Check that the correct number of total rooms was returned
|
||||
self.assertEqual(channel.json_body["total_rooms"], total_rooms)
|
||||
|
||||
# Check that the offset is correct
|
||||
# Should be 0 as we aren't paginating
|
||||
self.assertEqual(channel.json_body["offset"], 0)
|
||||
|
||||
# Check that the prev_batch parameter is not present
|
||||
self.assertNotIn("prev_batch", channel.json_body)
|
||||
|
||||
# We shouldn't receive a next token here as there's no further rooms to show
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
def test_list_rooms_pagination(self):
|
||||
"""Test that we can get a full list of rooms through pagination"""
|
||||
# Create 5 test rooms
|
||||
total_rooms = 5
|
||||
room_ids = []
|
||||
for x in range(total_rooms):
|
||||
room_id = self.helper.create_room_as(
|
||||
self.admin_user, tok=self.admin_user_tok
|
||||
)
|
||||
room_ids.append(room_id)
|
||||
|
||||
# Set the name of the rooms so we get a consistent returned ordering
|
||||
for idx, room_id in enumerate(room_ids):
|
||||
self.helper.send_state(
|
||||
room_id, "m.room.name", {"name": str(idx)}, tok=self.admin_user_tok,
|
||||
)
|
||||
|
||||
# Request the list of rooms
|
||||
returned_room_ids = []
|
||||
start = 0
|
||||
limit = 2
|
||||
|
||||
run_count = 0
|
||||
should_repeat = True
|
||||
while should_repeat:
|
||||
run_count += 1
|
||||
|
||||
url = "/_synapse/admin/v1/rooms?from=%d&limit=%d&order_by=%s" % (
|
||||
start,
|
||||
limit,
|
||||
"alphabetical",
|
||||
)
|
||||
request, channel = self.make_request(
|
||||
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEqual(
|
||||
200, int(channel.result["code"]), msg=channel.result["body"]
|
||||
)
|
||||
|
||||
self.assertTrue("rooms" in channel.json_body)
|
||||
for r in channel.json_body["rooms"]:
|
||||
returned_room_ids.append(r["room_id"])
|
||||
|
||||
# Check that the correct number of total rooms was returned
|
||||
self.assertEqual(channel.json_body["total_rooms"], total_rooms)
|
||||
|
||||
# Check that the offset is correct
|
||||
# We're only getting 2 rooms each page, so should be 2 * last run_count
|
||||
self.assertEqual(channel.json_body["offset"], 2 * (run_count - 1))
|
||||
|
||||
if run_count > 1:
|
||||
# Check the value of prev_batch is correct
|
||||
self.assertEqual(channel.json_body["prev_batch"], 2 * (run_count - 2))
|
||||
|
||||
if "next_batch" not in channel.json_body:
|
||||
# We have reached the end of the list
|
||||
should_repeat = False
|
||||
else:
|
||||
# Make another query with an updated start value
|
||||
start = channel.json_body["next_batch"]
|
||||
|
||||
# We should've queried the endpoint 3 times
|
||||
self.assertEqual(
|
||||
run_count,
|
||||
3,
|
||||
msg="Should've queried 3 times for 5 rooms with limit 2 per query",
|
||||
)
|
||||
|
||||
# Check that we received all of the room ids
|
||||
self.assertEqual(room_ids, returned_room_ids)
|
||||
|
||||
url = "/_synapse/admin/v1/rooms?from=%d&limit=%d" % (start, limit)
|
||||
request, channel = self.make_request(
|
||||
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
|
||||
|
||||
def test_correct_room_attributes(self):
|
||||
"""Test the correct attributes for a room are returned"""
|
||||
# Create a test room
|
||||
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
||||
|
||||
test_alias = "#test:test"
|
||||
test_room_name = "something"
|
||||
|
||||
# Have another user join the room
|
||||
user_2 = self.register_user("user4", "pass")
|
||||
user_tok_2 = self.login("user4", "pass")
|
||||
self.helper.join(room_id, user_2, tok=user_tok_2)
|
||||
|
||||
# Create a new alias to this room
|
||||
url = "/_matrix/client/r0/directory/room/%s" % (urllib.parse.quote(test_alias),)
|
||||
request, channel = self.make_request(
|
||||
"PUT",
|
||||
url.encode("ascii"),
|
||||
{"room_id": room_id},
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
|
||||
|
||||
# Set this new alias as the canonical alias for this room
|
||||
self.helper.send_state(
|
||||
room_id,
|
||||
"m.room.aliases",
|
||||
{"aliases": [test_alias]},
|
||||
tok=self.admin_user_tok,
|
||||
state_key="test",
|
||||
)
|
||||
self.helper.send_state(
|
||||
room_id,
|
||||
"m.room.canonical_alias",
|
||||
{"alias": test_alias},
|
||||
tok=self.admin_user_tok,
|
||||
)
|
||||
|
||||
# Set a name for the room
|
||||
self.helper.send_state(
|
||||
room_id, "m.room.name", {"name": test_room_name}, tok=self.admin_user_tok,
|
||||
)
|
||||
|
||||
# Request the list of rooms
|
||||
url = "/_synapse/admin/v1/rooms"
|
||||
request, channel = self.make_request(
|
||||
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
|
||||
|
||||
# Check that rooms were returned
|
||||
self.assertTrue("rooms" in channel.json_body)
|
||||
rooms = channel.json_body["rooms"]
|
||||
|
||||
# Check that only one room was returned
|
||||
self.assertEqual(len(rooms), 1)
|
||||
|
||||
# And that the value of the total_rooms key was correct
|
||||
self.assertEqual(channel.json_body["total_rooms"], 1)
|
||||
|
||||
# Check that the offset is correct
|
||||
# We're not paginating, so should be 0
|
||||
self.assertEqual(channel.json_body["offset"], 0)
|
||||
|
||||
# Check that there is no `prev_batch`
|
||||
self.assertNotIn("prev_batch", channel.json_body)
|
||||
|
||||
# Check that there is no `next_batch`
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
# Check that all provided attributes are set
|
||||
r = rooms[0]
|
||||
self.assertEqual(room_id, r["room_id"])
|
||||
self.assertEqual(test_room_name, r["name"])
|
||||
self.assertEqual(test_alias, r["canonical_alias"])
|
||||
|
||||
def test_room_list_sort_order(self):
|
||||
"""Test room list sort ordering. alphabetical versus number of members,
|
||||
reversing the order, etc.
|
||||
"""
|
||||
# Create 3 test rooms
|
||||
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
||||
room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
||||
room_id_3 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
||||
|
||||
# Set room names in alphabetical order. room 1 -> A, 2 -> B, 3 -> C
|
||||
self.helper.send_state(
|
||||
room_id_1, "m.room.name", {"name": "A"}, tok=self.admin_user_tok,
|
||||
)
|
||||
self.helper.send_state(
|
||||
room_id_2, "m.room.name", {"name": "B"}, tok=self.admin_user_tok,
|
||||
)
|
||||
self.helper.send_state(
|
||||
room_id_3, "m.room.name", {"name": "C"}, tok=self.admin_user_tok,
|
||||
)
|
||||
|
||||
# Set room member size in the reverse order. room 1 -> 1 member, 2 -> 2, 3 -> 3
|
||||
user_1 = self.register_user("bob1", "pass")
|
||||
user_1_tok = self.login("bob1", "pass")
|
||||
self.helper.join(room_id_2, user_1, tok=user_1_tok)
|
||||
|
||||
user_2 = self.register_user("bob2", "pass")
|
||||
user_2_tok = self.login("bob2", "pass")
|
||||
self.helper.join(room_id_3, user_2, tok=user_2_tok)
|
||||
|
||||
user_3 = self.register_user("bob3", "pass")
|
||||
user_3_tok = self.login("bob3", "pass")
|
||||
self.helper.join(room_id_3, user_3, tok=user_3_tok)
|
||||
|
||||
def _order_test(
|
||||
order_type: str, expected_room_list: List[str], reverse: bool = False,
|
||||
):
|
||||
"""Request the list of rooms in a certain order. Assert that order is what
|
||||
we expect
|
||||
|
||||
Args:
|
||||
order_type: The type of ordering to give the server
|
||||
expected_room_list: The list of room_ids in the order we expect to get
|
||||
back from the server
|
||||
"""
|
||||
# Request the list of rooms in the given order
|
||||
url = "/_synapse/admin/v1/rooms?order_by=%s" % (order_type,)
|
||||
if reverse:
|
||||
url += "&dir=b"
|
||||
request, channel = self.make_request(
|
||||
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEqual(200, channel.code, msg=channel.json_body)
|
||||
|
||||
# Check that rooms were returned
|
||||
self.assertTrue("rooms" in channel.json_body)
|
||||
rooms = channel.json_body["rooms"]
|
||||
|
||||
# Check for the correct total_rooms value
|
||||
self.assertEqual(channel.json_body["total_rooms"], 3)
|
||||
|
||||
# Check that the offset is correct
|
||||
# We're not paginating, so should be 0
|
||||
self.assertEqual(channel.json_body["offset"], 0)
|
||||
|
||||
# Check that there is no `prev_batch`
|
||||
self.assertNotIn("prev_batch", channel.json_body)
|
||||
|
||||
# Check that there is no `next_batch`
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
# Check that rooms were returned in alphabetical order
|
||||
returned_order = [r["room_id"] for r in rooms]
|
||||
self.assertListEqual(expected_room_list, returned_order) # order is checked
|
||||
|
||||
# Test different sort orders, with forward and reverse directions
|
||||
_order_test("alphabetical", [room_id_1, room_id_2, room_id_3])
|
||||
_order_test("alphabetical", [room_id_3, room_id_2, room_id_1], reverse=True)
|
||||
|
||||
_order_test("size", [room_id_3, room_id_2, room_id_1])
|
||||
_order_test("size", [room_id_1, room_id_2, room_id_3], reverse=True)
|
||||
|
||||
def test_search_term(self):
|
||||
"""Test that searching for a room works correctly"""
|
||||
# Create two test rooms
|
||||
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
||||
room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
|
||||
|
||||
room_name_1 = "something"
|
||||
room_name_2 = "else"
|
||||
|
||||
# Set the name for each room
|
||||
self.helper.send_state(
|
||||
room_id_1, "m.room.name", {"name": room_name_1}, tok=self.admin_user_tok,
|
||||
)
|
||||
self.helper.send_state(
|
||||
room_id_2, "m.room.name", {"name": room_name_2}, tok=self.admin_user_tok,
|
||||
)
|
||||
|
||||
def _search_test(
|
||||
expected_room_id: Optional[str],
|
||||
search_term: str,
|
||||
expected_http_code: int = 200,
|
||||
):
|
||||
"""Search for a room and check that the returned room's id is a match
|
||||
|
||||
Args:
|
||||
expected_room_id: The room_id expected to be returned by the API. Set
|
||||
to None to expect zero results for the search
|
||||
search_term: The term to search for room names with
|
||||
expected_http_code: The expected http code for the request
|
||||
"""
|
||||
url = "/_synapse/admin/v1/rooms?search_term=%s" % (search_term,)
|
||||
request, channel = self.make_request(
|
||||
"GET", url.encode("ascii"), access_token=self.admin_user_tok,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEqual(expected_http_code, channel.code, msg=channel.json_body)
|
||||
|
||||
if expected_http_code != 200:
|
||||
return
|
||||
|
||||
# Check that rooms were returned
|
||||
self.assertTrue("rooms" in channel.json_body)
|
||||
rooms = channel.json_body["rooms"]
|
||||
|
||||
# Check that the expected number of rooms were returned
|
||||
expected_room_count = 1 if expected_room_id else 0
|
||||
self.assertEqual(len(rooms), expected_room_count)
|
||||
self.assertEqual(channel.json_body["total_rooms"], expected_room_count)
|
||||
|
||||
# Check that the offset is correct
|
||||
# We're not paginating, so should be 0
|
||||
self.assertEqual(channel.json_body["offset"], 0)
|
||||
|
||||
# Check that there is no `prev_batch`
|
||||
self.assertNotIn("prev_batch", channel.json_body)
|
||||
|
||||
# Check that there is no `next_batch`
|
||||
self.assertNotIn("next_batch", channel.json_body)
|
||||
|
||||
if expected_room_id:
|
||||
# Check that the first returned room id is correct
|
||||
r = rooms[0]
|
||||
self.assertEqual(expected_room_id, r["room_id"])
|
||||
|
||||
# Perform search tests
|
||||
_search_test(room_id_1, "something")
|
||||
_search_test(room_id_1, "thing")
|
||||
|
||||
_search_test(room_id_2, "else")
|
||||
_search_test(room_id_2, "se")
|
||||
|
||||
_search_test(None, "foo")
|
||||
_search_test(None, "bar")
|
||||
_search_test(None, "", expected_http_code=400)
|
||||
|
||||
@@ -435,6 +435,19 @@ class UserRestTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(0, channel.json_body["is_guest"])
|
||||
self.assertEqual(0, channel.json_body["deactivated"])
|
||||
|
||||
# Change password
|
||||
body = json.dumps({"password": "hahaha"})
|
||||
|
||||
request, channel = self.make_request(
|
||||
"PUT",
|
||||
self.url,
|
||||
access_token=self.admin_user_tok,
|
||||
content=body.encode(encoding="utf_8"),
|
||||
)
|
||||
self.render(request)
|
||||
|
||||
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
|
||||
|
||||
# Modify user
|
||||
body = json.dumps({"displayname": "foobar", "deactivated": True})
|
||||
|
||||
|
||||
@@ -134,3 +134,30 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
# someone else set topic, expect 6 (join,send,topic,join,send,topic)
|
||||
pass
|
||||
|
||||
|
||||
class GetEventsTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
events.register_servlets,
|
||||
room.register_servlets,
|
||||
synapse.rest.admin.register_servlets_for_client_rest_resource,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, hs, reactor, clock):
|
||||
|
||||
# register an account
|
||||
self.user_id = self.register_user("sid1", "pass")
|
||||
self.token = self.login(self.user_id, "pass")
|
||||
|
||||
self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
|
||||
|
||||
def test_get_event_via_events(self):
|
||||
resp = self.helper.send(self.room_id, tok=self.token)
|
||||
event_id = resp["event_id"]
|
||||
|
||||
request, channel = self.make_request(
|
||||
"GET", "/events/" + event_id, access_token=self.token,
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEquals(channel.code, 200, msg=channel.result)
|
||||
|
||||
@@ -149,6 +149,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
|
||||
|
||||
self.media_repo = hs.get_media_repository_resource()
|
||||
self.download_resource = self.media_repo.children[b"download"]
|
||||
self.thumbnail_resource = self.media_repo.children[b"thumbnail"]
|
||||
|
||||
# smol png
|
||||
self.end_content = unhexlify(
|
||||
@@ -157,11 +158,11 @@ class MediaRepoTests(unittest.HomeserverTestCase):
|
||||
b"0a2db40000000049454e44ae426082"
|
||||
)
|
||||
|
||||
self.media_id = "example.com/12345"
|
||||
|
||||
def _req(self, content_disposition):
|
||||
|
||||
request, channel = self.make_request(
|
||||
"GET", "example.com/12345", shorthand=False
|
||||
)
|
||||
request, channel = self.make_request("GET", self.media_id, shorthand=False)
|
||||
request.render(self.download_resource)
|
||||
self.pump()
|
||||
|
||||
@@ -170,7 +171,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
|
||||
self.assertEqual(len(self.fetches), 1)
|
||||
self.assertEqual(self.fetches[0][1], "example.com")
|
||||
self.assertEqual(
|
||||
self.fetches[0][2], "/_matrix/media/v1/download/example.com/12345"
|
||||
self.fetches[0][2], "/_matrix/media/v1/download/" + self.media_id
|
||||
)
|
||||
self.assertEqual(self.fetches[0][3], {"allow_remote": "false"})
|
||||
|
||||
@@ -229,3 +230,42 @@ class MediaRepoTests(unittest.HomeserverTestCase):
|
||||
headers = channel.headers
|
||||
self.assertEqual(headers.getRawHeaders(b"Content-Type"), [b"image/png"])
|
||||
self.assertEqual(headers.getRawHeaders(b"Content-Disposition"), None)
|
||||
|
||||
def test_thumbnail_crop(self):
|
||||
expected_body = unhexlify(
|
||||
b"89504e470d0a1a0a0000000d4948445200000020000000200806"
|
||||
b"000000737a7af40000001a49444154789cedc101010000008220"
|
||||
b"ffaf6e484001000000ef0610200001194334ee0000000049454e"
|
||||
b"44ae426082"
|
||||
)
|
||||
|
||||
self._test_thumbnail("crop", expected_body)
|
||||
|
||||
def test_thumbnail_scale(self):
|
||||
expected_body = unhexlify(
|
||||
b"89504e470d0a1a0a0000000d4948445200000001000000010806"
|
||||
b"0000001f15c4890000000d49444154789c636060606000000005"
|
||||
b"0001a5f645400000000049454e44ae426082"
|
||||
)
|
||||
|
||||
self._test_thumbnail("scale", expected_body)
|
||||
|
||||
def _test_thumbnail(self, method, expected_body):
|
||||
params = "?width=32&height=32&method=" + method
|
||||
request, channel = self.make_request(
|
||||
"GET", self.media_id + params, shorthand=False
|
||||
)
|
||||
request.render(self.thumbnail_resource)
|
||||
self.pump()
|
||||
|
||||
headers = {
|
||||
b"Content-Length": [b"%d" % (len(self.end_content))],
|
||||
b"Content-Type": [b"image/png"],
|
||||
}
|
||||
self.fetches[0][0].callback(
|
||||
(self.end_content, (len(self.end_content), headers))
|
||||
)
|
||||
self.pump()
|
||||
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(channel.result["body"], expected_body, channel.result["body"])
|
||||
|
||||
@@ -463,7 +463,7 @@ class HomeserverTestCase(TestCase):
|
||||
# Create the user
|
||||
request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
|
||||
self.render(request)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(channel.code, 200, msg=channel.result)
|
||||
nonce = channel.json_body["nonce"]
|
||||
|
||||
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
|
||||
|
||||
4
tox.ini
4
tox.ini
@@ -177,13 +177,13 @@ env =
|
||||
MYPYPATH = stubs/
|
||||
extras = all
|
||||
commands = mypy \
|
||||
synapse/api \
|
||||
synapse/config/ \
|
||||
synapse/handlers/ui_auth \
|
||||
synapse/logging/ \
|
||||
synapse/module_api \
|
||||
synapse/replication \
|
||||
synapse/rest/consent \
|
||||
synapse/rest/saml2 \
|
||||
synapse/rest \
|
||||
synapse/spam_checker_api \
|
||||
synapse/storage/engines \
|
||||
synapse/streams
|
||||
|
||||
Reference in New Issue
Block a user