1
0

Compare commits

..

117 Commits

Author SHA1 Message Date
Neil Johnson
3c099219e0 bump version and changelog for 0.29.0 2018-05-16 15:44:59 +01:00
Neil Johnson
589ecc5b58 further musical chairs 2018-05-14 17:49:59 +01:00
Neil Johnson
e71fb118f4 rearrange and collect related PRs 2018-05-14 17:39:22 +01:00
Neil Johnson
aea80a0118 v0.29.0-rc1: bump version and change log 2018-05-14 15:50:57 +01:00
David Baker
8cbbfd16fb Merge pull request #3201 from matrix-org/dbkr/leave_rooms_on_deactivate
Part user from rooms on account deactivate
2018-05-14 11:31:48 +01:00
Michael Kaye
16f41237f0 Merge pull request #2846 from kaiyou/feat-dockerfile
Add a Dockerfile for synapse
2018-05-11 17:21:12 +01:00
Richard van der Hoff
c25d7ba12e Merge pull request #3208 from matrix-org/rav/more_refactor_request_handler
Set Server header in SynapseRequest
2018-05-11 16:07:47 +01:00
Richard van der Hoff
23e2dfe940 Merge pull request #3209 from damir-manapov/master
transaction_id, destination defined twice
2018-05-11 00:35:13 +01:00
Richard van der Hoff
bd8d0cfab1 Merge remote-tracking branch 'origin/master' into develop 2018-05-11 00:19:26 +01:00
Damir Manapov
db18d854cd transaction_id, destination twice 2018-05-10 22:13:31 +03:00
Richard van der Hoff
318711e139 Set Server header in SynapseRequest
(instead of everywhere that writes a response. Or rather, the subset of places
which write responses where we haven't forgotten it).

This also means that we don't have to have the mysterious version_string
attribute in anything with a request handler.

Unfortunately it does mean that we have to pass the version string wherever we
instantiate a SynapseSite, which has been c&ped 150 times, but that is code
that ought to be cleaned up anyway really.
2018-05-10 18:50:27 +01:00
Richard van der Hoff
7b411007e6 Merge pull request #3203 from matrix-org/rav/refactor_request_handler
Refactor request handling wrappers
2018-05-10 15:26:53 +01:00
David Baker
6b49628e3b Catch failure to part user from room 2018-05-10 12:23:53 +01:00
David Baker
217bc53c98 Many docstrings 2018-05-10 12:20:40 +01:00
Richard van der Hoff
645cb4bf06 Remove redundant request_handler decorator
This is needless complexity; we might as well use the wrapper directly.

Also rename wrap_request_handler->wrap_json_request_handler.
2018-05-10 12:19:53 +01:00
Richard van der Hoff
09f570b935 Factor wrap_request_handler_with_logging out of wrap_request_handler
... so that it can be used on non-JSON endpoints
2018-05-10 12:19:52 +01:00
Richard van der Hoff
9589a1925e Remove include_metrics param
The metrics are now available via the request, so this is redundant and can go
away at last.
2018-05-10 12:19:52 +01:00
Richard van der Hoff
49e5a613f1 Move outgoing_responses_counter handling to RequestMetrics
it's much neater there.
2018-05-10 12:19:52 +01:00
Richard van der Hoff
b8700dd7d0 Bump requests_counter in wrapped_request_handler
less magic
2018-05-10 12:19:52 +01:00
Richard van der Hoff
c6f730282c Move RequestMetrics handling into SynapseRequest.processing()
It fits quite nicely here, and opens the path to getting rid of the
"include_metrics" mess.
2018-05-10 12:19:51 +01:00
Richard van der Hoff
09b29f9c4a Make RequestMetrics take a raw time rather than a clock
... which is going to make it easier to move around.
2018-05-10 12:18:52 +01:00
David Baker
4d298506dd Oops, don't call function passed to run_in_background 2018-05-10 11:57:13 +01:00
Richard van der Hoff
8460e48d06 Move request_id management into SynapseRequest 2018-05-10 11:48:17 +01:00
Richard van der Hoff
18e144fe08 Move RequestsMetrics to its own file
This is useful in its own right, because server.py is full of stuff; but more
importantly, I want to do some refactoring that will cause a circular reference
as it is.
2018-05-09 19:55:03 +01:00
Erik Johnston
bfe1f73855 Merge pull request #3199 from matrix-org/erikj/pagination_sync
Refactor sync APIs to reuse pagination API
2018-05-09 16:16:56 +01:00
Erik Johnston
5adb75bcba Merge pull request #3198 from matrix-org/erikj/fixup_return_pagination
Refactor get_recent_events_for_room return type
2018-05-09 16:07:14 +01:00
Erik Johnston
a5c98dda48 Merge pull request #3200 from matrix-org/erikj/remove_membership_change
Remove unused code path from member change DB func
2018-05-09 16:02:13 +01:00
Erik Johnston
d26bec8a43 Add comment to sync as to why code path is split 2018-05-09 15:56:07 +01:00
Erik Johnston
fcf55f2255 Fix returned token is no longer a tuple 2018-05-09 15:43:00 +01:00
Erik Johnston
7ce98804ff Fix up comment 2018-05-09 15:42:39 +01:00
Erik Johnston
cddf91c8b9 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/remove_membership_change 2018-05-09 15:32:07 +01:00
Erik Johnston
9896dab8f6 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/fixup_return_pagination 2018-05-09 15:31:33 +01:00
Erik Johnston
1e5280b7d0 Merge pull request #3196 from matrix-org/erikj/pagination_return
Refactor pagination DB API to return concrete type
2018-05-09 15:27:17 +01:00
Erik Johnston
75552d2148 Update comments 2018-05-09 15:15:38 +01:00
David Baker
294e9a0c9b Prefix internal functions 2018-05-09 15:10:37 +01:00
David Baker
46df23f581 Add the schema file 2018-05-09 15:07:54 +01:00
David Baker
52281e4c54 Indent fail 2018-05-09 15:06:16 +01:00
David Baker
7e8726b8fb Part deactivated users in the background
One room at a time so we don't take out the whole server with leave
events, and restart at server restart.
2018-05-09 14:54:28 +01:00
Erik Johnston
c0e08dc45b Remove unused code path from member change DB func
The function is never called without a from_key, so we can remove all
the handling for that scenario.
2018-05-09 14:31:32 +01:00
Erik Johnston
0461ef01b7 Merge pull request #3195 from matrix-org/erikj/pagination_refactor
Refactor recent events func to use pagination func
2018-05-09 14:12:24 +01:00
Erik Johnston
e2accd7f1d Refactor sync APIs to reuse pagination API
The sync API often returns events in a topological rather than stream
ordering, e.g. when the user joined the room or on initial sync. When
this happens we can reuse existing pagination storage functions.
2018-05-09 13:43:39 +01:00
Erik Johnston
e5ab9cd24b Don't unnecessarily require token to be stream token
This allows calling the `get_recent_event_ids_for_room` function in more
situations.
2018-05-09 11:58:35 +01:00
Richard van der Hoff
60590211c1 Merge pull request #3194 from rubo77/fix-nuke
nuke-room-from-db.sh: fix deletion from search table
2018-05-09 11:58:07 +01:00
Erik Johnston
c4af4c24ca Refactor get_recent_events_for_room return type
There is no reason to return a tuple of tokens when the last token is
always the token passed as an argument. Changing it makes it consistent
with other storage APIs
2018-05-09 11:55:34 +01:00
Erik Johnston
05e0a2462c Refactor pagination DB API to return concrete type
This makes it easier to document what is being returned by the storage
functions and what some functions expect as arguments.
2018-05-09 11:34:24 +01:00
Erik Johnston
7dd13415db Remove unused from_token param 2018-05-09 10:58:16 +01:00
Erik Johnston
27cf170558 Refactor recent events func to use pagination func
This also removes a cache that is unlikely to ever get hit.
2018-05-09 10:55:55 +01:00
Erik Johnston
1aeb5e28a9 Merge pull request #3193 from matrix-org/erikj/pagination_refactor
Refactor /context to reuse pagination storage functions
2018-05-09 10:22:38 +01:00
Erik Johnston
23ec51c94c Fix up comments and make function private 2018-05-09 09:55:19 +01:00
Richard van der Hoff
d5377eba55 Merge pull request #2337 from rubo77/patch-5
nuke-room-from-db.sh: added postgresql option and help
2018-05-09 09:43:07 +01:00
rubo77
d11b8b6b65 nuke-room-from-db.sh: nuke from table event_search too 2018-05-09 00:46:47 +02:00
rubo77
8ff8ab3bce Dont nuke non-existing table event_search_content 2018-05-09 00:21:00 +02:00
rubo77
6c957e26f0 nuke-room-from-db.sh: added postgresql option and help 2018-05-09 00:14:01 +02:00
Erik Johnston
696f532453 Reuse existing pagination code for context API 2018-05-08 16:20:19 +01:00
Erik Johnston
3e6d306e94 Parse tokens before calling DB function 2018-05-08 16:18:58 +01:00
Erik Johnston
274b8c6025 Only fetch required fields from database 2018-05-08 16:15:25 +01:00
Erik Johnston
06c0d0ed08 Split paginate_room_events storage function 2018-05-08 16:14:26 +01:00
David Baker
bf98fa0864 Part user from rooms on account deactivate
This implements this very crudely: this probably isn't viable
because parting a user from all their rooms could take a long time,
and if the HS gets restarted in that time the process will be
aborted.
2018-05-08 15:58:35 +01:00
Richard van der Hoff
678e649b78 Merge pull request #3190 from mujx/notif-token-fix
notifications: Convert next_token to string according to the spec
2018-05-08 13:54:47 +01:00
Erik Johnston
0b7dfbb194 Merge pull request #3186 from matrix-org/erikj/fix_int_values_metrics
Fix metrics that have integer value labels
2018-05-08 09:48:50 +01:00
Konstantinos Sideris
88868b2839 notifications: Convert next_token to string according to the spec
Currently the parameter is serialized as an integer.

Signed-off-by: Konstantinos Sideris <sideris.konstantin@gmail.com>
2018-05-05 12:55:02 +03:00
kaiyou
5addeaa02c Add Docker packaging in the author list 2018-05-04 21:23:01 +02:00
Erik Johnston
6d8ec3462d Note that label values can be anything 2018-05-03 16:25:05 +01:00
Erik Johnston
95b6912045 Fix metrics that have integer value labels 2018-05-03 15:51:04 +01:00
kaiyou
9a779c2ddb Merge remote-tracking branch 'upstream/master' into feat-dockerfile 2018-05-02 20:22:41 +02:00
kaiyou
4f2e898c29 Make the logging level configurable 2018-05-01 20:50:03 +02:00
kaiyou
d4c14e1438 Fix the documentation about 'POSTGRES_DB' 2018-05-01 20:47:58 +02:00
Matthew Hodgson
da602419b2 missing word :| 2018-05-01 19:19:23 +01:00
Matthew Hodgson
562532dd2d Merge branch 'release-v0.28.1' 2018-05-01 19:04:11 +01:00
kaiyou
041b41a825 Merge remote-tracking branch 'upstream/master' into feat-dockerfile 2018-04-14 12:27:16 +02:00
kaiyou
a13b7860c6 Merge remote-tracking branch 'upstream/master' into feat-dockerfile 2018-04-08 17:56:44 +02:00
kaiyou
757f1b5843 Merge remote-tracking branch 'upstream/master' into feat-dockerfile 2018-03-17 16:02:08 +01:00
kaiyou
f44b7c022f Disable logging to file and rely on the console when using Docker 2018-02-10 23:57:51 +01:00
kaiyou
07f1b71819 Explicitely provide the postgres password to synapse in the Compose example 2018-02-10 23:57:36 +01:00
kaiyou
b815aa0e2d Remove an accidentally committed test configuration 2018-02-10 21:59:58 +01:00
kaiyou
6f0b1f85f9 Generate macaroon and registration secrets, then store the results to the data dir 2018-02-10 00:05:03 +01:00
kaiyou
ca70148c05 Fix the path to the log config file 2018-02-09 00:23:19 +01:00
kaiyou
e511979fe6 Make SYNAPSE_MACAROON_SECRET_KEY a mandatory option 2018-02-09 00:13:26 +01:00
kaiyou
a03c382966 Specify the Docker registry for the postgres image 2018-02-08 22:00:43 +01:00
kaiyou
48e2c641b8 Specify the Docker registry in the build tag 2018-02-08 21:58:12 +01:00
kaiyou
d8680c969b Make it clear that the image has two modes of operation 2018-02-08 21:55:35 +01:00
kaiyou
b9b668e4bb Update to Alpine 3.7 and switch to libressl 2018-02-08 21:39:36 +01:00
kaiyou
ef1f8d4be6 Enable email server configuration from environment variables 2018-02-08 20:53:12 +01:00
kaiyou
a0af0054ec Honor the SYNAPSE_REPORT_STATS parameter in the Docker image 2018-02-08 20:46:11 +01:00
kaiyou
914a59cb8c Disable the Web client in the Docker image 2018-02-08 20:43:45 +01:00
kaiyou
e174c46a29 Use 'synapse' as a default postgres user in Docker examples 2018-02-08 20:42:57 +01:00
kaiyou
b8a4dceb3c Refactor the start script to better handle mandatory parameters 2018-02-08 20:41:41 +01:00
kaiyou
084afbb6a0 Rename the permissions variable to avoid confusion 2018-02-08 19:50:04 +01:00
kaiyou
58df3a8c5d Add some documentation about high performance storage 2018-02-08 19:48:53 +01:00
kaiyou
63fd148724 Make it clear that two modes are avaiable in the documentation, improve the compose file 2018-02-08 19:46:11 +01:00
kaiyou
1ffd9cb936 Support loading application service files from /data/appservices/ 2018-02-05 23:13:27 +01:00
kaiyou
107a5c9441 Add the non-tls port to the expose list 2018-02-05 23:02:33 +01:00
kaiyou
ee3b160a2a Only generate configuration files when necessary 2018-02-05 22:57:35 +01:00
kaiyou
630573a932 Do not copy documentation files to the Docker root folder 2018-02-05 22:57:22 +01:00
kaiyou
f5364b47ec Point to the 'latest' tag in the Docker documentation 2018-02-05 22:14:40 +01:00
kaiyou
d8c7da5dca Fix a typo in the Docker README 2018-02-05 22:12:50 +01:00
kaiyou
cf4ef60e28 Document the cache factor environment variable for Docker 2018-02-05 22:10:03 +01:00
kaiyou
cd51931b62 Add dynamic TURN configuration in the Docker image 2018-02-05 21:53:53 +01:00
kaiyou
81010a126e Add dynamic recaptcha configuration in the Docker image 2018-02-05 21:28:15 +01:00
kaiyou
8db84e9b21 Remove docker related files from the python manifest 2018-02-05 20:08:35 +01:00
kaiyou
e9021e16c4 Run the server as an unprivileged user 2018-02-04 23:19:08 +01:00
kaiyou
f72c9c1fb6 Fix multiple typos 2018-02-04 16:18:40 +01:00
kaiyou
b8ab78b82c Add the build cache/ folder to gitignore 2018-02-04 15:41:54 +01:00
kaiyou
9a87b8aaf7 Update sumperdump Docker readme to match this image properties 2018-02-04 15:27:32 +01:00
kaiyou
84a9209ba7 Remove etc/service files from rob's branch 2018-02-04 15:08:43 +01:00
kaiyou
53965334da Merge remote-tracking branch 'origin/rob/docker' into feat-dockerfile 2018-02-04 15:08:19 +01:00
kaiyou
a207cccb05 Reuse environment variables of the postgres container 2018-02-04 15:04:26 +01:00
kaiyou
1ba2fe114c Provide an example docker compose file 2018-02-04 12:55:20 +01:00
kaiyou
042757feb2 Install the postgres dependencies 2018-02-04 12:28:42 +01:00
kaiyou
886c2d5019 Support an external postgresql config in the Docker image 2018-02-04 12:20:29 +01:00
kaiyou
f2bf0cda02 Generate shared secrets if not defined in the environment 2018-02-04 11:40:20 +01:00
kaiyou
6d1e28a842 Generate any missing keys before starting synapse 2018-02-04 11:14:06 +01:00
kaiyou
48bc22f89d Allow for a wheel cache and include missing files in the build 2018-02-04 10:58:07 +01:00
kaiyou
d434ae3387 Add template config files for the Docker image 2018-02-03 20:30:08 +01:00
kaiyou
431476fbc4 Initial commit including a Dockerfile for synapse 2018-02-03 20:18:36 +01:00
Robert Swain
24d162814b docker: s/matrix-org/matrixdotorg/g 2017-09-29 11:40:15 +02:00
Robert Swain
95e02b856b docker: Initial Dockerfile and docker-compose.yaml 2017-09-28 12:12:47 +02:00
48 changed files with 1381 additions and 544 deletions

5
.dockerignore Normal file
View File

@@ -0,0 +1,5 @@
Dockerfile
.travis.yml
.gitignore
demo/etc
tox.ini

1
.gitignore vendored
View File

@@ -32,6 +32,7 @@ demo/media_store.*
demo/etc
uploads
cache
.idea/
media_store/

View File

@@ -60,3 +60,6 @@ Niklas Riekenbrauck <nikriek at gmail dot.com>
Christoph Witzany <christoph at web.crofting.com>
* Add LDAP support for authentication
Pierre Jaury <pierre at jaury.eu>
* Docker packaging

View File

@@ -1,5 +1,13 @@
Changes in synapse <unreleased>
===============================
Changes in synapse v0.29.0 (2018-05-16)
===========================================
Changes in synapse v0.29.0-rc1 (2018-05-14)
===========================================
Notable changes, a docker file for running Synapse (Thanks to @kaiyou!) and a
closed spec bug in the Client Server API. Additionally further prep for Python 3
migration.
Potentially breaking change:
@@ -12,6 +20,66 @@ Potentially breaking change:
Thanks to @NotAFile for fixing this.
Features:
* Add a Dockerfile for synapse (PR #2846) Thanks to @kaiyou!
Changes - General:
* nuke-room-from-db.sh: added postgresql option and help (PR #2337) Thanks to @rubo77!
* Part user from rooms on account deactivate (PR #3201)
* Make 'unexpected logging context' into warnings (PR #3007)
* Set Server header in SynapseRequest (PR #3208)
* remove duplicates from groups tables (PR #3129)
* Improve exception handling for background processes (PR #3138)
* Add missing consumeErrors to improve exception handling (PR #3139)
* reraise exceptions more carefully (PR #3142)
* Remove redundant call to preserve_fn (PR #3143)
* Trap exceptions thrown within run_in_background (PR #3144)
Changes - Refactors:
* Refactor /context to reuse pagination storage functions (PR #3193)
* Refactor recent events func to use pagination func (PR #3195)
* Refactor pagination DB API to return concrete type (PR #3196)
* Refactor get_recent_events_for_room return type (PR #3198)
* Refactor sync APIs to reuse pagination API (PR #3199)
* Remove unused code path from member change DB func (PR #3200)
* Refactor request handling wrappers (PR #3203)
* transaction_id, destination defined twice (PR #3209) Thanks to @damir-manapov!
* Refactor event storage to prepare for changes in state calculations (PR #3141)
* Set Server header in SynapseRequest (PR #3208)
* Use deferred.addTimeout instead of time_bound_deferred (PR #3127, #3178)
* Use run_in_background in preference to preserve_fn (PR #3140)
Changes - Python 3 migration:
* Construct HMAC as bytes on py3 (PR #3156) Thanks to @NotAFile!
* run config tests on py3 (PR #3159) Thanks to @NotAFile!
* Open certificate files as bytes (PR #3084) Thanks to @NotAFile!
* Open config file in non-bytes mode (PR #3085) Thanks to @NotAFile!
* Make event properties raise AttributeError instead (PR #3102) Thanks to @NotAFile!
* Use six.moves.urlparse (PR #3108) Thanks to @NotAFile!
* Add py3 tests to tox with folders that work (PR #3145) Thanks to @NotAFile!
* Don't yield in list comprehensions (PR #3150) Thanks to @NotAFile!
* Move more xrange to six (PR #3151) Thanks to @NotAFile!
* make imports local (PR #3152) Thanks to @NotAFile!
* move httplib import to six (PR #3153) Thanks to @NotAFile!
* Replace stringIO imports with six (PR #3154, #3168) Thanks to @NotAFile!
* more bytes strings (PR #3155) Thanks to @NotAFile!
Bug Fixes:
* synapse fails to start under Twisted >= 18.4 (PR #3157)
* Fix a class of logcontext leaks (PR #3170)
* Fix a couple of logcontext leaks in unit tests (PR #3172)
* Fix logcontext leak in media repo (PR #3174)
* Escape label values in prometheus metrics (PR #3175, #3186)
* Fix 'Unhandled Error' logs with Twisted 18.4 (PR #3182) Thanks to @Half-Shot!
* Fix logcontext leaks in rate limiter (PR #3183)
* notifications: Convert next_token to string according to the spec (PR #3190) Thanks to @mujx!
* nuke-room-from-db.sh: fix deletion from search table (PR #3194) Thanks to @rubo77!
* add guard for None on purge_history api (PR #3160) Thanks to @krombel!
Changes in synapse v0.28.1 (2018-05-01)
=======================================

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
FROM docker.io/python:2-alpine3.7
RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev
COPY . /synapse
# A wheel cache may be provided in ./cache for faster build
RUN cd /synapse \
&& pip install --upgrade pip setuptools psycopg2 \
&& mkdir -p /synapse/cache \
&& pip install -f /synapse/cache --upgrade --process-dependency-links . \
&& mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \
&& rm -rf setup.py setup.cfg synapse
VOLUME ["/data"]
EXPOSE 8008/tcp 8448/tcp
ENTRYPOINT ["/start.py"]

View File

@@ -25,6 +25,8 @@ recursive-include synapse/static *.js
exclude jenkins.sh
exclude jenkins*.sh
exclude jenkins*
exclude Dockerfile
exclude .dockerignore
recursive-exclude jenkins *.sh
prune .github

148
contrib/docker/README.md Normal file
View File

@@ -0,0 +1,148 @@
# Synapse Docker
This Docker image will run Synapse as a single process. It does not provide any
database server or TURN server that you should run separately.
If you run a Postgres server, you should simply have it in the same Compose
project or set the proper environment variables and the image will automatically
use that server.
## Build
Build the docker image with the `docker build` command from the root of the synapse repository.
```
docker build -t docker.io/matrixdotorg/synapse .
```
The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
You may have a local Python wheel cache available, in which case copy the relevant packages in the ``cache/`` directory at the root of the project.
## Run
This image is designed to run either with an automatically generated configuration
file or with a custom configuration that requires manual edition.
### Automated configuration
It is recommended that you use Docker Compose to run your containers, including
this image and a Postgres server. A sample ``docker-compose.yml`` is provided,
including example labels for reverse proxying and other artifacts.
Read the section about environment variables and set at least mandatory variables,
then run the server:
```
docker-compose up -d
```
### Manual configuration
A sample ``docker-compose.yml`` is provided, including example labels for
reverse proxying and other artifacts.
Specify a ``SYNAPSE_CONFIG_PATH``, preferably to a persistent path,
to use manual configuration. To generate a fresh ``homeserver.yaml``, simply run:
```
docker-compose run --rm -e SYNAPSE_SERVER_NAME=my.matrix.host synapse generate
```
Then, customize your configuration and run the server:
```
docker-compose up -d
```
### Without Compose
If you do not wish to use Compose, you may still run this image using plain
Docker commands. Note that the following is just a guideline and you may need
to add parameters to the docker run command to account for the network situation
with your postgres database.
```
docker run \
-d \
--name synapse \
-v ${DATA_PATH}:/data \
-e SYNAPSE_SERVER_NAME=my.matrix.host \
-e SYNAPSE_REPORT_STATS=yes \
docker.io/matrixdotorg/synapse:latest
```
## Volumes
The image expects a single volume, located at ``/data``, that will hold:
* temporary files during uploads;
* uploaded media and thumbnails;
* the SQLite database if you do not configure postgres;
* the appservices configuration.
You are free to use separate volumes depending on storage endpoints at your
disposal. For instance, ``/data/media`` coud be stored on a large but low
performance hdd storage while other files could be stored on high performance
endpoints.
In order to setup an application service, simply create an ``appservices``
directory in the data volume and write the application service Yaml
configuration file there. Multiple application services are supported.
## Environment
Unless you specify a custom path for the configuration file, a very generic
file will be generated, based on the following environment settings.
These are a good starting point for setting up your own deployment.
Global settings:
* ``UID``, the user id Synapse will run as [default 991]
* ``GID``, the group id Synapse will run as [default 991]
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
then customize it manually. No other environment variable is required.
Otherwise, a dynamic configuration file will be used. The following environment
variables are available for configuration:
* ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
statistics reporting back to the Matrix project which helps us to get funding.
* ``SYNAPSE_MACAROON_SECRET_KEY`` (mandatory) secret for signing access tokens
to the server, set this to a proper random key.
* ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
you run your own TLS-capable reverse proxy).
* ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
the Synapse instance.
* ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server.
* ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
* ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
* ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
key in order to enable recaptcha upon registration.
* ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
key in order to enable recaptcha upon registration.
* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
uris to enable TURN for this homeserver.
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
Shared secrets, that will be initialized to random values if not set:
* ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
registration is disable.
Database specific values (will use SQLite if not set):
* `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
* `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
* `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
* `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
Mail server specific values (will not send emails if not set):
* ``SYNAPSE_SMTP_HOST``, hostname to the mail server.
* ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
* ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any.
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any.

View File

@@ -0,0 +1,219 @@
# vim:ft=yaml
## TLS ##
tls_certificate_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.crt"
tls_private_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.key"
tls_dh_params_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.dh"
no_tls: {{ "True" if SYNAPSE_NO_TLS else "False" }}
tls_fingerprints: []
## Server ##
server_name: "{{ SYNAPSE_SERVER_NAME }}"
pid_file: /homeserver.pid
web_client: False
soft_file_limit: 0
## Ports ##
listeners:
{% if not SYNAPSE_NO_TLS %}
-
port: 8448
bind_addresses: ['0.0.0.0']
type: http
tls: true
x_forwarded: false
resources:
- names: [client]
compress: true
- names: [federation] # Federation APIs
compress: false
{% endif %}
- port: 8008
tls: false
bind_addresses: ['0.0.0.0']
type: http
x_forwarded: false
resources:
- names: [client]
compress: true
- names: [federation]
compress: false
## Database ##
{% if POSTGRES_PASSWORD %}
database:
name: "psycopg2"
args:
user: "{{ POSTGRES_USER or "synapse" }}"
password: "{{ POSTGRES_PASSWORD }}"
database: "{{ POSTGRES_DB or "synapse" }}"
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
cp_min: 5
cp_max: 10
{% else %}
database:
name: "sqlite3"
args:
database: "/data/homeserver.db"
{% endif %}
## Performance ##
event_cache_size: "{{ SYNAPSE_EVENT_CACHE_SIZE or "10K" }}"
verbose: 0
log_file: "/data/homeserver.log"
log_config: "/compiled/log.config"
## Ratelimiting ##
rc_messages_per_second: 0.2
rc_message_burst_count: 10.0
federation_rc_window_size: 1000
federation_rc_sleep_limit: 10
federation_rc_sleep_delay: 500
federation_rc_reject_limit: 50
federation_rc_concurrent: 3
## Files ##
media_store_path: "/data/media"
uploads_path: "/data/uploads"
max_upload_size: "10M"
max_image_pixels: "32M"
dynamic_thumbnails: false
# List of thumbnail to precalculate when an image is uploaded.
thumbnail_sizes:
- width: 32
height: 32
method: crop
- width: 96
height: 96
method: crop
- width: 320
height: 240
method: scale
- width: 640
height: 480
method: scale
- width: 800
height: 600
method: scale
url_preview_enabled: False
max_spider_size: "10M"
## Captcha ##
{% if SYNAPSE_RECAPTCHA_PUBLIC_KEY %}
recaptcha_public_key: "{{ SYNAPSE_RECAPTCHA_PUBLIC_KEY }}"
recaptcha_private_key: "{{ SYNAPSE_RECAPTCHA_PRIVATE_KEY }}"
enable_registration_captcha: True
recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
{% else %}
recaptcha_public_key: "YOUR_PUBLIC_KEY"
recaptcha_private_key: "YOUR_PRIVATE_KEY"
enable_registration_captcha: False
recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
{% endif %}
## Turn ##
{% if SYNAPSE_TURN_URIS %}
turn_uris:
{% for uri in SYNAPSE_TURN_URIS.split(',') %} - "{{ uri }}"
{% endfor %}
turn_shared_secret: "{{ SYNAPSE_TURN_SECRET }}"
turn_user_lifetime: "1h"
turn_allow_guests: True
{% else %}
turn_uris: []
turn_shared_secret: "YOUR_SHARED_SECRET"
turn_user_lifetime: "1h"
turn_allow_guests: True
{% endif %}
## Registration ##
enable_registration: {{ "True" if SYNAPSE_ENABLE_REGISTRATION else "False" }}
registration_shared_secret: "{{ SYNAPSE_REGISTRATION_SHARED_SECRET }}"
bcrypt_rounds: 12
allow_guest_access: {{ "True" if SYNAPSE_ALLOW_GUEST else "False" }}
enable_group_creation: true
# The list of identity servers trusted to verify third party
# identifiers by this server.
trusted_third_party_id_servers:
- matrix.org
- vector.im
- riot.im
## Metrics ###
{% if SYNAPSE_REPORT_STATS.lower() == "yes" %}
enable_metrics: True
report_stats: True
{% else %}
enable_metrics: False
report_stats: False
{% endif %}
## API Configuration ##
room_invite_state_types:
- "m.room.join_rules"
- "m.room.canonical_alias"
- "m.room.avatar"
- "m.room.name"
{% if SYNAPSE_APPSERVICES %}
app_service_config_files:
{% for appservice in SYNAPSE_APPSERVICES %} - "{{ appservice }}"
{% endfor %}
{% else %}
app_service_config_files: []
{% endif %}
macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"
expire_access_token: False
## Signing Keys ##
signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
old_signing_keys: {}
key_refresh_interval: "1d" # 1 Day.
# The trusted servers to download signing keys from.
perspectives:
servers:
"matrix.org":
verify_keys:
"ed25519:auto":
key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
password_config:
enabled: true
{% if SYNAPSE_SMTP_HOST %}
email:
enable_notifs: false
smtp_host: "{{ SYNAPSE_SMTP_HOST }}"
smtp_port: {{ SYNAPSE_SMTP_PORT or "25" }}
smtp_user: "{{ SYNAPSE_SMTP_USER }}"
smtp_pass: "{{ SYNAPSE_SMTP_PASSWORD }}"
require_transport_security: False
notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}"
app_name: Matrix
template_dir: res/templates
notif_template_html: notif_mail.html
notif_template_text: notif_mail.txt
notif_for_new_users: True
riot_base_url: "https://{{ SYNAPSE_SERVER_NAME }}"
{% endif %}

View File

@@ -0,0 +1,29 @@
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
filters:
context:
(): synapse.util.logcontext.LoggingContextFilter
request: ""
handlers:
console:
class: logging.StreamHandler
formatter: precise
filters: [context]
loggers:
synapse:
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
synapse.storage.SQL:
# beware: increasing this to DEBUG will make synapse log sensitive
# information such as access tokens.
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
root:
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
handlers: [console]

View File

@@ -0,0 +1,49 @@
# This compose file is compatible with Compose itself, it might need some
# adjustments to run properly with stack.
version: '3'
services:
synapse:
image: docker.io/matrixdotorg/synapse:latest
# Since snyapse does not retry to connect to the database, restart upon
# failure
restart: unless-stopped
# See the readme for a full documentation of the environment settings
environment:
- SYNAPSE_SERVER_NAME=my.matrix.host
- SYNAPSE_REPORT_STATS=no
- SYNAPSE_ENABLE_REGISTRATION=yes
- SYNAPSE_LOG_LEVEL=INFO
- POSTGRES_PASSWORD=changeme
volumes:
# You may either store all the files in a local folder
- ./files:/data
# .. or you may split this between different storage points
# - ./files:/data
# - /path/to/ssd:/data/uploads
# - /path/to/large_hdd:/data/media
depends_on:
- db
# In order to expose Synapse, remove one of the following, you might for
# instance expose the TLS port directly:
ports:
- 8448:8448/tcp
# ... or use a reverse proxy, here is an example for traefik:
labels:
- traefik.enable=true
- traefik.frontend.rule=Host:my.matrix.Host
- traefik.port=8448
db:
image: docker.io/postgres:10-alpine
# Change that password, of course!
environment:
- POSTGRES_USER=synapse
- POSTGRES_PASSWORD=changeme
volumes:
# You may store the database tables in a local folder..
- ./schemas:/var/lib/postgresql/data
# .. or store them on some high performance storage for better results
# - /path/to/ssd/storage:/var/lib/postfesql/data

66
contrib/docker/start.py Executable file
View File

@@ -0,0 +1,66 @@
#!/usr/local/bin/python
import jinja2
import os
import sys
import subprocess
import glob
# Utility functions
convert = lambda src, dst, environ: open(dst, "w").write(jinja2.Template(open(src).read()).render(**environ))
def check_arguments(environ, args):
for argument in args:
if argument not in environ:
print("Environment variable %s is mandatory, exiting." % argument)
sys.exit(2)
def generate_secrets(environ, secrets):
for name, secret in secrets.items():
if secret not in environ:
filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
if os.path.exists(filename):
with open(filename) as handle: value = handle.read()
else:
print("Generating a random secret for {}".format(name))
value = os.urandom(32).encode("hex")
with open(filename, "w") as handle: handle.write(value)
environ[secret] = value
# Prepare the configuration
mode = sys.argv[1] if len(sys.argv) > 1 else None
environ = os.environ.copy()
ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
args = ["python", "-m", "synapse.app.homeserver"]
# In generate mode, generate a configuration, missing keys, then exit
if mode == "generate":
check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS", "SYNAPSE_CONFIG_PATH"))
args += [
"--server-name", environ["SYNAPSE_SERVER_NAME"],
"--report-stats", environ["SYNAPSE_REPORT_STATS"],
"--config-path", environ["SYNAPSE_CONFIG_PATH"],
"--generate-config"
]
os.execv("/usr/local/bin/python", args)
# In normal mode, generate missing keys if any, then run synapse
else:
# Parse the configuration file
if "SYNAPSE_CONFIG_PATH" in environ:
args += ["--config-path", environ["SYNAPSE_CONFIG_PATH"]]
else:
check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"))
generate_secrets(environ, {
"registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
"macaroon": "SYNAPSE_MACAROON_SECRET_KEY"
})
environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
if not os.path.exists("/compiled"): os.mkdir("/compiled")
convert("/conf/homeserver.yaml", "/compiled/homeserver.yaml", environ)
convert("/conf/log.config", "/compiled/log.config", environ)
subprocess.check_output(["chown", "-R", ownership, "/data"])
args += ["--config-path", "/compiled/homeserver.yaml"]
# Generate missing keys and start synapse
subprocess.check_output(args + ["--generate-keys"])
os.execv("/sbin/su-exec", ["su-exec", ownership] + args)

View File

@@ -6,9 +6,19 @@
## Do not run it lightly.
set -e
if [ "$1" == "-h" ] || [ "$1" == "" ]; then
echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
echo "or"
echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
exit
fi
ROOMID="$1"
sqlite3 homeserver.db <<EOF
cat <<EOF
DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
DELETE FROM event_edges WHERE room_id = '$ROOMID';
@@ -29,7 +39,7 @@ DELETE FROM state_groups WHERE room_id = '$ROOMID';
DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
DELETE FROM event_search_content WHERE c1room_id = '$ROOMID';
DELETE FROM event_search WHERE room_id = '$ROOMID';
DELETE FROM guest_access WHERE room_id = '$ROOMID';
DELETE FROM history_visibility WHERE room_id = '$ROOMID';
DELETE FROM room_tags WHERE room_id = '$ROOMID';

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.28.1"
__version__ = "0.29.0"

View File

@@ -74,6 +74,7 @@ class AppserviceServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -98,6 +98,7 @@ class ClientReaderServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -114,6 +114,7 @@ class EventCreatorServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -87,6 +87,7 @@ class FederationReaderServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -101,6 +101,7 @@ class FederationSenderServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -152,6 +152,7 @@ class FrontendProxyServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -140,6 +140,7 @@ class SynapseHomeServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
),
self.tls_server_context_factory,
)
@@ -153,6 +154,7 @@ class SynapseHomeServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)
logger.info("Synapse now listening on port %d", port)

View File

@@ -94,6 +94,7 @@ class MediaRepositoryServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -104,6 +104,7 @@ class PusherServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -281,6 +281,7 @@ class SynchrotronServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -126,6 +126,7 @@ class UserDirectoryServer(HomeServer):
site_tag,
listener_config,
root_resource,
self.version_string,
)
)

View File

@@ -74,8 +74,6 @@ class Transaction(JsonEncodedObject):
"previous_ids",
"pdus",
"edus",
"transaction_id",
"destination",
"pdu_failures",
]

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
# Copyright 2017, 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,9 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from twisted.internet import defer, reactor
from ._base import BaseHandler
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
import logging
@@ -27,6 +29,14 @@ class DeactivateAccountHandler(BaseHandler):
super(DeactivateAccountHandler, self).__init__(hs)
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
self._room_member_handler = hs.get_room_member_handler()
# Flag that indicates whether the process to part users from rooms is running
self._user_parter_running = False
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
reactor.callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id):
@@ -50,3 +60,70 @@ class DeactivateAccountHandler(BaseHandler):
yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users penpding deactivation (ie.
# removal from all the rooms they're a member of)
yield self.store.add_user_pending_deactivation(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
def _start_user_parting(self):
"""
Start the process that goes through the table of users
pending deactivation, if it isn't already running.
Returns:
None
"""
if not self._user_parter_running:
run_in_background(self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
"""Loop that parts deactivated users from rooms
Returns:
None
"""
self._user_parter_running = True
logger.info("Starting user parter")
try:
while True:
user_id = yield self.store.get_user_pending_deactivation()
if user_id is None:
break
logger.info("User parter parting %r", user_id)
yield self._part_user(user_id)
yield self.store.del_user_pending_deactivation(user_id)
logger.info("User parter finished parting %r", user_id)
logger.info("User parter finished: stopping")
finally:
self._user_parter_running = False
@defer.inlineCallbacks
def _part_user(self, user_id):
"""Causes the given user_id to leave all the rooms they're joined to
Returns:
None
"""
user = UserID.from_string(user_id)
rooms_for_user = yield self.store.get_rooms_for_user(user_id)
for room_id in rooms_for_user:
logger.info("User parter parting %r from %r", user_id, room_id)
try:
yield self._room_member_handler.update_membership(
create_requester(user),
user,
room_id,
"leave",
ratelimit=False,
)
except Exception:
logger.exception(
"Failed to part user %r from room %r: ignoring and continuing",
user_id, room_id,
)

View File

@@ -181,8 +181,8 @@ class InitialSyncHandler(BaseHandler):
self.store, user_id, messages
)
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
start_token = now_token.copy_and_replace("room_key", token)
end_token = now_token.copy_and_replace("room_key", room_end_token)
time_now = self.clock.time_msec()
d["messages"] = {
@@ -325,8 +325,8 @@ class InitialSyncHandler(BaseHandler):
self.store, user_id, messages, is_peeking=is_peeking
)
start_token = StreamToken.START.copy_and_replace("room_key", token[0])
end_token = StreamToken.START.copy_and_replace("room_key", token[1])
start_token = StreamToken.START.copy_and_replace("room_key", token)
end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
time_now = self.clock.time_msec()
@@ -408,8 +408,8 @@ class InitialSyncHandler(BaseHandler):
self.store, user_id, messages, is_peeking=is_peeking,
)
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
start_token = now_token.copy_and_replace("room_key", token)
end_token = now_token
time_now = self.clock.time_msec()

View File

@@ -354,12 +354,24 @@ class SyncHandler(object):
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
# If we have a since_key then we are trying to get any events
# that have happened since `since_key` up to `end_key`, so we
# can just use `get_room_events_stream_for_room`.
# Otherwise, we want to return the last N events in the room
# in toplogical ordering.
if since_key:
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
end_token=end_key,
)
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
@@ -429,7 +441,7 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
last_events, token = yield self.store.get_recent_events_for_room(
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1,
)

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.http.server import wrap_request_handler
from synapse.http.server import wrap_json_request_handler
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
@@ -42,14 +42,13 @@ class AdditionalResource(Resource):
Resource.__init__(self)
self._handler = handler
# these are required by the request_handler wrapper
self.version_string = hs.version_string
# required by the request_handler wrapper
self.clock = hs.get_clock()
def render(self, request):
self._async_render(request)
return NOT_DONE_YET
@wrap_request_handler
@wrap_json_request_handler
def _async_render(self, request):
return self._handler(request)

View File

@@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import synapse.metrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for("synapse.http.server")
# total number of responses served, split by method/servlet/tag
response_count = metrics.register_counter(
"response_count",
labels=["method", "servlet", "tag"],
alternative_names=(
# the following are all deprecated aliases for the same metric
metrics.name_prefix + x for x in (
"_requests",
"_response_time:count",
"_response_ru_utime:count",
"_response_ru_stime:count",
"_response_db_txn_count:count",
"_response_db_txn_duration:count",
)
)
)
requests_counter = metrics.register_counter(
"requests_received",
labels=["method", "servlet", ],
)
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
response_timer = metrics.register_counter(
"response_time_seconds",
labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_time:total",
),
)
response_ru_utime = metrics.register_counter(
"response_ru_utime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_utime:total",
),
)
response_ru_stime = metrics.register_counter(
"response_ru_stime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_stime:total",
),
)
response_db_txn_count = metrics.register_counter(
"response_db_txn_count", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_count:total",
),
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
response_db_txn_duration = metrics.register_counter(
"response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_duration:total",
),
)
# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
)
class RequestMetrics(object):
def start(self, time_msec, name):
self.start = time_msec
self.start_context = LoggingContext.current_context()
self.name = name
def stop(self, time_msec, request):
context = LoggingContext.current_context()
tag = ""
if context:
tag = context.tag
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
context, self.start_context
)
return
outgoing_responses_counter.inc(request.method, str(request.code))
response_count.inc(request.method, self.name, tag)
response_timer.inc_by(
time_msec - self.start, request.method,
self.name, tag
)
ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by(
ru_utime, request.method, self.name, tag
)
response_ru_stime.inc_by(
ru_stime, request.method, self.name, tag
)
response_db_txn_count.inc_by(
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
context.db_txn_duration_ms / 1000., request.method, self.name, tag
)
response_db_sched_duration.inc_by(
context.db_sched_duration_ms / 1000., request.method, self.name, tag
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)

View File

@@ -18,6 +18,9 @@
from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
)
from synapse.http.request_metrics import (
requests_counter,
)
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import intern_dict
from synapse.util.metrics import Measure
@@ -41,178 +44,103 @@ import simplejson
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
# total number of responses served, split by method/servlet/tag
response_count = metrics.register_counter(
"response_count",
labels=["method", "servlet", "tag"],
alternative_names=(
# the following are all deprecated aliases for the same metric
metrics.name_prefix + x for x in (
"_requests",
"_response_time:count",
"_response_ru_utime:count",
"_response_ru_stime:count",
"_response_db_txn_count:count",
"_response_db_txn_duration:count",
)
)
)
def wrap_json_request_handler(h):
"""Wraps a request handler method with exception handling.
requests_counter = metrics.register_counter(
"requests_received",
labels=["method", "servlet", ],
)
Also adds logging as per wrap_request_handler_with_logging.
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
response_timer = metrics.register_counter(
"response_time_seconds",
labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_time:total",
),
)
response_ru_utime = metrics.register_counter(
"response_ru_utime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_utime:total",
),
)
response_ru_stime = metrics.register_counter(
"response_ru_stime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_stime:total",
),
)
response_db_txn_count = metrics.register_counter(
"response_db_txn_count", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_count:total",
),
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
response_db_txn_duration = metrics.register_counter(
"response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_duration:total",
),
)
# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
)
_next_request_id = 0
def request_handler(include_metrics=False):
"""Decorator for ``wrap_request_handler``"""
return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
def wrap_request_handler(request_handler, include_metrics=False):
"""Wraps a method that acts as a request handler with the necessary logging
and exception handling.
The method must have a signature of "handle_foo(self, request)". The
argument "self" must have "version_string" and "clock" attributes. The
argument "request" must be a twisted HTTP request.
The method must return a deferred. If the deferred succeeds we assume that
The handler must return a deferred. If the deferred succeeds we assume that
a response has been sent. If the deferred fails with a SynapseError we use
it to send a JSON response with the appropriate HTTP reponse code. If the
deferred fails with any other type of error we send a 500 reponse.
We insert a unique request-id into the logging context for this request and
log the response and duration for this request.
"""
@defer.inlineCallbacks
def wrapped_request_handler(self, request):
global _next_request_id
request_id = "%s-%s" % (request.method, _next_request_id)
_next_request_id += 1
try:
yield h(self, request)
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
else:
logger.exception(e)
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
)
except Exception:
# failure.Failure() fishes the original Failure out
# of our stack, and thus gives us a sensible stack
# trace.
f = failure.Failure()
logger.error(
"Failed handle request via %r: %r: %s",
h,
request,
f.getTraceback().rstrip(),
)
respond_with_json(
request,
500,
{
"error": "Internal server error",
"errcode": Codes.UNKNOWN,
},
send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
)
return wrap_request_handler_with_logging(wrapped_request_handler)
def wrap_request_handler_with_logging(h):
"""Wraps a request handler to provide logging and metrics
The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
As well as calling `request.processing` (which will log the response and
duration for this request), the wrapped request handler will insert the
request id into the logging context.
"""
@defer.inlineCallbacks
def wrapped_request_handler(self, request):
"""
Args:
self:
request (synapse.http.site.SynapseRequest):
"""
request_id = request.get_request_id()
with LoggingContext(request_id) as request_context:
request_context.request = request_id
with Measure(self.clock, "wrapped_request_handler"):
request_metrics = RequestMetrics()
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = self.__class__.__name__
request_metrics.start(self.clock, name=servlet_name)
with request.processing(servlet_name):
with PreserveLoggingContext(request_context):
d = h(self, request)
request_context.request = request_id
with request.processing():
try:
with PreserveLoggingContext(request_context):
if include_metrics:
yield request_handler(self, request, request_metrics)
else:
requests_counter.inc(request.method, servlet_name)
yield request_handler(self, request)
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
else:
logger.exception(e)
outgoing_responses_counter.inc(request.method, str(code))
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except Exception:
# failure.Failure() fishes the original Failure out
# of our stack, and thus gives us a sensible stack
# trace.
f = failure.Failure()
logger.error(
"Failed handle request %s.%s on %r: %r: %s",
request_handler.__module__,
request_handler.__name__,
self,
request,
f.getTraceback().rstrip(),
)
respond_with_json(
request,
500,
{
"error": "Internal server error",
"errcode": Codes.UNKNOWN,
},
send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
finally:
try:
request_metrics.stop(
self.clock, request
)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.inc(request.method,
request.request_metrics.name)
yield d
return wrapped_request_handler
@@ -262,7 +190,6 @@ class JsonResource(HttpServer, resource.Resource):
self.canonical_json = canonical_json
self.clock = hs.get_clock()
self.path_regexs = {}
self.version_string = hs.version_string
self.hs = hs
def register_paths(self, method, path_patterns, callback):
@@ -278,13 +205,9 @@ class JsonResource(HttpServer, resource.Resource):
self._async_render(request)
return server.NOT_DONE_YET
# Disable metric reporting because _async_render does its own metrics.
# It does its own metric reporting because _async_render dispatches to
# a callback and it's the class name of that callback we want to report
# against rather than the JsonResource itself.
@request_handler(include_metrics=True)
@wrap_json_request_handler
@defer.inlineCallbacks
def _async_render(self, request, request_metrics):
def _async_render(self, request):
""" This gets called from render() every time someone sends us a request.
This checks if anyone has registered a callback for that method and
path.
@@ -296,9 +219,7 @@ class JsonResource(HttpServer, resource.Resource):
servlet_classname = servlet_instance.__class__.__name__
else:
servlet_classname = "%r" % callback
request_metrics.name = servlet_classname
requests_counter.inc(request.method, servlet_classname)
request.request_metrics.name = servlet_classname
# Now trigger the callback. If it returns a response, we send it
# here. If it throws an exception, that is handled by the wrapper
@@ -345,15 +266,12 @@ class JsonResource(HttpServer, resource.Resource):
def _send_response(self, request, code, response_json_object,
response_code_message=None):
outgoing_responses_counter.inc(request.method, str(code))
# TODO: Only enable CORS for the requests that need it.
respond_with_json(
request, code, response_json_object,
send_cors=True,
response_code_message=response_code_message,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
canonical_json=self.canonical_json,
)
@@ -386,54 +304,6 @@ def _unrecognised_request_handler(request):
raise UnrecognizedRequestError()
class RequestMetrics(object):
def start(self, clock, name):
self.start = clock.time_msec()
self.start_context = LoggingContext.current_context()
self.name = name
def stop(self, clock, request):
context = LoggingContext.current_context()
tag = ""
if context:
tag = context.tag
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
context, self.start_context
)
return
response_count.inc(request.method, self.name, tag)
response_timer.inc_by(
clock.time_msec() - self.start, request.method,
self.name, tag
)
ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by(
ru_utime, request.method, self.name, tag
)
response_ru_stime.inc_by(
ru_stime, request.method, self.name, tag
)
response_db_txn_count.inc_by(
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
context.db_txn_duration_ms / 1000., request.method, self.name, tag
)
response_db_sched_duration.inc_by(
context.db_sched_duration_ms / 1000., request.method, self.name, tag
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
class RootRedirect(resource.Resource):
"""Redirects the root '/' path to another path."""
@@ -452,7 +322,7 @@ class RootRedirect(resource.Resource):
def respond_with_json(request, code, json_object, send_cors=False,
response_code_message=None, pretty_print=False,
version_string="", canonical_json=True):
canonical_json=True):
# could alternatively use request.notifyFinish() and flip a flag when
# the Deferred fires, but since the flag is RIGHT THERE it seems like
# a waste.
@@ -474,12 +344,11 @@ def respond_with_json(request, code, json_object, send_cors=False,
request, code, json_bytes,
send_cors=send_cors,
response_code_message=response_code_message,
version_string=version_string
)
def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
version_string="", response_code_message=None):
response_code_message=None):
"""Sends encoded JSON in response to the given request.
Args:
@@ -493,7 +362,6 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
request.setResponseCode(code, message=response_code_message)
request.setHeader(b"Content-Type", b"application/json")
request.setHeader(b"Server", version_string)
request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")

View File

@@ -12,24 +12,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.logcontext import LoggingContext
from twisted.web.server import Site, Request
import contextlib
import logging
import re
import time
from twisted.web.server import Site, Request
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
_next_request_seq = 0
class SynapseRequest(Request):
"""Class which encapsulates an HTTP request to synapse.
All of the requests processed in synapse are of this type.
It extends twisted's twisted.web.server.Request, and adds:
* Unique request ID
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
It provides a method `processing` which should be called by the Resource
which is handling the request, and returns a context manager.
"""
def __init__(self, site, *args, **kw):
Request.__init__(self, *args, **kw)
self.site = site
self.authenticated_entity = None
self.start_time = 0
global _next_request_seq
self.request_seq = _next_request_seq
_next_request_seq += 1
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
@@ -41,6 +65,9 @@ class SynapseRequest(Request):
self.site.site_tag,
)
def get_request_id(self):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
@@ -50,7 +77,16 @@ class SynapseRequest(Request):
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def started_processing(self):
def render(self, resrc):
# override the Server header which is set by twisted
self.setHeader("Server", self.site.server_version_string)
return Request.render(self, resrc)
def _started_processing(self, servlet_name):
self.start_time = int(time.time() * 1000)
self.request_metrics = RequestMetrics()
self.request_metrics.start(self.start_time, name=servlet_name)
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
@@ -58,10 +94,8 @@ class SynapseRequest(Request):
self.method,
self.get_redacted_uri()
)
self.start_time = int(time.time() * 1000)
def finished_processing(self):
def _finished_processing(self):
try:
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
@@ -72,6 +106,8 @@ class SynapseRequest(Request):
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_ms = (0, 0)
end_time = int(time.time() * 1000)
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
@@ -79,7 +115,7 @@ class SynapseRequest(Request):
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
int(time.time() * 1000) - self.start_time,
end_time - self.start_time,
int(ru_utime * 1000),
int(ru_stime * 1000),
db_sched_duration_ms,
@@ -93,11 +129,38 @@ class SynapseRequest(Request):
self.get_user_agent(),
)
try:
self.request_metrics.stop(end_time, self)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
@contextlib.contextmanager
def processing(self):
self.started_processing()
def processing(self, servlet_name):
"""Record the fact that we are processing this request.
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
This will log the request's arrival. Once the context manager is
closed, the completion of the request will be logged, and the various
metrics will be updated.
Args:
servlet_name (str): the name of the servlet which will be
processing this request. This is used in the metrics.
It is possible to update this afterwards by updating
self.request_metrics.servlet_name.
"""
# TODO: we should probably just move this into render() and finish(),
# to save having to call a separate method.
self._started_processing(servlet_name)
yield
self.finished_processing()
self._finished_processing()
class XForwardedForRequest(SynapseRequest):
@@ -135,7 +198,8 @@ class SynapseSite(Site):
Subclass of a twisted http Site that does access logging with python's
standard logging
"""
def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
def __init__(self, logger_name, site_tag, config, resource,
server_version_string, *args, **kwargs):
Site.__init__(self, resource, *args, **kwargs)
self.site_tag = site_tag
@@ -143,6 +207,7 @@ class SynapseSite(Site):
proxied = config.get("x_forwarded", False)
self.requestFactory = SynapseRequestFactory(self, proxied)
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string
def log(self, request):
pass

View File

@@ -71,7 +71,8 @@ class BaseMetric(object):
"""Render this metric for a single set of labels
Args:
label_values (list[str]): values for each of the labels
label_values (list[object]): values for each of the labels,
(which get stringified).
value: value of the metric at with these labels
Returns:
@@ -324,4 +325,4 @@ def _escape_character(m):
def _escape_label_value(value):
"""Takes a label value and escapes quotes, newlines and backslashes
"""
return re.sub(r"([\n\"\\])", _escape_character, value)
return re.sub(r"([\n\"\\])", _escape_character, str(value))

View File

@@ -176,7 +176,6 @@ class PushersRemoveRestServlet(RestServlet):
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Server", self.hs.version_string)
request.setHeader(b"Content-Length", b"%d" % (
len(PushersRemoveRestServlet.SUCCESS_HTML),
))

View File

@@ -129,7 +129,6 @@ class AuthRestServlet(RestServlet):
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Server", self.hs.version_string)
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)
@@ -175,7 +174,6 @@ class AuthRestServlet(RestServlet):
html_bytes = html.encode("utf8")
request.setResponseCode(200)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Server", self.hs.version_string)
request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
request.write(html_bytes)

View File

@@ -88,7 +88,7 @@ class NotificationsServlet(RestServlet):
pa["topological_ordering"], pa["stream_ordering"]
)
returned_push_actions.append(returned_pa)
next_token = pa["stream_ordering"]
next_token = str(pa["stream_ordering"])
defer.returnValue((200, {
"notifications": returned_push_actions,

View File

@@ -49,7 +49,6 @@ class LocalKey(Resource):
"""
def __init__(self, hs):
self.version_string = hs.version_string
self.response_body = encode_canonical_json(
self.response_json_object(hs.config)
)
@@ -84,7 +83,6 @@ class LocalKey(Resource):
def render_GET(self, request):
return respond_with_json_bytes(
request, 200, self.response_body,
version_string=self.version_string
)
def getChild(self, name, request):

View File

@@ -63,7 +63,6 @@ class LocalKey(Resource):
isLeaf = True
def __init__(self, hs):
self.version_string = hs.version_string
self.config = hs.config
self.clock = hs.clock
self.update_response_body(self.clock.time_msec())
@@ -115,5 +114,4 @@ class LocalKey(Resource):
self.update_response_body(time_now)
return respond_with_json_bytes(
request, 200, self.response_body,
version_string=self.version_string
)

View File

@@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.http.server import request_handler, respond_with_json_bytes
from synapse.http.server import (
respond_with_json_bytes, wrap_json_request_handler,
)
from synapse.http.servlet import parse_integer, parse_json_object_from_request
from synapse.api.errors import SynapseError, Codes
from synapse.crypto.keyring import KeyLookupError
@@ -91,7 +93,6 @@ class RemoteKey(Resource):
def __init__(self, hs):
self.keyring = hs.get_keyring()
self.store = hs.get_datastore()
self.version_string = hs.version_string
self.clock = hs.get_clock()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
@@ -99,7 +100,7 @@ class RemoteKey(Resource):
self.async_render_GET(request)
return NOT_DONE_YET
@request_handler()
@wrap_json_request_handler
@defer.inlineCallbacks
def async_render_GET(self, request):
if len(request.postpath) == 1:
@@ -124,7 +125,7 @@ class RemoteKey(Resource):
self.async_render_POST(request)
return NOT_DONE_YET
@request_handler()
@wrap_json_request_handler
@defer.inlineCallbacks
def async_render_POST(self, request):
content = parse_json_object_from_request(request)
@@ -240,5 +241,4 @@ class RemoteKey(Resource):
respond_with_json_bytes(
request, 200, result_io.getvalue(),
version_string=self.version_string
)

View File

@@ -12,17 +12,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.http.servlet
from ._base import parse_media_id, respond_404
from twisted.web.resource import Resource
from synapse.http.server import request_handler, set_cors_headers
from twisted.web.server import NOT_DONE_YET
from twisted.internet import defer
import logging
from twisted.internet import defer
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from synapse.http.server import (
set_cors_headers,
wrap_json_request_handler,
)
import synapse.http.servlet
from ._base import parse_media_id, respond_404
logger = logging.getLogger(__name__)
@@ -35,15 +37,14 @@ class DownloadResource(Resource):
self.media_repo = media_repo
self.server_name = hs.hostname
# Both of these are expected by @request_handler()
# this is expected by @wrap_json_request_handler
self.clock = hs.get_clock()
self.version_string = hs.version_string
def render_GET(self, request):
self._async_render_GET(request)
return NOT_DONE_YET
@request_handler()
@wrap_json_request_handler
@defer.inlineCallbacks
def _async_render_GET(self, request):
set_cors_headers(request)

View File

@@ -40,8 +40,9 @@ from synapse.util.stringutils import random_string
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.http.client import SpiderHttpClient
from synapse.http.server import (
request_handler, respond_with_json_bytes,
respond_with_json_bytes,
respond_with_json,
wrap_json_request_handler,
)
from synapse.util.async import ObservableDeferred
from synapse.util.stringutils import is_ascii
@@ -57,7 +58,6 @@ class PreviewUrlResource(Resource):
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.version_string = hs.version_string
self.filepaths = media_repo.filepaths
self.max_spider_size = hs.config.max_spider_size
self.server_name = hs.hostname
@@ -90,7 +90,7 @@ class PreviewUrlResource(Resource):
self._async_render_GET(request)
return NOT_DONE_YET
@request_handler()
@wrap_json_request_handler
@defer.inlineCallbacks
def _async_render_GET(self, request):

View File

@@ -14,18 +14,21 @@
# limitations under the License.
import logging
from twisted.internet import defer
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from synapse.http.server import (
set_cors_headers,
wrap_json_request_handler,
)
from synapse.http.servlet import parse_integer, parse_string
from ._base import (
parse_media_id, respond_404, respond_with_file, FileInfo,
FileInfo, parse_media_id, respond_404, respond_with_file,
respond_with_responder,
)
from twisted.web.resource import Resource
from synapse.http.servlet import parse_string, parse_integer
from synapse.http.server import request_handler, set_cors_headers
from twisted.web.server import NOT_DONE_YET
from twisted.internet import defer
import logging
logger = logging.getLogger(__name__)
@@ -41,14 +44,13 @@ class ThumbnailResource(Resource):
self.media_storage = media_storage
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
self.server_name = hs.hostname
self.version_string = hs.version_string
self.clock = hs.get_clock()
def render_GET(self, request):
self._async_render_GET(request)
return NOT_DONE_YET
@request_handler()
@wrap_json_request_handler
@defer.inlineCallbacks
def _async_render_GET(self, request):
set_cors_headers(request)

View File

@@ -13,16 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.http.server import respond_with_json, request_handler
import logging
from twisted.internet import defer
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import SynapseError
from twisted.web.server import NOT_DONE_YET
from twisted.internet import defer
from twisted.web.resource import Resource
import logging
from synapse.http.server import (
respond_with_json,
wrap_json_request_handler,
)
logger = logging.getLogger(__name__)
@@ -40,7 +41,6 @@ class UploadResource(Resource):
self.server_name = hs.hostname
self.auth = hs.get_auth()
self.max_upload_size = hs.config.max_upload_size
self.version_string = hs.version_string
self.clock = hs.get_clock()
def render_POST(self, request):
@@ -51,7 +51,7 @@ class UploadResource(Resource):
respond_with_json(request, 200, {}, send_cors=True)
return NOT_DONE_YET
@request_handler()
@wrap_json_request_handler
@defer.inlineCallbacks
def _async_render_POST(self, request):
requester = yield self.auth.get_user_by_req(request)

View File

@@ -15,7 +15,7 @@
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
from .sqlite3_engine import Sqlite3Engine
from .sqlite3 import Sqlite3Engine
import importlib
import platform

View File

@@ -15,7 +15,6 @@
from synapse.storage.prepare_database import prepare_database
import sqlite3
import struct
import threading
@@ -26,11 +25,6 @@ class Sqlite3Engine(object):
def __init__(self, database_module, database_config):
self.module = database_module
if sqlite3.sqlite_version_info < (3, 15, 0):
raise RuntimeError(
"SQLite3 version is too old, Synapse requires 3.15 or later",
)
# The current max state_group, or None if we haven't looked
# in the DB yet.
self._current_state_group_id = None

View File

@@ -526,3 +526,42 @@ class RegistrationStore(RegistrationWorkerStore,
except self.database_engine.module.IntegrityError:
ret = yield self.get_3pid_guest_access_token(medium, address)
defer.returnValue(ret)
def add_user_pending_deactivation(self, user_id):
"""
Adds a user to the table of users who need to be parted from all the rooms they're
in
"""
return self._simple_insert(
"users_pending_deactivation",
values={
"user_id": user_id,
},
desc="add_user_pending_deactivation",
)
def del_user_pending_deactivation(self, user_id):
"""
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
return self._simple_delete_one(
"users_pending_deactivation",
keyvalues={
"user_id": user_id,
},
desc="del_user_pending_deactivation",
)
def get_user_pending_deactivation(self):
"""
Gets one user from the table of users waiting to be parted from all the rooms
they're in.
"""
return self._simple_select_one_onecol(
"users_pending_deactivation",
keyvalues={},
retcol="user_id",
allow_none=True,
desc="get_users_pending_deactivation",
)

View File

@@ -0,0 +1,25 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Store any accounts that have been requested to be deactivated.
* We part the account from all the rooms its in when its
* deactivated. This can take some time and synapse may be restarted
* before it completes, so store the user IDs here until the process
* is complete.
*/
CREATE TABLE users_pending_deactivation (
user_id TEXT NOT NULL
);

View File

@@ -38,16 +38,16 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.engines import PostgresEngine
import abc
import logging
from six.moves import range
from collections import namedtuple
logger = logging.getLogger(__name__)
@@ -60,6 +60,12 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
# Used as return values for pagination APIs
_EventDictReturn = namedtuple("_EventDictReturn", (
"event_id", "topological_ordering", "stream_ordering",
))
def lower_bound(token, engine, inclusive=False):
inclusive = "=" if inclusive else ""
if token.topological is None:
@@ -227,54 +233,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
# Note: If from_key is None then we return in topological order. This
# is because in that case we're using this as a "get the last few messages
# in a room" function, rather than "get new messages since last sync"
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
from_id = None
to_id = RoomStreamToken.parse_stream_token(to_key).stream
"""Get new room events in stream ordering since `from_key`.
Args:
room_id (str)
from_key (str): Token from which no events are returned before
to_key (str): Token from which no events are returned after. (This
is typically the current stream token)
limit (int): Maximum number of events to return
order (str): Either "DESC" or "ASC". Determines which events are
returned when the result is limited. If "DESC" then the most
recent `limit` events are returned, otherwise returns the
oldest `limit` events.
Returns:
Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
events (in ascending order) and the token from the start of
the chunk of events returned.
"""
if from_key == to_key:
defer.returnValue(([], from_key))
if from_id:
has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id
)
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
if not has_changed:
defer.returnValue(([], from_key))
has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id
)
if not has_changed:
defer.returnValue(([], from_key))
def f(txn):
if from_id is not None:
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))
else:
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
" ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))
rows = self.cursor_to_dict(txn)
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
return rows
rows = yield self.runInteraction("get_room_events_stream_for_room", f)
ret = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)
@@ -284,7 +291,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ret.reverse()
if rows:
key = "s%d" % min(r["stream_ordering"] for r in rows)
key = "s%d" % min(r.stream_ordering for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -294,10 +301,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_membership_changes_for_user(self, user_id, from_key, to_key):
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
from_id = None
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
if from_key == to_key:
@@ -311,34 +315,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue([])
def f(txn):
if from_id is not None:
sql = (
"SELECT m.event_id, stream_ordering FROM events AS e,"
" room_memberships AS m"
" WHERE e.event_id = m.event_id"
" AND m.user_id = ?"
" AND e.stream_ordering > ? AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
)
txn.execute(sql, (user_id, from_id, to_id,))
else:
sql = (
"SELECT m.event_id, stream_ordering FROM events AS e,"
" room_memberships AS m"
" WHERE e.event_id = m.event_id"
" AND m.user_id = ?"
" AND stream_ordering <= ?"
" ORDER BY stream_ordering ASC"
)
txn.execute(sql, (user_id, to_id,))
rows = self.cursor_to_dict(txn)
sql = (
"SELECT m.event_id, stream_ordering FROM events AS e,"
" room_memberships AS m"
" WHERE e.event_id = m.event_id"
" AND m.user_id = ?"
" AND e.stream_ordering > ? AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
)
txn.execute(sql, (user_id, from_id, to_id,))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
return rows
rows = yield self.runInteraction("get_membership_changes_for_user", f)
ret = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)
@@ -347,14 +341,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
def get_recent_events_for_room(self, room_id, limit, end_token):
"""Get the most recent events in the room in topological ordering.
Args:
room_id (str)
limit (int)
end_token (str): The stream token representing now.
Returns:
Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
events and a token pointing to the start of the returned
events.
The events returned are in ascending order.
"""
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
room_id, limit, end_token,
)
logger.debug("stream before")
events = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)
logger.debug("stream after")
@@ -363,61 +371,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token))
@cached(num_args=4)
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token)
@defer.inlineCallbacks
def get_recent_event_ids_for_room(self, room_id, limit, end_token):
"""Get the most recent events in the room in topological ordering.
if from_token is None:
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
else:
from_token = RoomStreamToken.parse_stream_token(from_token)
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering > ?"
" AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
Args:
room_id (str)
limit (int)
end_token (str): The stream token representing now.
def get_recent_events_for_room_txn(txn):
if from_token is None:
txn.execute(sql, (room_id, end_token.stream, False, limit,))
else:
txn.execute(sql, (
room_id, from_token.stream, end_token.stream, False, limit
))
Returns:
Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
_EventDictReturn and a token pointing to the start of the returned
events.
The events returned are in ascending order.
"""
# Allow a zero limit here, and no-op.
if limit == 0:
defer.returnValue(([], end_token))
rows = self.cursor_to_dict(txn)
end_token = RoomStreamToken.parse(end_token)
rows.reverse() # As we selected with reverse ordering
if rows:
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# since we are going backwards so we subtract one from the
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
start_token = str(RoomStreamToken(topo, toke))
token = (start_token, str(end_token))
else:
token = (str(end_token), str(end_token))
return rows, token
return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
rows, token = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)
# We want to return the results in ascending order.
rows.reverse()
defer.returnValue((rows, token))
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
@@ -520,10 +504,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@staticmethod
def _set_before_and_after(events, rows, topo_order=True):
"""Inserts ordering information to events' internal metadata from
the DB rows.
Args:
events (list[FrozenEvent])
rows (list[_EventDictReturn])
topo_order (bool): Whether the events were ordered topologically
or by stream ordering. If true then all rows should have a non
null topological_ordering.
"""
for event, row in zip(events, rows):
stream = row["stream_ordering"]
if topo_order:
topo = event.depth
stream = row.stream_ordering
if topo_order and row.topological_ordering:
topo = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
@@ -595,87 +589,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
retcols=["stream_ordering", "topological_ordering"],
)
token = RoomStreamToken(
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1,
results["stream_ordering"],
)
after_token = RoomStreamToken(
results["topological_ordering"],
results["stream_ordering"],
)
if isinstance(self.database_engine, Sqlite3Engine):
# SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
# So we give pass it to SQLite3 as the UNION ALL of the two queries.
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r.event_id for r in rows]
query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering < ?"
" UNION ALL"
" SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
)
before_args = (
room_id, token.topological,
room_id, token.topological, token.stream,
before_limit,
)
query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering > ?"
" UNION ALL"
" SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
)
after_args = (
room_id, token.topological,
room_id, token.topological, token.stream,
after_limit,
)
else:
query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND %s"
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
) % (upper_bound(token, self.database_engine, inclusive=False),)
before_args = (room_id, before_limit)
query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND %s"
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
) % (lower_bound(token, self.database_engine, inclusive=False),)
after_args = (room_id, after_limit)
txn.execute(query_before, before_args)
rows = self.cursor_to_dict(txn)
events_before = [r["event_id"] for r in rows]
if rows:
start_token = str(RoomStreamToken(
rows[0]["topological_ordering"],
rows[0]["stream_ordering"] - 1,
))
else:
start_token = str(RoomStreamToken(
token.topological,
token.stream - 1,
))
txn.execute(query_after, after_args)
rows = self.cursor_to_dict(txn)
events_after = [r["event_id"] for r in rows]
if rows:
end_token = str(RoomStreamToken(
rows[-1]["topological_ordering"],
rows[-1]["stream_ordering"],
))
else:
end_token = str(token)
rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r.event_id for r in rows]
return {
"before": {
@@ -738,17 +672,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
Args:
txn
room_id (str)
from_token (RoomStreamToken): The token used to stream from
to_token (RoomStreamToken|None): A token which if given limits the
results to only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
Returns:
Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
as a list of _EventDictReturn and a token that points to the end
of the result set.
"""
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@@ -756,20 +701,20 @@ class StreamStore(StreamWorkerStore):
if direction == 'b':
order = "DESC"
bounds = upper_bound(
RoomStreamToken.parse(from_key), self.database_engine
from_token, self.database_engine
)
if to_key:
if to_token:
bounds = "%s AND %s" % (bounds, lower_bound(
RoomStreamToken.parse(to_key), self.database_engine
to_token, self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
RoomStreamToken.parse(from_key), self.database_engine
from_token, self.database_engine
)
if to_key:
if to_token:
bounds = "%s AND %s" % (bounds, upper_bound(
RoomStreamToken.parse(to_key), self.database_engine
to_token, self.database_engine
))
filter_clause, filter_args = filter_to_clause(event_filter)
@@ -785,7 +730,8 @@ class StreamStore(StreamWorkerStore):
limit_str = ""
sql = (
"SELECT * FROM events"
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
@@ -795,35 +741,72 @@ class StreamStore(StreamWorkerStore):
"limit": limit_str
}
def f(txn):
txn.execute(sql, args)
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = str(RoomStreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
if rows:
topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering
if direction == 'b':
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, next_token,
return rows, str(next_token),
rows, token = yield self.runInteraction("paginate_room_events", f)
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1, event_filter=None):
"""Returns list of events before or after a given token.
Args:
room_id (str)
from_key (str): The token used to stream from
to_key (str|None): A token which if given limits the results to
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "topological_ordering" and "stream_orderign".
"""
from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
)
events = yield self._get_events(
[r["event_id"] for r in rows],
[r.event_id for r in rows],
get_prev_content=True
)
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
def get_room_min_stream_ordering(self):
return self._backfill_id_gen.get_current_token()