
Merge remote-tracking branch 'origin/master' into uhoreg/dehydration_release

Hubert Chathi
2020-09-03 16:38:20 -04:00
169 changed files with 2924 additions and 2515 deletions
+2 -4
@@ -4,18 +4,16 @@ jobs:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:${CIRCLE_TAG} -t matrixdotorg/synapse:${CIRCLE_TAG}-py3 .
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:${CIRCLE_TAG} .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker push matrixdotorg/synapse:${CIRCLE_TAG}
- run: docker push matrixdotorg/synapse:${CIRCLE_TAG}-py3
dockerhubuploadlatest:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:latest -t matrixdotorg/synapse:latest-py3 .
- run: docker build -f docker/Dockerfile --label gitsha1=${CIRCLE_SHA1} -t matrixdotorg/synapse:latest .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker push matrixdotorg/synapse:latest
- run: docker push matrixdotorg/synapse:latest-py3
workflows:
version: 2
+90
@@ -1,3 +1,93 @@
Synapse 1.19.1 (2020-08-27)
===========================
No significant changes.
Synapse 1.19.1rc1 (2020-08-25)
==============================
Bugfixes
--------
- Fix a bug introduced in v1.19.0 where appservices with ratelimiting disabled would still be ratelimited when joining rooms. ([\#8139](https://github.com/matrix-org/synapse/issues/8139))
- Fix a bug introduced in v1.19.0 that would cause e.g. profile updates to fail due to incorrect application of rate limits on join requests. ([\#8153](https://github.com/matrix-org/synapse/issues/8153))
Synapse 1.19.0 (2020-08-17)
===========================
No significant changes since 1.19.0rc1.
Removal warning
---------------
As outlined in the [previous release](https://github.com/matrix-org/synapse/releases/tag/v1.18.0), we are no longer publishing Docker images with the `-py3` tag suffix. On top of that, we have also removed the `latest-py3` tag. Please see [the announcement in the upgrade notes for 1.18.0](https://github.com/matrix-org/synapse/blob/develop/UPGRADE.rst#upgrading-to-v1180).
Synapse 1.19.0rc1 (2020-08-13)
==============================
Features
--------
- Add option to allow server admins to join rooms which fail complexity checks. Contributed by @lugino-emeritus. ([\#7902](https://github.com/matrix-org/synapse/issues/7902))
- Add an option to purge room or not with delete room admin endpoint (`POST /_synapse/admin/v1/rooms/<room_id>/delete`). Contributed by @dklimpel. ([\#7964](https://github.com/matrix-org/synapse/issues/7964))
- Add rate limiting to users joining rooms. ([\#8008](https://github.com/matrix-org/synapse/issues/8008))
- Add a `/health` endpoint to every configured HTTP listener that can be used as a health check endpoint by load balancers. ([\#8048](https://github.com/matrix-org/synapse/issues/8048))
- Allow login to be blocked based on the values of SAML attributes. ([\#8052](https://github.com/matrix-org/synapse/issues/8052))
- Allow guest access to the `GET /_matrix/client/r0/rooms/{room_id}/members` endpoint, according to MSC2689. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#7314](https://github.com/matrix-org/synapse/issues/7314))
Bugfixes
--------
- Fix a bug introduced in Synapse v1.7.2 which caused inaccurate membership counts in the room directory. ([\#7977](https://github.com/matrix-org/synapse/issues/7977))
- Fix a long standing bug: 'Duplicate key value violates unique constraint "event_relations_id"' when message retention is configured. ([\#7978](https://github.com/matrix-org/synapse/issues/7978))
- Fix "no create event in auth events" when trying to reject invitation after inviter leaves. Bug introduced in Synapse v1.10.0. ([\#7980](https://github.com/matrix-org/synapse/issues/7980))
- Fix various comments and minor discrepancies in server notices code. ([\#7996](https://github.com/matrix-org/synapse/issues/7996))
- Fix a long standing bug where HTTP HEAD requests resulted in a 400 error. ([\#7999](https://github.com/matrix-org/synapse/issues/7999))
- Fix a long-standing bug which caused two copies of some log lines to be written when synctl was used along with a MemoryHandler logger. ([\#8011](https://github.com/matrix-org/synapse/issues/8011), [\#8012](https://github.com/matrix-org/synapse/issues/8012))
Updates to the Docker image
---------------------------
- We no longer publish Docker images with the `-py3` tag suffix, as [announced in the upgrade notes](https://github.com/matrix-org/synapse/blob/develop/UPGRADE.rst#upgrading-to-v1180). ([\#8056](https://github.com/matrix-org/synapse/issues/8056))
Improved Documentation
----------------------
- Document how to set up a client .well-known file and fix several pieces of outdated documentation. ([\#7899](https://github.com/matrix-org/synapse/issues/7899))
- Improve workers docs. ([\#7990](https://github.com/matrix-org/synapse/issues/7990), [\#8000](https://github.com/matrix-org/synapse/issues/8000))
- Fix typo in `docs/workers.md`. ([\#7992](https://github.com/matrix-org/synapse/issues/7992))
- Add documentation for how to undo a room shutdown. ([\#7998](https://github.com/matrix-org/synapse/issues/7998), [\#8010](https://github.com/matrix-org/synapse/issues/8010))
Internal Changes
----------------
- Reduce the amount of whitespace in JSON stored and sent in responses. Contributed by David Vo. ([\#7372](https://github.com/matrix-org/synapse/issues/7372))
- Switch to the JSON implementation from the standard library and bump the minimum version of the canonicaljson library to 1.2.0. ([\#7936](https://github.com/matrix-org/synapse/issues/7936), [\#7979](https://github.com/matrix-org/synapse/issues/7979))
- Convert various parts of the codebase to async/await. ([\#7947](https://github.com/matrix-org/synapse/issues/7947), [\#7948](https://github.com/matrix-org/synapse/issues/7948), [\#7949](https://github.com/matrix-org/synapse/issues/7949), [\#7951](https://github.com/matrix-org/synapse/issues/7951), [\#7963](https://github.com/matrix-org/synapse/issues/7963), [\#7973](https://github.com/matrix-org/synapse/issues/7973), [\#7975](https://github.com/matrix-org/synapse/issues/7975), [\#7976](https://github.com/matrix-org/synapse/issues/7976), [\#7981](https://github.com/matrix-org/synapse/issues/7981), [\#7987](https://github.com/matrix-org/synapse/issues/7987), [\#7989](https://github.com/matrix-org/synapse/issues/7989), [\#8003](https://github.com/matrix-org/synapse/issues/8003), [\#8014](https://github.com/matrix-org/synapse/issues/8014), [\#8016](https://github.com/matrix-org/synapse/issues/8016), [\#8027](https://github.com/matrix-org/synapse/issues/8027), [\#8031](https://github.com/matrix-org/synapse/issues/8031), [\#8032](https://github.com/matrix-org/synapse/issues/8032), [\#8035](https://github.com/matrix-org/synapse/issues/8035), [\#8042](https://github.com/matrix-org/synapse/issues/8042), [\#8044](https://github.com/matrix-org/synapse/issues/8044), [\#8045](https://github.com/matrix-org/synapse/issues/8045), [\#8061](https://github.com/matrix-org/synapse/issues/8061), [\#8062](https://github.com/matrix-org/synapse/issues/8062), [\#8063](https://github.com/matrix-org/synapse/issues/8063), [\#8066](https://github.com/matrix-org/synapse/issues/8066), [\#8069](https://github.com/matrix-org/synapse/issues/8069), [\#8070](https://github.com/matrix-org/synapse/issues/8070))
- Move some database-related log lines from the default logger to the database/transaction loggers. ([\#7952](https://github.com/matrix-org/synapse/issues/7952))
- Add a script to detect source code files using non-unix line terminators. ([\#7965](https://github.com/matrix-org/synapse/issues/7965), [\#7970](https://github.com/matrix-org/synapse/issues/7970))
- Log the SAML session ID during creation. ([\#7971](https://github.com/matrix-org/synapse/issues/7971))
- Implement new experimental push rules for some users. ([\#7997](https://github.com/matrix-org/synapse/issues/7997))
- Remove redundant and unreliable signature check for v1 Identity Service lookup responses. ([\#8001](https://github.com/matrix-org/synapse/issues/8001))
- Improve the performance of the register endpoint. ([\#8009](https://github.com/matrix-org/synapse/issues/8009))
- Reduce less useful output in the newsfragment CI step. Add a link to the changelog section of the contributing guide on error. ([\#8024](https://github.com/matrix-org/synapse/issues/8024))
- Rename storage layer objects to be more sensible. ([\#8033](https://github.com/matrix-org/synapse/issues/8033))
- Change the default log config to reduce disk I/O and storage for new servers. ([\#8040](https://github.com/matrix-org/synapse/issues/8040))
- Add an assertion on `prev_events` in `create_new_client_event`. ([\#8041](https://github.com/matrix-org/synapse/issues/8041))
- Add a comment to `ServerContextFactory` about the use of `SSLv23_METHOD`. ([\#8043](https://github.com/matrix-org/synapse/issues/8043))
- Log `OPTIONS` requests at `DEBUG` rather than `INFO` level to reduce amount logged at `INFO`. ([\#8049](https://github.com/matrix-org/synapse/issues/8049))
- Reduce amount of outbound request logging at `INFO` level. ([\#8050](https://github.com/matrix-org/synapse/issues/8050))
- It is no longer necessary to explicitly define `filters` in the logging configuration. (Continuing to do so is redundant but harmless.) ([\#8051](https://github.com/matrix-org/synapse/issues/8051))
- Add and improve type hints. ([\#8058](https://github.com/matrix-org/synapse/issues/8058), [\#8064](https://github.com/matrix-org/synapse/issues/8064), [\#8060](https://github.com/matrix-org/synapse/issues/8060), [\#8067](https://github.com/matrix-org/synapse/issues/8067))
Synapse 1.18.0 (2020-07-30)
===========================
-1
@@ -1 +0,0 @@
Allow guest access to the `GET /_matrix/client/r0/rooms/{room_id}/members` endpoint, according to MSC2689. Contributed by Awesome Technologies Innovationslabor GmbH.
-1
@@ -1 +0,0 @@
Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).
-1
@@ -1 +0,0 @@
Document how to set up a Client Well-Known file and fix several pieces of outdated documentation.
-1
@@ -1 +0,0 @@
Add option to allow server admins to join rooms which fail complexity checks. Contributed by @lugino-emeritus.
-1
@@ -1 +0,0 @@
Switch to the JSON implementation from the standard library and bump the minimum version of the canonicaljson library to 1.2.0.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Move some database-related log lines from the default logger to the database/transaction loggers.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Add an option to purge room or not with delete room admin endpoint (`POST /_synapse/admin/v1/rooms/<room_id>/delete`). Contributed by @dklimpel.
-1
@@ -1 +0,0 @@
Add a script to detect source code files using non-unix line terminators.
-1
@@ -1 +0,0 @@
Add a script to detect source code files using non-unix line terminators.
-1
@@ -1 +0,0 @@
Log the SAML session ID during creation.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Fix a bug introduced in Synapse v1.7.2 which caused inaccurate membership counts in the room directory.
-1
@@ -1 +0,0 @@
Fix a long standing bug: 'Duplicate key value violates unique constraint "event_relations_id"' when message retention is configured.
-1
@@ -1 +0,0 @@
Switch to the JSON implementation from the standard library and bump the minimum version of the canonicaljson library to 1.2.0.
-1
@@ -1 +0,0 @@
Fix "no create event in auth events" when trying to reject invitation after inviter leaves. Bug introduced in Synapse v1.10.0.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Improve workers docs.
-1
@@ -1 +0,0 @@
Fix typo in `docs/workers.md`.
-1
@@ -1 +0,0 @@
Fix various comments and minor discrepancies in server notices code.
-1
@@ -1 +0,0 @@
Add documentation for how to undo a room shutdown.
-1
@@ -1 +0,0 @@
Fix a long standing bug where HTTP HEAD requests resulted in a 400 error.
-1
@@ -1 +0,0 @@
Remove redundant and unreliable signature check for v1 Identity Service lookup responses.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Add rate limiting to users joining rooms.
-1
@@ -1 +0,0 @@
Fix a long-standing bug which caused two copies of some log lines to be written when synctl was used along with a MemoryHandler logger.
-1
@@ -1 +0,0 @@
Fix a long-standing bug which caused two copies of some log lines to be written when synctl was used along with a MemoryHandler logger.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Reduce less useful output in the newsfragment CI step. Add a link to the changelog section of the contributing guide on error.
-1
@@ -1 +0,0 @@
Convert various parts of the codebase to async/await.
-1
@@ -1 +0,0 @@
Rename storage layer objects to be more sensible.
+9 -3
@@ -1,12 +1,18 @@
matrix-synapse-py3 (1.xx.0) stable; urgency=medium
matrix-synapse-py3 (1.19.1) stable; urgency=medium
* New synapse release 1.19.1.
-- Synapse Packaging team <packages@matrix.org> Thu, 27 Aug 2020 10:50:19 +0100
matrix-synapse-py3 (1.19.0) stable; urgency=medium
[ Synapse Packaging team ]
* New synapse release 1.xx.0.
* New synapse release 1.19.0.
[ Aaron Raimist ]
* Fix outdated documentation for SYNAPSE_CACHE_FACTOR
-- Synapse Packaging team <packages@matrix.org> XXXXX
-- Synapse Packaging team <packages@matrix.org> Mon, 17 Aug 2020 14:06:42 +0100
matrix-synapse-py3 (1.18.0) stable; urgency=medium
-6
@@ -4,16 +4,10 @@ formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
console:
class: logging.StreamHandler
formatter: precise
filters: [context]
loggers:
synapse.storage.SQL:
+10 -3
@@ -79,13 +79,20 @@ Response:
the structure can and does change without notice.
First, it's important to understand that a room shutdown is very destructive. Undoing a shutdown is not as simple as pretending it
never happened - work has to be done to move forward instead of resetting the past.
never happened - work has to be done to move forward instead of resetting the past. In fact, in some cases it might not be possible
to recover at all:
1. For safety reasons, it is recommended to shut down Synapse prior to continuing.
* If the room was invite-only, your users will need to be re-invited.
* If the room no longer has any members at all, it'll be impossible to rejoin.
* The first user to rejoin will have to do so via an alias on a different server.
With all that being said, if you still want to try and recover the room:
1. For safety reasons, shut down Synapse.
2. In the database, run `DELETE FROM blocked_rooms WHERE room_id = '!example:example.org';`
* As a precaution, it's recommended to run this in a transaction: `BEGIN; DELETE ...;`, verify you got 1 result, then `COMMIT;`.
* The room ID is the same one supplied to the shutdown room API, not the Content Violation room.
3. Restart Synapse (required).
3. Restart Synapse.
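For step 2, the transactional delete might look like this from Python; a minimal sketch assuming a PostgreSQL-backed Synapse and the `psycopg2` driver (the connection details and room ID are illustrative):

```python
# Sketch for step 2; assumes PostgreSQL and psycopg2. DSN/room ID illustrative.
import psycopg2

ROOM_ID = "!example:example.org"

conn = psycopg2.connect(dbname="synapse", user="synapse_user")
try:
    with conn:  # opens a transaction; commits on success, rolls back on error
        with conn.cursor() as cur:
            cur.execute("DELETE FROM blocked_rooms WHERE room_id = %s", (ROOM_ID,))
            # Abort (and roll back) unless exactly one row was deleted.
            assert cur.rowcount == 1, "expected exactly one blocked_rooms row"
finally:
    conn.close()
```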
You will have to manually handle, if you so choose, the following:
+7
@@ -139,3 +139,10 @@ client IP addresses are recorded correctly.
Having done so, you can then use `https://matrix.example.com` (instead
of `https://matrix.example.com:8448`) as the "Custom server" when
connecting to Synapse from a client.
## Health check endpoint
Synapse exposes a health check endpoint for use by reverse proxies.
Each configured HTTP listener has a `/health` endpoint which always returns
200 OK (and doesn't get logged).
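Because the endpoint is unauthenticated and always answers 200 OK, a probe can be a bare GET; a minimal sketch, assuming a listener reachable on `localhost:8008`:

```python
# Minimal /health probe; the listener address is an assumption.
from urllib.request import urlopen

def synapse_is_healthy(base_url: str = "http://localhost:8008") -> bool:
    """Return True if the /health endpoint answers 200 OK."""
    try:
        with urlopen(base_url + "/health", timeout=5) as resp:
            return resp.status == 200
    except OSError:  # connection refused, timeout, or an HTTP error status
        return False

if __name__ == "__main__":
    print("healthy" if synapse_is_healthy() else "unhealthy")
```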
+11
@@ -1577,6 +1577,17 @@ saml2_config:
#
#grandfathered_mxid_source_attribute: upn
# It is possible to configure Synapse to only allow logins if SAML attributes
# match particular values. The requirements can be listed under
# `attribute_requirements` as shown below. All of the listed attributes must
# match for the login to be permitted.
#
#attribute_requirements:
# - attribute: userGroup
# value: "staff"
# - attribute: department
# value: "sales"
# Directory in which Synapse will try to find the template files below.
# If not set, default templates from within the Synapse package will be used.
#
+35 -11
@@ -11,24 +11,33 @@ formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
class: logging.handlers.TimedRotatingFileHandler
formatter: precise
filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600
backupCount: 10
filters: [context]
when: midnight
backupCount: 3 # Does not include the current log file.
encoding: utf8
# Default to buffering writes to log file for efficiency. This means that
# there will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
# logs will still be flushed immediately.
buffer:
class: logging.handlers.MemoryHandler
target: file
# The capacity is the number of log lines that are buffered before
# being written to disk. Increasing this will lead to better
# performance, at the expense of it taking longer for log lines to
# be written to disk.
capacity: 10
flushLevel: 30 # Flush for WARNING logs as well
# A handler that writes logs to stderr. Unused by default, but can be used
# instead of "buffer" and "file" in the logger handlers.
console:
class: logging.StreamHandler
formatter: precise
filters: [context]
loggers:
synapse.storage.SQL:
@@ -36,8 +45,23 @@ loggers:
# information such as access tokens.
level: INFO
twisted:
# We send the twisted logging directly to the file handler,
# to work around https://github.com/matrix-org/synapse/issues/3471
# when using "buffer" logger. Use "console" to log to stderr instead.
handlers: [file]
propagate: false
root:
level: INFO
handlers: [file, console]
# Write logs to the `buffer` handler, which will buffer them together in memory,
# then write them to a file.
#
# Replace "buffer" with "console" to log to stderr instead. (Note that you'll
# also need to update the configuration for the `twisted` logger above, in
# this case.)
#
handlers: [buffer]
disable_existing_loggers: false
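For reference, the handler chain this config describes can be reproduced with the standard library directly; a standalone sketch (not Synapse code):

```python
# Standalone sketch of the buffer -> file handler chain (not Synapse code).
import logging
from logging.handlers import MemoryHandler, TimedRotatingFileHandler

file_handler = TimedRotatingFileHandler(
    "homeserver.log", when="midnight", backupCount=3, encoding="utf8"
)
file_handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)

# Buffer up to 10 records in memory; WARNING (level 30) or above flushes early.
buffer_handler = MemoryHandler(
    capacity=10, flushLevel=logging.WARNING, target=file_handler
)

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(buffer_handler)

root.info("held in the buffer until capacity or flushLevel is reached")
root.warning("triggers an immediate flush to the file handler")
```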
@@ -1,7 +1,7 @@
worker_app: synapse.app.federation_reader
worker_name: federation_reader1
worker_replication_host: 127.0.0.1
worker_replication_port: 9092
worker_replication_http_port: 9093
worker_listeners:
+41 -13
@@ -23,7 +23,7 @@ The processes communicate with each other via a Synapse-specific protocol called
feeds streams of newly written data between processes so they can be kept in
sync with the database state.
When configured to do so, Synapse uses a
[Redis pub/sub channel](https://redis.io/topics/pubsub) to send the replication
stream between all configured Synapse processes. Additionally, processes may
make HTTP requests to each other, primarily for operations which need to wait
@@ -66,23 +66,31 @@ https://hub.docker.com/r/matrixdotorg/synapse/.
To make effective use of the workers, you will need to configure an HTTP
reverse-proxy such as nginx or haproxy, which will direct incoming requests to
the correct worker, or to the main synapse instance. See
[reverse_proxy.md](reverse_proxy.md) for information on setting up a reverse
proxy.
To enable workers you should create a configuration file for each worker
process. Each worker configuration file inherits the configuration of the shared
homeserver configuration file. You can then override configuration specific to
that worker, e.g. the HTTP listener that it provides (if any); logging
configuration; etc. You should minimise the number of overrides though to
maintain a usable config.
When using workers, each worker process has its own configuration file which
contains settings specific to that worker, such as the HTTP listener that it
provides (if any), logging configuration, etc.
Normally, the worker processes are configured to read from a shared
configuration file as well as the worker-specific configuration files. This
makes it easier to keep common configuration settings synchronised across all
the processes.
The main process is somewhat special in this respect: it does not normally
need its own configuration file and can take all of its configuration from the
shared configuration file.
### Shared Configuration
### Shared configuration
Normally, only a couple of changes are needed to make an existing configuration
file suitable for use with workers. First, you need to enable an "HTTP replication
listener" for the main process; and secondly, you need to enable redis-based
replication. For example:
Next you need to add both an HTTP replication listener, used for HTTP requests
between processes, and redis config to the shared Synapse configuration file
(`homeserver.yaml`). For example:
```yaml
# extend the existing `listeners` section. This defines the ports that the
@@ -105,7 +113,7 @@ Under **no circumstances** should the replication listener be exposed to the
public internet; it has no authentication and is unencrypted.
### Worker Configuration
### Worker configuration
In the config file for each worker, you must specify the type of worker
application (`worker_app`), and you should specify a unique name for the worker
@@ -145,6 +153,9 @@ plain HTTP endpoint on port 8083 separately serving various endpoints, e.g.
Obviously you should configure your reverse-proxy to route the relevant
endpoints to the worker (`localhost:8083` in the above example).
### Running Synapse with workers
Finally, you need to start your worker processes. This can be done with either
`synctl` or your distribution's preferred service manager such as `systemd`. We
recommend the use of `systemd` where available: for information on setting up
@@ -407,6 +418,23 @@ all these to be folded into the `generic_worker` app and to use config to define
which processes handle the various processing such as push notifications.
## Migration from old config
There are two main independent changes that have been made: introducing Redis
support and merging apps into `synapse.app.generic_worker`. Both these changes
are backwards compatible and so no changes to the config are required, however
server admins are encouraged to plan to migrate to Redis as the old style direct
TCP replication config is deprecated.
To migrate to Redis add the `redis` config as above, and optionally remove the
TCP `replication` listener from master and `worker_replication_port` from worker
config.
To migrate apps to use `synapse.app.generic_worker` simply update the
`worker_app` option in the worker configs, and where workers are started (e.g.
in systemd service files, but not required for synctl).
## Architectural diagram
The following shows an example setup using Redis and a reverse proxy:
+3
@@ -81,3 +81,6 @@ ignore_missing_imports = True
[mypy-rust_python_jaeger_reporter.*]
ignore_missing_imports = True
[mypy-nacl.*]
ignore_missing_imports = True
+1 -1
@@ -67,7 +67,7 @@ logger = logging.getLogger("synapse_port_db")
BOOLEAN_COLUMNS = {
"events": ["processed", "outlier", "contains_url", "count_as_unread"],
"events": ["processed", "outlier", "contains_url"],
"rooms": ["is_public"],
"event_edges": ["is_state"],
"presence_list": ["accepted"],
+1 -1
@@ -48,7 +48,7 @@ try:
except ImportError:
pass
__version__ = "1.18.0"
__version__ = "1.19.1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
+56 -67
@@ -13,12 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from typing import List, Optional, Tuple
import pymacaroons
from netaddr import IPAddress
from twisted.internet import defer
from twisted.web.server import Request
import synapse.types
@@ -80,13 +79,14 @@ class Auth(object):
self._track_appservice_user_ips = hs.config.track_appservice_user_ips
self._macaroon_secret_key = hs.config.macaroon_secret_key
@defer.inlineCallbacks
def check_from_context(self, room_version: str, event, context, do_sig_check=True):
prev_state_ids = yield defer.ensureDeferred(context.get_prev_state_ids())
auth_events_ids = yield self.compute_auth_events(
async def check_from_context(
self, room_version: str, event, context, do_sig_check=True
):
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
@@ -94,14 +94,13 @@ class Auth(object):
room_version_obj, event, auth_events=auth_events, do_sig_check=do_sig_check
)
@defer.inlineCallbacks
def check_user_in_room(
async def check_user_in_room(
self,
room_id: str,
user_id: str,
current_state: Optional[StateMap[EventBase]] = None,
allow_departed_users: bool = False,
):
) -> EventBase:
"""Check if the user is in the room, or was at some point.
Args:
room_id: The room to check.
@@ -119,37 +118,35 @@ class Auth(object):
Raises:
AuthError if the user is/was not in the room.
Returns:
Deferred[Optional[EventBase]]:
Membership event for the user if the user was in the
room. This will be the join event if they are currently joined to
the room. This will be the leave event if they have left the room.
Membership event for the user if the user was in the
room. This will be the join event if they are currently joined to
the room. This will be the leave event if they have left the room.
"""
if current_state:
member = current_state.get((EventTypes.Member, user_id), None)
else:
member = yield defer.ensureDeferred(
self.state.get_current_state(
room_id=room_id, event_type=EventTypes.Member, state_key=user_id
)
member = await self.state.get_current_state(
room_id=room_id, event_type=EventTypes.Member, state_key=user_id
)
membership = member.membership if member else None
if membership == Membership.JOIN:
return member
if member:
membership = member.membership
# XXX this looks totally bogus. Why do we not allow users who have been banned,
# or those who were members previously and have been re-invited?
if allow_departed_users and membership == Membership.LEAVE:
forgot = yield self.store.did_forget(user_id, room_id)
if not forgot:
if membership == Membership.JOIN:
return member
# XXX this looks totally bogus. Why do we not allow users who have been banned,
# or those who were members previously and have been re-invited?
if allow_departed_users and membership == Membership.LEAVE:
forgot = await self.store.did_forget(user_id, room_id)
if not forgot:
return member
raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
@defer.inlineCallbacks
def check_host_in_room(self, room_id, host):
async def check_host_in_room(self, room_id, host):
with Measure(self.clock, "check_host_in_room"):
latest_event_ids = yield self.store.is_host_joined(room_id, host)
latest_event_ids = await self.store.is_host_joined(room_id, host)
return latest_event_ids
def can_federate(self, event, auth_events):
@@ -160,14 +157,13 @@ class Auth(object):
def get_public_keys(self, invite_event):
return event_auth.get_public_keys(invite_event)
@defer.inlineCallbacks
def get_user_by_req(
async def get_user_by_req(
self,
request: Request,
allow_guest: bool = False,
rights: str = "access",
allow_expired: bool = False,
):
) -> synapse.types.Requester:
""" Get a registered user's ID.
Args:
@@ -180,7 +176,7 @@ class Auth(object):
/login will deliver access tokens regardless of expiration.
Returns:
defer.Deferred: resolves to a `synapse.types.Requester` object
Resolves to the requester
Raises:
InvalidClientCredentialsError if no user by that token exists or the token
is invalid.
@@ -194,14 +190,14 @@ class Auth(object):
access_token = self.get_access_token_from_request(request)
user_id, app_service = yield self._get_appservice_user_id(request)
user_id, app_service = await self._get_appservice_user_id(request)
if user_id:
request.authenticated_entity = user_id
opentracing.set_tag("authenticated_entity", user_id)
opentracing.set_tag("appservice_id", app_service.id)
if ip_addr and self._track_appservice_user_ips:
yield self.store.insert_client_ip(
await self.store.insert_client_ip(
user_id=user_id,
access_token=access_token,
ip=ip_addr,
@@ -211,7 +207,7 @@ class Auth(object):
return synapse.types.create_requester(user_id, app_service=app_service)
user_info = yield self.get_user_by_access_token(
user_info = await self.get_user_by_access_token(
access_token, rights, allow_expired=allow_expired
)
user = user_info["user"]
@@ -221,7 +217,7 @@ class Auth(object):
# Deny the request if the user account has expired.
if self._account_validity.enabled and not allow_expired:
user_id = user.to_string()
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
if (
expiration_ts is not None
and self.clock.time_msec() >= expiration_ts
@@ -235,7 +231,7 @@ class Auth(object):
device_id = user_info.get("device_id")
if user and access_token and ip_addr:
yield self.store.insert_client_ip(
await self.store.insert_client_ip(
user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
@@ -261,8 +257,7 @@ class Auth(object):
except KeyError:
raise MissingClientTokenError()
@defer.inlineCallbacks
def _get_appservice_user_id(self, request):
async def _get_appservice_user_id(self, request):
app_service = self.store.get_app_service_by_token(
self.get_access_token_from_request(request)
)
@@ -283,14 +278,13 @@ class Auth(object):
if not app_service.is_interested_in_user(user_id):
raise AuthError(403, "Application service cannot masquerade as this user.")
if not (yield self.store.get_user_by_id(user_id)):
if not (await self.store.get_user_by_id(user_id)):
raise AuthError(403, "Application service has not registered this user")
return user_id, app_service
@defer.inlineCallbacks
def get_user_by_access_token(
async def get_user_by_access_token(
self, token: str, rights: str = "access", allow_expired: bool = False,
):
) -> dict:
""" Validate access token and get user_id from it
Args:
@@ -300,7 +294,7 @@ class Auth(object):
allow_expired: If False, raises an InvalidClientTokenError
if the token is expired
Returns:
Deferred[dict]: dict that includes:
dict that includes:
`user` (UserID)
`is_guest` (bool)
`token_id` (int|None): access token id. May be None if guest
@@ -314,7 +308,7 @@ class Auth(object):
if rights == "access":
# first look in the database
r = yield self._look_up_user_by_access_token(token)
r = await self._look_up_user_by_access_token(token)
if r:
valid_until_ms = r["valid_until_ms"]
if (
@@ -352,7 +346,7 @@ class Auth(object):
# It would of course be much easier to store guest access
# tokens in the database as well, but that would break existing
# guest tokens.
stored_user = yield self.store.get_user_by_id(user_id)
stored_user = await self.store.get_user_by_id(user_id)
if not stored_user:
raise InvalidClientTokenError("Unknown user_id %s" % user_id)
if not stored_user["is_guest"]:
@@ -482,9 +476,8 @@ class Auth(object):
now = self.hs.get_clock().time_msec()
return now < expiry
@defer.inlineCallbacks
def _look_up_user_by_access_token(self, token):
ret = yield self.store.get_user_by_access_token(token)
async def _look_up_user_by_access_token(self, token):
ret = await self.store.get_user_by_access_token(token)
if not ret:
return None
@@ -507,7 +500,7 @@ class Auth(object):
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
request.authenticated_entity = service.sender
return defer.succeed(service)
return service
async def is_server_admin(self, user: UserID) -> bool:
""" Check if the given user is a local server admin.
@@ -522,7 +515,7 @@ class Auth(object):
def compute_auth_events(
self, event, current_state_ids: StateMap[str], for_verification: bool = False,
):
) -> List[str]:
"""Given an event and current state return the list of event IDs used
to auth an event.
@@ -530,11 +523,11 @@ class Auth(object):
should be added to the event's `auth_events`.
Returns:
defer.Deferred(list[str]): List of event IDs.
List of event IDs.
"""
if event.type == EventTypes.Create:
return defer.succeed([])
return []
# Currently we ignore the `for_verification` flag even though there are
# some situations where we can drop particular auth events when adding
@@ -553,7 +546,7 @@ class Auth(object):
if auth_ev_id:
auth_ids.append(auth_ev_id)
return defer.succeed(auth_ids)
return auth_ids
async def check_can_change_room_list(self, room_id: str, user: UserID):
"""Determine whether the user is allowed to edit the room's entry in the
@@ -636,10 +629,9 @@ class Auth(object):
return query_params[0].decode("ascii")
@defer.inlineCallbacks
def check_user_in_room_or_world_readable(
async def check_user_in_room_or_world_readable(
self, room_id: str, user_id: str, allow_departed_users: bool = False
):
) -> Tuple[str, Optional[str]]:
"""Checks that the user is or was in the room or the room is world
readable. If it isn't then an exception is raised.
@@ -650,10 +642,9 @@ class Auth(object):
members but have now departed
Returns:
Deferred[tuple[str, str|None]]: Resolves to the current membership of
the user in the room and the membership event ID of the user. If
the user is not in the room and never has been, then
`(Membership.JOIN, None)` is returned.
Resolves to the current membership of the user in the room and the
membership event ID of the user. If the user is not in the room and
never has been, then `(Membership.JOIN, None)` is returned.
"""
try:
@@ -662,15 +653,13 @@ class Auth(object):
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.check_user_in_room(
member_event = await self.check_user_in_room(
room_id, user_id, allow_departed_users=allow_departed_users
)
return member_event.membership, member_event.event_id
except AuthError:
visibility = yield defer.ensureDeferred(
self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
visibility = await self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility
+5 -8
@@ -15,8 +15,6 @@
import logging
from twisted.internet import defer
from synapse.api.constants import LimitBlockingTypes, UserTypes
from synapse.api.errors import Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
@@ -36,8 +34,7 @@ class AuthBlocking(object):
self._limit_usage_by_mau = hs.config.limit_usage_by_mau
self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids
@defer.inlineCallbacks
def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
async def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
@@ -60,7 +57,7 @@ class AuthBlocking(object):
if user_id is not None:
if user_id == self._server_notices_mxid:
return
if (yield self.store.is_support_user(user_id)):
if await self.store.is_support_user(user_id):
return
if self._hs_disabled:
@@ -76,11 +73,11 @@ class AuthBlocking(object):
# If the user is already part of the MAU cohort or a trial user
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
timestamp = await self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
is_trial = yield self.store.is_trial_user(user_id)
is_trial = await self.store.is_trial_user(user_id)
if is_trial:
return
elif threepid:
@@ -93,7 +90,7 @@ class AuthBlocking(object):
# allow registration. Support users are excluded from MAU checks.
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
current_mau = await self.store.get_monthly_active_count()
if current_mau >= self._max_mau_value:
raise ResourceLimitError(
403,
+3 -1
@@ -238,14 +238,16 @@ class InteractiveAuthIncompleteError(Exception):
(This indicates we should return a 401 with 'result' as the body)
Attributes:
session_id: The ID of the ongoing interactive auth session.
result: the server response to the request, which should be
passed back to the client
"""
def __init__(self, result: "JsonDict"):
def __init__(self, session_id: str, result: "JsonDict"):
super(InteractiveAuthIncompleteError, self).__init__(
"Interactive auth not yet complete"
)
self.session_id = session_id
self.result = result
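A hedged sketch of how a caller might surface this exception as the 401 it describes; the handler name and argument order mirror the rest of this commit, but the surrounding servlet plumbing is assumed:

```python
# Illustrative caller; the servlet plumbing around this is assumed.
try:
    params, session_id = await auth_handler.validate_user_via_ui_auth(
        requester, request, body, clientip, "modify your account details"
    )
except InteractiveAuthIncompleteError as e:
    # e.result is the body to return with the 401; e.session_id identifies
    # the in-flight UI auth session for any later bookkeeping.
    return 401, e.result
```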
+2 -5
@@ -21,8 +21,6 @@ import jsonschema
from canonicaljson import json
from jsonschema import FormatChecker
from twisted.internet import defer
from synapse.api.constants import EventContentFields
from synapse.api.errors import SynapseError
from synapse.storage.presence import UserPresenceState
@@ -137,9 +135,8 @@ class Filtering(object):
super(Filtering, self).__init__()
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_user_filter(self, user_localpart, filter_id):
result = yield self.store.get_user_filter(user_localpart, filter_id)
async def get_user_filter(self, user_localpart, filter_id):
result = await self.store.get_user_filter(user_localpart, filter_id)
return FilterCollection(result)
def add_user_filter(self, user_localpart, user_filter):
+37
@@ -17,6 +17,7 @@ from collections import OrderedDict
from typing import Any, Optional, Tuple
from synapse.api.errors import LimitExceededError
from synapse.types import Requester
from synapse.util import Clock
@@ -43,6 +44,42 @@ class Ratelimiter(object):
# * The rate_hz of this particular entry. This can vary per request
self.actions = OrderedDict() # type: OrderedDict[Any, Tuple[float, int, float]]
def can_requester_do_action(
self,
requester: Requester,
rate_hz: Optional[float] = None,
burst_count: Optional[int] = None,
update: bool = True,
_time_now_s: Optional[int] = None,
) -> Tuple[bool, float]:
"""Can the requester perform the action?
Args:
requester: The requester to key off when rate limiting. The user property
will be used.
rate_hz: The long term number of actions that can be performed in a second.
Overrides the value set during instantiation if set.
burst_count: How many actions that can be performed before being limited.
Overrides the value set during instantiation if set.
update: Whether to count this check as performing the action
_time_now_s: The current time. Optional, defaults to the current time according
to self.clock. Only used by tests.
Returns:
A tuple containing:
* A bool indicating if they can perform the action now
* The reactor timestamp for when the action can be performed next.
-1 if rate_hz is less than or equal to zero
"""
# Disable rate limiting of users belonging to any AS that is configured
# not to be rate limited in its registration file (rate_limited: true|false).
if requester.app_service and not requester.app_service.is_rate_limited():
return True, -1.0
return self.can_do_action(
requester.user.to_string(), rate_hz, burst_count, update, _time_now_s
)
def can_do_action(
self,
key: Any,
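A hypothetical call site for the new method; the `ratelimiter` and `requester` objects and the override values are assumptions, not code from this commit:

```python
# Hypothetical call site; objects and numbers are illustrative.
allowed, next_allowed_ts = ratelimiter.can_requester_do_action(
    requester,
    rate_hz=0.17,   # override: roughly one action per six seconds
    burst_count=3,  # override: three actions before limiting applies
)
if not allowed:
    # next_allowed_ts is a reactor timestamp; requesters belonging to an
    # appservice with rate limiting disabled always get (True, -1.0).
    raise RuntimeError("rate limited until %r" % (next_allowed_ts,))
```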
+10 -4
@@ -123,8 +123,9 @@ from synapse.rest.client.v2_alpha.account_data import (
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -493,7 +494,10 @@ class GenericWorkerServer(HomeServer):
site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = port
resources = {}
# We always include a health resource.
resources = {"/health": HealthResource()}
for res in listener_config.http_options.resources:
for name in res.names:
if name == "metrics":
@@ -631,10 +635,12 @@ class GenericWorkerServer(HomeServer):
async def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
def build_replication_data_handler(self):
@cache_in_self
def get_replication_data_handler(self):
return GenericWorkerReplicationHandler(self)
def build_presence_handler(self):
@cache_in_self
def get_presence_handler(self):
return GenericWorkerPresence(self)
+4 -1
@@ -68,6 +68,7 @@ from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
@@ -98,7 +99,9 @@ class SynapseHomeServer(HomeServer):
if site_tag is None:
site_tag = port
resources = {}
# We always include a health resource.
resources = {"/health": HealthResource()}
for res in listener_config.http_options.resources:
for name in res.names:
if name == "openid" and "federation" in res.names:
+49
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, List
import jsonschema
from synapse.config._base import ConfigError
from synapse.types import JsonDict
def validate_config(json_schema: JsonDict, config: Any, config_path: List[str]) -> None:
"""Validates a config setting against a JsonSchema definition
This can be used to validate a section of the config file against a schema
definition. If the validation fails, a ConfigError is raised with a textual
description of the problem.
Args:
json_schema: the schema to validate against
config: the configuration value to be validated
config_path: the path within the config file. This will be used as a basis
for the error message.
"""
try:
jsonschema.validate(config, json_schema)
except jsonschema.ValidationError as e:
# copy `config_path` before modifying it.
path = list(config_path)
for p in list(e.path):
if isinstance(p, int):
path.append("<item %i>" % p)
else:
path.append(str(p))
raise ConfigError(
"Unable to parse configuration: %s at %s" % (e.message, ".".join(path))
)
+51 -12
@@ -55,24 +55,33 @@ formatters:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - \
%(request)s - %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
class: logging.handlers.TimedRotatingFileHandler
formatter: precise
filename: ${log_file}
maxBytes: 104857600
backupCount: 10
filters: [context]
when: midnight
backupCount: 3 # Does not include the current log file.
encoding: utf8
# Default to buffering writes to log file for efficiency. This means that
# there will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
# logs will still be flushed immediately.
buffer:
class: logging.handlers.MemoryHandler
target: file
# The capacity is the number of log lines that are buffered before
# being written to disk. Increasing this will lead to better
# performance, at the expense of it taking longer for log lines to
# be written to disk.
capacity: 10
flushLevel: 30 # Flush for WARNING logs as well
# A handler that writes logs to stderr. Unused by default, but can be used
# instead of "buffer" and "file" in the logger handlers.
console:
class: logging.StreamHandler
formatter: precise
filters: [context]
loggers:
synapse.storage.SQL:
@@ -80,9 +89,24 @@ loggers:
# information such as access tokens.
level: INFO
twisted:
# We send the twisted logging directly to the file handler,
# to work around https://github.com/matrix-org/synapse/issues/3471
# when using "buffer" logger. Use "console" to log to stderr instead.
handlers: [file]
propagate: false
root:
level: INFO
handlers: [file, console]
# Write logs to the `buffer` handler, which will buffer them together in memory,
# then write them to a file.
#
# Replace "buffer" with "console" to log to stderr instead. (Note that you'll
# also need to update the configuration for the `twisted` logger above, in
# this case.)
#
handlers: [buffer]
disable_existing_loggers: false
"""
@@ -168,11 +192,26 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(LoggingContextFilter(request=""))
logger.addHandler(handler)
else:
logging.config.dictConfig(log_config)
# We add a log record factory that runs all messages through the
# LoggingContextFilter so that we get the context *at the time we log*
# rather than when we write to a handler. This can be done in config using
# filter options, but care must be taken when using e.g. MemoryHandler to buffer
# writes.
log_filter = LoggingContextFilter(request="")
old_factory = logging.getLogRecordFactory()
def factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
log_filter.filter(record)
return record
logging.setLogRecordFactory(factory)
# Route Twisted's native logging through to the standard library logging
# system.
observer = STDLibLogObserver()
+50
@@ -15,7 +15,9 @@
# limitations under the License.
import logging
from typing import Any, List
import attr
import jinja2
import pkg_resources
@@ -23,6 +25,7 @@ from synapse.python_dependencies import DependencyException, check_requirements
from synapse.util.module_loader import load_module, load_python_module
from ._base import Config, ConfigError
from ._util import validate_config
logger = logging.getLogger(__name__)
@@ -80,6 +83,11 @@ class SAML2Config(Config):
self.saml2_enabled = True
attribute_requirements = saml2_config.get("attribute_requirements") or []
self.attribute_requirements = _parse_attribute_requirements_def(
attribute_requirements
)
self.saml2_grandfathered_mxid_source_attribute = saml2_config.get(
"grandfathered_mxid_source_attribute", "uid"
)
@@ -341,6 +349,17 @@ class SAML2Config(Config):
#
#grandfathered_mxid_source_attribute: upn
# It is possible to configure Synapse to only allow logins if SAML attributes
# match particular values. The requirements can be listed under
# `attribute_requirements` as shown below. All of the listed attributes must
# match for the login to be permitted.
#
#attribute_requirements:
# - attribute: userGroup
# value: "staff"
# - attribute: department
# value: "sales"
# Directory in which Synapse will try to find the template files below.
# If not set, default templates from within the Synapse package will be used.
#
@@ -368,3 +387,34 @@ class SAML2Config(Config):
""" % {
"config_dir_path": config_dir_path
}
@attr.s(frozen=True)
class SamlAttributeRequirement:
"""Object describing a single requirement for SAML attributes."""
attribute = attr.ib(type=str)
value = attr.ib(type=str)
JSON_SCHEMA = {
"type": "object",
"properties": {"attribute": {"type": "string"}, "value": {"type": "string"}},
"required": ["attribute", "value"],
}
ATTRIBUTE_REQUIREMENTS_SCHEMA = {
"type": "array",
"items": SamlAttributeRequirement.JSON_SCHEMA,
}
def _parse_attribute_requirements_def(
attribute_requirements: Any,
) -> List[SamlAttributeRequirement]:
validate_config(
ATTRIBUTE_REQUIREMENTS_SCHEMA,
attribute_requirements,
config_path=["saml2_config", "attribute_requirements"],
)
return [SamlAttributeRequirement(**x) for x in attribute_requirements]
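A sketch of how the parsed requirements could be checked against a SAML assertion's attribute map; the `attributes_match` helper is hypothetical, not the actual Synapse code path:

```python
# Hypothetical helper; not the actual Synapse code path.
from typing import Dict, List

def attributes_match(
    requirements: List[SamlAttributeRequirement],
    saml_attributes: Dict[str, List[str]],
) -> bool:
    """True if every required attribute/value pair is asserted."""
    return all(
        req.value in saml_attributes.get(req.attribute, [])
        for req in requirements
    )

reqs = _parse_attribute_requirements_def(
    [{"attribute": "userGroup", "value": "staff"}]
)
assert attributes_match(reqs, {"userGroup": ["staff", "admin"]})
assert not attributes_match(reqs, {"userGroup": ["guest"]})
```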
+15
@@ -530,6 +530,21 @@ class ServerConfig(Config):
"request_token_inhibit_3pid_errors", False,
)
# List of users trialing the new experimental default push rules. This setting is
# not included in the sample configuration file on purpose as it's a temporary
# hack, so that some users can trial the new defaults without impacting every
# user on the homeserver.
users_new_default_push_rules = (
config.get("users_new_default_push_rules") or []
) # type: list
if not isinstance(users_new_default_push_rules, list):
raise ConfigError("'users_new_default_push_rules' must be a list")
# Turn the list into a set to improve lookup speed.
self.users_new_default_push_rules = set(
users_new_default_push_rules
) # type: set
def has_tls_listener(self) -> bool:
return any(listener.tls for listener in self.listeners)
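Downstream code can then do constant-time membership checks against the `users_new_default_push_rules` set built above; a sketch with hypothetical helper names:

```python
# Sketch of a downstream membership test; both helpers are hypothetical.
if user_id in hs.config.users_new_default_push_rules:
    rules = experimental_default_push_rules(user_id)  # hypothetical helper
else:
    rules = standard_default_push_rules(user_id)      # hypothetical helper
```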
+8
@@ -48,6 +48,14 @@ class ServerContextFactory(ContextFactory):
connections."""
def __init__(self, config):
# TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
# switch to those (see https://github.com/pyca/cryptography/issues/5379).
#
# note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2
# or v3, but is a synonym for TLS_METHOD, which allows the client and server
# to negotiate an appropriate version of TLS constrained by the version options
# set with context.set_options.
#
self._context = SSL.Context(SSL.SSLv23_METHOD)
self.configure_context(self._context, config)
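For illustration, constraining the negotiated versions on such a context might look like the following sketch; the exact flags Synapse sets depend on `configure_context` and the TLS config:

```python
# Sketch only: SSLv23_METHOD lets the peers negotiate, then set_options
# forbids the legacy protocols so that only newer TLS versions can be agreed.
from OpenSSL import SSL

context = SSL.Context(SSL.SSLv23_METHOD)
context.set_options(
    SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
)
```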
+31 -27
@@ -17,6 +17,7 @@ from typing import Optional
import attr
from nacl.signing import SigningKey
from synapse.api.auth import Auth
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import UnsupportedRoomVersionError
from synapse.api.room_versions import (
@@ -27,6 +28,8 @@ from synapse.api.room_versions import (
)
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import EventID, JsonDict
from synapse.util import Clock
from synapse.util.stringutils import random_string
@@ -42,45 +45,46 @@ class EventBuilder(object):
Attributes:
room_version: Version of the target room
room_id (str)
type (str)
sender (str)
content (dict)
unsigned (dict)
internal_metadata (_EventInternalMetadata)
room_id
type
sender
content
unsigned
internal_metadata
_state (StateHandler)
_auth (synapse.api.Auth)
_store (DataStore)
_clock (Clock)
_hostname (str): The hostname of the server creating the event
_state
_auth
_store
_clock
_hostname: The hostname of the server creating the event
_signing_key: The signing key to use to sign the event as the server
"""
_state = attr.ib()
_auth = attr.ib()
_store = attr.ib()
_clock = attr.ib()
_hostname = attr.ib()
_signing_key = attr.ib()
_state = attr.ib(type=StateHandler)
_auth = attr.ib(type=Auth)
_store = attr.ib(type=DataStore)
_clock = attr.ib(type=Clock)
_hostname = attr.ib(type=str)
_signing_key = attr.ib(type=SigningKey)
room_version = attr.ib(type=RoomVersion)
room_id = attr.ib()
type = attr.ib()
sender = attr.ib()
room_id = attr.ib(type=str)
type = attr.ib(type=str)
sender = attr.ib(type=str)
content = attr.ib(default=attr.Factory(dict))
unsigned = attr.ib(default=attr.Factory(dict))
content = attr.ib(default=attr.Factory(dict), type=JsonDict)
unsigned = attr.ib(default=attr.Factory(dict), type=JsonDict)
# These only exist on a subset of events, so they raise AttributeError if
# someone tries to get them when they don't exist.
_state_key = attr.ib(default=None)
_redacts = attr.ib(default=None)
_origin_server_ts = attr.ib(default=None)
_state_key = attr.ib(default=None, type=Optional[str])
_redacts = attr.ib(default=None, type=Optional[str])
_origin_server_ts = attr.ib(default=None, type=Optional[int])
internal_metadata = attr.ib(
default=attr.Factory(lambda: _EventInternalMetadata({}))
default=attr.Factory(lambda: _EventInternalMetadata({})),
type=_EventInternalMetadata,
)
@property
@@ -106,7 +110,7 @@ class EventBuilder(object):
state_ids = await self._state.get_current_state_ids(
self.room_id, prev_event_ids
)
auth_ids = await self._auth.compute_auth_events(self, state_ids)
auth_ids = self._auth.compute_auth_events(self, state_ids)
format_version = self.room_version.event_format
if format_version == EventFormatVersions.V1:
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, List, Tuple
from canonicaljson import json
@@ -54,7 +54,10 @@ class TransactionManager(object):
@measure_func("_send_new_transaction")
async def send_new_transaction(
self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
self,
destination: str,
pending_pdus: List[Tuple[EventBase, int]],
pending_edus: List[Edu],
):
# Make a transaction-sending opentracing span. This span follows on from
+1 -1
@@ -101,7 +101,7 @@ class ApplicationServicesHandler(object):
async def start_scheduler():
try:
return self.scheduler.start()
return await self.scheduler.start()
except Exception:
logger.error("Application Services Failure")
+12 -7
@@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
request_body: Dict[str, Any],
clientip: str,
description: str,
) -> dict:
) -> Tuple[dict, str]:
"""
Checks that the user is who they claim to be, via a UI auth.
@@ -183,9 +183,14 @@ class AuthHandler(BaseHandler):
describes the operation happening on their account.
Returns:
The parameters for this request (which may
A tuple of (params, session_id).
'params' contains the parameters for this request (which may
have been given only in a previous call).
'session_id' is the ID of this session, either passed in by the
client or assigned by this call
Raises:
InteractiveAuthIncompleteError if the client has not yet completed
any of the permitted login flows
@@ -207,7 +212,7 @@ class AuthHandler(BaseHandler):
flows = [[login_type] for login_type in self._supported_ui_auth_types]
try:
result, params, _ = await self.check_auth(
result, params, session_id = await self.check_ui_auth(
flows, request, request_body, clientip, description
)
except LoginError:
@@ -230,7 +235,7 @@ class AuthHandler(BaseHandler):
if user_id != requester.user.to_string():
raise AuthError(403, "Invalid auth")
return params
return params, session_id
def get_enabled_auth_types(self):
"""Return the enabled user-interactive authentication types
@@ -240,7 +245,7 @@ class AuthHandler(BaseHandler):
"""
return self.checkers.keys()
async def check_auth(
async def check_ui_auth(
self,
flows: List[List[str]],
request: SynapseRequest,
@@ -363,7 +368,7 @@ class AuthHandler(BaseHandler):
if not authdict:
raise InteractiveAuthIncompleteError(
self._auth_dict_for_flows(flows, session.session_id)
session.session_id, self._auth_dict_for_flows(flows, session.session_id)
)
# check auth type currently being presented
@@ -410,7 +415,7 @@ class AuthHandler(BaseHandler):
ret = self._auth_dict_for_flows(flows, session.session_id)
ret["completed"] = list(creds)
ret.update(errordict)
raise InteractiveAuthIncompleteError(ret)
raise InteractiveAuthIncompleteError(session.session_id, ret)
async def add_oob_auth(
self, stagetype: str, authdict: Dict[str, Any], clientip: str
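For callers, the net effect of the `check_ui_auth` rename, the new `(params, session_id)` return value, and the reordered `InteractiveAuthIncompleteError` arguments looks roughly like this (a sketch with hypothetical handler wiring, not Synapse's actual servlet code):

from synapse.api.errors import InteractiveAuthIncompleteError

async def validate_then_act(auth_handler, requester, request, body, clientip):
    try:
        # validate_user_via_ui_auth now returns (params, session_id)
        params, session_id = await auth_handler.validate_user_via_ui_auth(
            requester, request, body, clientip, "modify your account password"
        )
    except InteractiveAuthIncompleteError as e:
        # the session ID now rides on the exception, so callers can stash
        # per-session state before re-raising (key/value here are illustrative)
        await auth_handler.set_session_data(e.session_id, "some_key", "some_value")
        raise
    return params, session_id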
-4
@@ -57,13 +57,10 @@ class EventStreamHandler(BaseHandler):
timeout=0,
as_client_event=True,
affect_presence=True,
only_keys=None,
room_id=None,
is_guest=False,
):
"""Fetches the events stream for a given user.
If `only_keys` is not None, events from keys will be sent down.
"""
if room_id:
@@ -93,7 +90,6 @@ class EventStreamHandler(BaseHandler):
auth_user,
pagin_config,
timeout,
only_keys=only_keys,
is_guest=is_guest,
explicit_room_id=room_id,
)
+1 -1
@@ -2064,7 +2064,7 @@ class FederationHandler(BaseHandler):
if not auth_events:
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = await self.auth.compute_auth_events(
auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events_x = await self.store.get_events(auth_events_ids)
+23 -10
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from canonicaljson import encode_canonical_json, json
@@ -93,11 +93,11 @@ class MessageHandler(object):
async def get_room_data(
self,
user_id: str = None,
room_id: str = None,
event_type: Optional[str] = None,
state_key: str = "",
is_guest: bool = False,
user_id: str,
room_id: str,
event_type: str,
state_key: str,
is_guest: bool,
) -> dict:
""" Get data from a room.
@@ -407,7 +407,7 @@ class EventCreationHandler(object):
#
# map from room id to time-of-last-attempt.
#
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int]
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int]
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options are set, but *only* if we have a configuration for which we are
@@ -707,7 +707,7 @@ class EventCreationHandler(object):
async def create_and_send_nonmember_event(
self,
requester: Requester,
event_dict: EventBase,
event_dict: dict,
ratelimit: bool = True,
txn_id: Optional[str] = None,
) -> Tuple[EventBase, int]:
@@ -768,6 +768,15 @@ class EventCreationHandler(object):
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
# we now ought to have some prev_events (unless it's a create event).
#
# do a quick sanity check here, rather than waiting until we've created the
# event and then try to auth it (which fails with a somewhat confusing "No
# create event in auth events")
assert (
builder.type == EventTypes.Create or len(prev_event_ids) > 0
), "Attempting to create an event with no prev_events"
event = await builder.build(prev_event_ids=prev_event_ids)
context = await self.state.compute_event_context(event)
if requester:
@@ -962,7 +971,7 @@ class EventCreationHandler(object):
# Validate a newly added alias or newly added alt_aliases.
original_alias = None
original_alt_aliases = set()
original_alt_aliases = [] # type: List[str]
original_event_id = event.unsigned.get("replaces_state")
if original_event_id:
@@ -1010,6 +1019,10 @@ class EventCreationHandler(object):
current_state_ids = await context.get_current_state_ids()
# We know this event is not an outlier, so this must be
# non-None.
assert current_state_ids is not None
state_to_include_ids = [
e_id
for k, e_id in current_state_ids.items()
@@ -1061,7 +1074,7 @@ class EventCreationHandler(object):
raise SynapseError(400, "Cannot redact event from a different room")
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = await self.auth.compute_auth_events(
auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events = await self.store.get_events(auth_events_ids)
+5 -3
@@ -14,7 +14,7 @@
# limitations under the License.
import json
import logging
from typing import Dict, Generic, List, Optional, Tuple, TypeVar
from typing import TYPE_CHECKING, Dict, Generic, List, Optional, Tuple, TypeVar
from urllib.parse import urlencode
import attr
@@ -39,9 +39,11 @@ from synapse.http.server import respond_with_html
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.push.mailer import load_jinja2_templates
from synapse.server import HomeServer
from synapse.types import UserID, map_username_to_mxid_localpart
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
SESSION_COOKIE_NAME = b"oidc_session"
@@ -91,7 +93,7 @@ class OidcHandler:
"""Handles requests related to the OpenID Connect login flow.
"""
def __init__(self, hs: HomeServer):
def __init__(self, hs: "HomeServer"):
self._callback_url = hs.config.oidc_callback_url # type: str
self._scopes = hs.config.oidc_scopes # type: List[str]
self._client_auth = ClientAuth(
+36 -26
@@ -16,7 +16,7 @@
import abc
import logging
from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
from unpaddedbase64 import encode_base64
@@ -37,6 +37,10 @@ from synapse.util.distributor import user_joined_room, user_left_room
from ._base import BaseHandler
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -48,7 +52,7 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@@ -206,24 +210,40 @@ class RoomMemberHandler(object):
_, stream_id = await self.store.get_event_ordering(duplicate.event_id)
return duplicate.event_id, stream_id
stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit
)
prev_state_ids = await context.get_prev_state_ids()
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
newly_joined = False
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
# Only rate-limit if the user actually joined the room, otherwise we'll end
# up blocking profile updates.
if newly_joined:
await self._user_joined_room(target, room_id)
time_now_s = self.clock.time()
(
allowed,
time_allowed,
) = self._join_rate_limiter_local.can_requester_do_action(requester)
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)
stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
if event.membership == Membership.JOIN and newly_joined:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
await self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
@@ -453,22 +473,12 @@ class RoomMemberHandler(object):
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
if is_host_in_room:
if not is_host_in_room:
time_now_s = self.clock.time()
allowed, time_allowed = self._join_rate_limiter_local.can_do_action(
requester.user.to_string(),
)
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now_s))
)
else:
time_now_s = self.clock.time()
allowed, time_allowed = self._join_rate_limiter_remote.can_do_action(
requester.user.to_string(),
)
(
allowed,
time_allowed,
) = self._join_rate_limiter_remote.can_requester_do_action(requester,)
if not allowed:
raise LimitExceededError(
@@ -1000,7 +1010,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
check_complexity = self.hs.config.limit_remote_rooms.enabled
if check_complexity and self.hs.config.limit_remote_rooms.admins_can_join:
check_complexity = not await self.hs.auth.is_server_admin(user)
check_complexity = not await self.auth.is_server_admin(user)
if check_complexity:
# Fetch the room complexity
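For context on the `can_requester_do_action` checks in the hunks above: Synapse's ratelimiter follows the usual leaky-bucket pattern, keyed on the requester. A self-contained sketch of the idea (illustrative names, not the actual `Ratelimiter` class):

import time
from typing import Dict, Tuple

class LeakyBucketLimiter:
    """Allow at most `burst` actions at once, draining `rate` actions/second."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate
        self.burst = burst
        self._buckets = {}  # type: Dict[str, Tuple[float, float]]

    def can_do_action(self, key: str) -> Tuple[bool, float]:
        now = time.monotonic()
        count, last = self._buckets.get(key, (0.0, now))
        count = max(0.0, count - (now - last) * self.rate)  # drain the bucket
        if count >= self.burst:
            # full: report when one slot will have drained, as the retry time
            return False, now + (count - self.burst + 1) / self.rate
        self._buckets[key] = (count + 1, now)
        return True, now

The handler converts a `False` result into `LimitExceededError` with `retry_after_ms=int(1000 * (time_allowed - time_now_s))`, exactly as shown in the diff above.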
+36 -6
@@ -14,15 +14,16 @@
# limitations under the License.
import logging
import re
from typing import Callable, Dict, Optional, Set, Tuple
from typing import TYPE_CHECKING, Callable, Dict, Optional, Set, Tuple
import attr
import saml2
import saml2.response
from saml2.client import Saml2Client
from synapse.api.errors import SynapseError
from synapse.api.errors import AuthError, SynapseError
from synapse.config import ConfigError
from synapse.config.saml2_config import SamlAttributeRequirement
from synapse.http.servlet import parse_string
from synapse.http.site import SynapseRequest
from synapse.module_api import ModuleApi
@@ -34,6 +35,9 @@ from synapse.types import (
from synapse.util.async_helpers import Linearizer
from synapse.util.iterutils import chunk_seq
if TYPE_CHECKING:
import synapse.server
logger = logging.getLogger(__name__)
@@ -49,7 +53,7 @@ class Saml2SessionData:
class SamlHandler:
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self._saml_client = Saml2Client(hs.config.saml2_sp_config)
self._auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
@@ -62,6 +66,7 @@ class SamlHandler:
self._grandfathered_mxid_source_attribute = (
hs.config.saml2_grandfathered_mxid_source_attribute
)
self._saml2_attribute_requirements = hs.config.saml2.attribute_requirements
# plugin to do custom mapping from saml response to mxid
self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class(
@@ -73,7 +78,7 @@ class SamlHandler:
self._auth_provider_id = "saml"
# a map from saml session id to Saml2SessionData object
self._outstanding_requests_dict = {}
self._outstanding_requests_dict = {} # type: Dict[str, Saml2SessionData]
# a lock on the mappings
self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock)
@@ -165,11 +170,18 @@ class SamlHandler:
saml2.BINDING_HTTP_POST,
outstanding=self._outstanding_requests_dict,
)
except saml2.response.UnsolicitedResponse as e:
# the pysaml2 library helpfully logs an ERROR here, but neglects to log
# the session ID. I don't really want to put the full text of the exception
# in the (user-visible) exception message, so let's log the exception here
# so we can track down the session IDs later.
logger.warning(str(e))
raise SynapseError(400, "Unexpected SAML2 login.")
except Exception as e:
raise SynapseError(400, "Unable to parse SAML2 response: %s" % (e,))
raise SynapseError(400, "Unable to parse SAML2 response: %s." % (e,))
if saml2_auth.not_signed:
raise SynapseError(400, "SAML2 response was not signed")
raise SynapseError(400, "SAML2 response was not signed.")
logger.debug("SAML2 response: %s", saml2_auth.origxml)
for assertion in saml2_auth.assertions:
@@ -188,6 +200,9 @@ class SamlHandler:
saml2_auth.in_response_to, None
)
for requirement in self._saml2_attribute_requirements:
_check_attribute_requirement(saml2_auth.ava, requirement)
remote_user_id = self._user_mapping_provider.get_remote_user_id(
saml2_auth, client_redirect_url
)
@@ -294,6 +309,21 @@ class SamlHandler:
del self._outstanding_requests_dict[reqid]
def _check_attribute_requirement(ava: dict, req: SamlAttributeRequirement):
values = ava.get(req.attribute, [])
for v in values:
if v == req.value:
return
logger.info(
"SAML2 attribute %s did not match required value '%s' (was '%s')",
req.attribute,
req.value,
values,
)
raise AuthError(403, "You are not authorized to log in here.")
DOT_REPLACE_PATTERN = re.compile(
("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),))
)
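To illustrate `_check_attribute_requirement` (attribute values here are made up; `ava` is the attribute-value mapping pysaml2 extracts from the assertion, and `SamlAttributeRequirement` pairs an attribute name with a required value):

from synapse.config.saml2_config import SamlAttributeRequirement

ava = {"userGroup": ["staff", "admin"], "department": ["sales"]}

req_ok = SamlAttributeRequirement(attribute="userGroup", value="admin")
req_bad = SamlAttributeRequirement(attribute="department", value="engineering")

_check_attribute_requirement(ava, req_ok)   # returns quietly: "admin" matches
_check_attribute_requirement(ava, req_bad)  # logs the mismatch, raises AuthError(403)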
-6
@@ -103,7 +103,6 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -1895,10 +1894,6 @@ class SyncHandler(object):
if room_builder.rtype == "joined":
unread_notifications = {} # type: Dict[str, str]
unread_count = await self.store.get_unread_message_count_for_user(
room_id, sync_config.user.to_string(),
)
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
@@ -1907,7 +1902,6 @@ class SyncHandler(object):
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=unread_count,
)
if room_sync or always_include:
+1 -1
@@ -297,7 +297,7 @@ class SimpleHttpClient(object):
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))
logger.debug("Sending request %s %s", method, redact_uri(uri))
with start_active_span(
"outgoing-client-request",
@@ -247,7 +247,7 @@ class MatrixHostnameEndpoint(object):
port = server.port
try:
logger.info("Connecting to %s:%i", host.decode("ascii"), port)
logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
endpoint = HostnameEndpoint(self._reactor, host, port)
if self._tls_options:
endpoint = wrapClientTLS(self._tls_options, endpoint)
+71 -23
@@ -29,10 +29,11 @@ from zope.interface import implementer
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import IReactorPluggableNameResolver
from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -74,7 +75,7 @@ MAXINT = sys.maxsize
_next_id = 1
@attr.s
@attr.s(frozen=True)
class MatrixFederationRequest(object):
method = attr.ib()
"""HTTP method
@@ -110,26 +111,52 @@ class MatrixFederationRequest(object):
:type: str|None
"""
uri = attr.ib(init=False, type=bytes)
"""The URI of this request
"""
def __attrs_post_init__(self):
global _next_id
self.txn_id = "%s-O-%s" % (self.method, _next_id)
txn_id = "%s-O-%s" % (self.method, _next_id)
_next_id = (_next_id + 1) % (MAXINT - 1)
object.__setattr__(self, "txn_id", txn_id)
destination_bytes = self.destination.encode("ascii")
path_bytes = self.path.encode("ascii")
if self.query:
query_bytes = encode_query_args(self.query)
else:
query_bytes = b""
# The object is frozen so we can pre-compute this.
uri = urllib.parse.urlunparse(
(b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
)
object.__setattr__(self, "uri", uri)
def get_json(self):
if self.json_callback:
return self.json_callback()
return self.json
async def _handle_json_response(reactor, timeout_sec, request, response):
async def _handle_json_response(
reactor: IReactorTime,
timeout_sec: float,
request: MatrixFederationRequest,
response: IResponse,
start_ms: int,
):
"""
Reads the JSON body of a response, with a timeout
Args:
reactor (IReactor): twisted reactor, for the timeout
timeout_sec (float): number of seconds to wait for response to complete
request (MatrixFederationRequest): the request that triggered the response
response (IResponse): response to the request
reactor: twisted reactor, for the timeout
timeout_sec: number of seconds to wait for response to complete
request: the request that triggered the response
response: response to the request
start_ms: Timestamp when request was made
Returns:
dict: parsed JSON response
@@ -143,23 +170,35 @@ async def _handle_json_response(reactor, timeout_sec, request, response):
body = await make_deferred_yieldable(d)
except TimeoutError as e:
logger.warning(
"{%s} [%s] Timed out reading response", request.txn_id, request.destination,
"{%s} [%s] Timed out reading response - %s %s",
request.txn_id,
request.destination,
request.method,
request.uri.decode("ascii"),
)
raise RequestSendFailed(e, can_retry=True) from e
except Exception as e:
logger.warning(
"{%s} [%s] Error reading response: %s",
"{%s} [%s] Error reading response %s %s: %s",
request.txn_id,
request.destination,
request.method,
request.uri.decode("ascii"),
e,
)
raise
time_taken_secs = reactor.seconds() - start_ms / 1000
logger.info(
"{%s} [%s] Completed: %d %s",
"{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
time_taken_secs,
request.method,
request.uri.decode("ascii"),
)
return body
@@ -261,7 +300,9 @@ class MatrixFederationHttpClient(object):
# 'M_UNRECOGNIZED' which some endpoints can return when omitting a
# trailing slash on Synapse <= v0.99.3.
logger.info("Retrying request with trailing slash")
request.path += "/"
# Request is frozen so we create a new instance
request = attr.evolve(request, path=request.path + "/")
response = await self._send_request(request, **send_request_args)
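The frozen-class dance above (`object.__setattr__` inside `__attrs_post_init__` to install derived fields, `attr.evolve` to "mutate") is standard attrs usage; a self-contained sketch:

import attr

@attr.s(frozen=True)
class Request:
    method = attr.ib(type=str)
    path = attr.ib(type=str)
    uri = attr.ib(init=False, type=str)

    def __attrs_post_init__(self):
        # frozen classes forbid normal assignment, so derived fields are
        # installed via object.__setattr__ exactly once, at construction
        object.__setattr__(self, "uri", "matrix://example" + self.path)

req = Request(method="GET", path="/foo")
retry = attr.evolve(req, path=req.path + "/")  # new instance; `uri` recomputed
assert retry.uri.endswith("/foo/")

Because `uri` is `init=False`, `attr.evolve` leaves it out of the constructor call and `__attrs_post_init__` recomputes it on the new instance, which is why the trailing-slash retry above gets a correct URI for free.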
@@ -373,9 +414,7 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
url_bytes = urllib.parse.urlunparse(
(b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
)
url_bytes = request.uri
url_str = url_bytes.decode("ascii")
url_to_sign_bytes = urllib.parse.urlunparse(
@@ -402,7 +441,7 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
logger.info(
logger.debug(
"{%s} [%s] Sending request: %s %s; timeout %fs",
request.txn_id,
request.destination,
@@ -436,7 +475,6 @@ class MatrixFederationHttpClient(object):
except DNSLookupError as e:
raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
except Exception as e:
logger.info("Failed to send request: %s", e)
raise RequestSendFailed(e, can_retry=True) from e
incoming_responses_counter.labels(
@@ -496,7 +534,7 @@ class MatrixFederationHttpClient(object):
break
except RequestSendFailed as e:
logger.warning(
logger.info(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
request.destination,
@@ -654,6 +692,8 @@ class MatrixFederationHttpClient(object):
json=data,
)
start_ms = self.clock.time_msec()
response = await self._send_request_with_optional_trailing_slash(
request,
try_trailing_slash_on_400,
@@ -664,7 +704,7 @@ class MatrixFederationHttpClient(object):
)
body = await _handle_json_response(
self.reactor, self.default_timeout, request, response
self.reactor, self.default_timeout, request, response, start_ms
)
return body
@@ -720,6 +760,8 @@ class MatrixFederationHttpClient(object):
method="POST", destination=destination, path=path, query=args, json=data
)
start_ms = self.clock.time_msec()
response = await self._send_request(
request,
long_retries=long_retries,
@@ -733,7 +775,7 @@ class MatrixFederationHttpClient(object):
_sec_timeout = self.default_timeout
body = await _handle_json_response(
self.reactor, _sec_timeout, request, response
self.reactor, _sec_timeout, request, response, start_ms,
)
return body
@@ -786,6 +828,8 @@ class MatrixFederationHttpClient(object):
method="GET", destination=destination, path=path, query=args
)
start_ms = self.clock.time_msec()
response = await self._send_request_with_optional_trailing_slash(
request,
try_trailing_slash_on_400,
@@ -796,7 +840,7 @@ class MatrixFederationHttpClient(object):
)
body = await _handle_json_response(
self.reactor, self.default_timeout, request, response
self.reactor, self.default_timeout, request, response, start_ms
)
return body
@@ -846,6 +890,8 @@ class MatrixFederationHttpClient(object):
method="DELETE", destination=destination, path=path, query=args
)
start_ms = self.clock.time_msec()
response = await self._send_request(
request,
long_retries=long_retries,
@@ -854,7 +900,7 @@ class MatrixFederationHttpClient(object):
)
body = await _handle_json_response(
self.reactor, self.default_timeout, request, response
self.reactor, self.default_timeout, request, response, start_ms
)
return body
@@ -914,12 +960,14 @@ class MatrixFederationHttpClient(object):
)
raise
logger.info(
"{%s} [%s] Completed: %d %s [%d bytes]",
"{%s} [%s] Completed: %d %s [%d bytes] %s %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
length,
request.method,
request.uri.decode("ascii"),
)
return (length, headers)
+3 -2
@@ -25,7 +25,7 @@ from io import BytesIO
from typing import Any, Callable, Dict, Tuple, Union
import jinja2
from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
from canonicaljson import encode_canonical_json, encode_pretty_printed_json
from twisted.internet import defer
from twisted.python import failure
@@ -46,6 +46,7 @@ from synapse.api.errors import (
from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn
from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
logger = logging.getLogger(__name__)
@@ -538,7 +539,7 @@ def respond_with_json(
# canonicaljson already encodes to bytes
json_bytes = encode_canonical_json(json_object)
else:
json_bytes = json.dumps(json_object).encode("utf-8")
json_bytes = json_encoder.encode(json_object).encode("utf-8")
return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
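`json_encoder` here is a shared, module-level encoder from `synapse.util`, reused in place of ad-hoc `json.dumps` calls; the pattern is roughly as below (the exact encoder options are an assumption, not confirmed by this diff):

import json

# one encoder instance, configured once and reused for every response
json_encoder = json.JSONEncoder(allow_nan=False, separators=(",", ":"))

def respond_bytes(json_object) -> bytes:
    return json_encoder.encode(json_object).encode("utf-8")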
+16 -4
@@ -146,10 +146,9 @@ class SynapseRequest(Request):
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
async def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
await really_handle_the_request()
Once the context manager is closed, the completion of the request will be logged,
and the various metrics will be updated.
@@ -287,7 +286,9 @@ class SynapseRequest(Request):
# the connection dropped)
code += "!"
self.site.access_logger.info(
log_level = logging.INFO if self._should_log_request() else logging.DEBUG
self.site.access_logger.log(
log_level,
"%s - %s - {%s}"
" Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
' %sB %s "%s %s %s" "%s" [%d dbevts]',
@@ -315,6 +316,17 @@ class SynapseRequest(Request):
except Exception as e:
logger.warning("Failed to stop metrics: %r", e)
def _should_log_request(self) -> bool:
"""Whether we should log at INFO that we processed the request.
"""
if self.path == b"/health":
return False
if self.method == b"OPTIONS":
return False
return True
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):
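The combined effect of `_should_log_request` and switching `access_logger.info(...)` to `access_logger.log(log_level, ...)` is that health-check probes and CORS preflights drop to DEBUG; schematically:

import logging

access_logger = logging.getLogger("synapse.access")

def log_processed(path: bytes, method: bytes, line: str):
    # /health probes and OPTIONS preflights are still logged, but at DEBUG,
    # so they can be filtered out of production access logs
    noisy = path == b"/health" or method == b"OPTIONS"
    access_logger.log(logging.DEBUG if noisy else logging.INFO, line)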
+12 -22
@@ -13,16 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import logging
import threading
from asyncio import iscoroutine
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set
from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -167,7 +166,7 @@ class _BackgroundProcess(object):
)
def run_as_background_process(desc, func, *args, **kwargs):
def run_as_background_process(desc: str, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
normal synapse inlineCallbacks function).
Args:
desc (str): a description for this background process type
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
@@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
follow the synapse logcontext rules.
"""
@defer.inlineCallbacks
def run():
async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
@@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
try:
result = func(*args, **kwargs)
# We probably don't have an ensureDeferred in our call stack to handle
# coroutine results, so we need to ensureDeferred here.
#
# But we need this check because ensureDeferred doesn't like being
# called on immediate values (as opposed to Deferreds or coroutines).
if iscoroutine(result):
result = defer.ensureDeferred(result)
if inspect.isawaitable(result):
result = await result
return (yield result)
return result
except Exception:
# failure.Failure() fishes the original Failure out of our stack, and
# thus gives us a sensible stack trace.
f = Failure()
logger.error(
"Background process '%s' threw an exception",
desc,
exc_info=(f.type, f.value, f.getTracebackObject()),
logger.exception(
"Background process '%s' threw an exception", desc,
)
finally:
_background_process_in_flight_count.labels(desc).dec()
with PreserveLoggingContext():
return run()
# Note that we return a Deferred here so that it can be used in a
# looping_call and other places that expect a Deferred.
return defer.ensureDeferred(run())
def wrap_as_background_process(desc):
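A minimal sketch of the new shape of `run_as_background_process` (metrics and logcontext handling elided): run the function in an async wrapper, await anything awaitable, and hand back a Deferred so `looping_call` and friends keep working:

import inspect

from twisted.internet import defer

def run_sketch(func, *args, **kwargs):
    async def run():
        result = func(*args, **kwargs)
        # func may be sync, return a Deferred, or be a coroutine function;
        # inspect.isawaitable covers both Deferreds and coroutines, while
        # plain immediate values fall through untouched
        if inspect.isawaitable(result):
            result = await result
        return result

    # return a Deferred so callers that expect one can still consume it
    return defer.ensureDeferred(run())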
+6 -2
@@ -194,12 +194,16 @@ class ModuleApi(object):
synapse.api.errors.AuthError: the access token is invalid
"""
# see if the access token corresponds to a device
user_info = yield self._auth.get_user_by_access_token(access_token)
user_info = yield defer.ensureDeferred(
self._auth.get_user_by_access_token(access_token)
)
device_id = user_info.get("device_id")
user_id = user_info["user"].to_string()
if device_id:
# delete the device, which will also delete its access tokens
yield self._hs.get_device_handler().delete_device(user_id, device_id)
yield defer.ensureDeferred(
self._hs.get_device_handler().delete_device(user_id, device_id)
)
else:
# no associated device. Just delete the access token.
yield defer.ensureDeferred(
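The `yield defer.ensureDeferred(...)` pattern above is how `@defer.inlineCallbacks` generators consume newly-async internals: generators can only yield Deferreds, so coroutines must be wrapped first. A self-contained sketch:

from twisted.internet import defer

async def fetch_user(token: str) -> str:
    # stand-in for an async storage call
    return "user_for_" + token

@defer.inlineCallbacks
def delete_token(token: str):
    # wrap the coroutine in a Deferred before yielding it
    user = yield defer.ensureDeferred(fetch_user(token))
    return user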
+87 -48
@@ -15,7 +15,18 @@
import logging
from collections import namedtuple
from typing import Callable, Iterable, List, TypeVar
from typing import (
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
from prometheus_client import Counter
@@ -24,12 +35,14 @@ from twisted.internet import defer
import synapse.server
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
from synapse.streams.config import PaginationConfig
from synapse.types import Collection, StreamToken, UserID
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -77,7 +90,13 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class.
"""
def __init__(self, user_id, rooms, current_token, time_now_ms):
def __init__(
self,
user_id: str,
rooms: Collection[str],
current_token: StreamToken,
time_now_ms: int,
):
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
@@ -93,13 +112,13 @@ class _NotifierUserStream(object):
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(self, stream_key, stream_id, time_now_ms):
def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
"""Notify any listeners for this user of a new event from an
event source.
Args:
stream_key(str): The stream the event came from.
stream_id(str): The new id for the stream the event came from.
time_now_ms(int): The current time in milliseconds.
stream_key: The stream the event came from.
stream_id: The new id for the stream the event came from.
time_now_ms: The current time in milliseconds.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
@@ -112,7 +131,7 @@ class _NotifierUserStream(object):
self.notify_deferred = ObservableDeferred(defer.Deferred())
noify_deferred.callback(self.current_token)
def remove(self, notifier):
def remove(self, notifier: "Notifier"):
""" Remove this listener from all the indexes in the Notifier
it knows about.
"""
@@ -123,10 +142,10 @@ class _NotifierUserStream(object):
notifier.user_to_user_stream.pop(self.user_id)
def count_listeners(self):
def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
def new_listener(self, token):
def new_listener(self, token: StreamToken) -> _NotificationListener:
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
@@ -159,14 +178,16 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs: "synapse.server.HomeServer"):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
self.user_to_user_stream = {} # type: Dict[str, _NotifierUserStream]
self.room_to_user_streams = {} # type: Dict[str, Set[_NotifierUserStream]]
self.hs = hs
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
self.pending_new_room_events = (
[]
) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -178,10 +199,9 @@ class Notifier(object):
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
else:
self.federation_sender = None
self.state_handler = hs.get_state_handler()
@@ -193,12 +213,12 @@ class Notifier(object):
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
def count_listeners():
all_user_streams = set()
all_user_streams = set() # type: Set[_NotifierUserStream]
for x in list(self.room_to_user_streams.values()):
all_user_streams |= x
for x in list(self.user_to_user_stream.values()):
all_user_streams.add(x)
for streams in list(self.room_to_user_streams.values()):
all_user_streams |= streams
for stream in list(self.user_to_user_stream.values()):
all_user_streams.add(stream)
return sum(stream.count_listeners() for stream in all_user_streams)
@@ -223,7 +243,11 @@ class Notifier(object):
self.replication_callbacks.append(cb)
def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[]
self,
event: EventBase,
room_stream_id: int,
max_room_stream_id: int,
extra_users: Collection[Union[str, UserID]] = [],
):
""" Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -241,11 +265,11 @@ class Notifier(object):
self.notify_replication()
def _notify_pending_new_room_events(self, max_room_stream_id):
def _notify_pending_new_room_events(self, max_room_stream_id: int):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
Args:
max_room_stream_id(int): The highest stream_id below which all
max_room_stream_id: The highest stream_id below which all
events have been persisted.
"""
pending = self.pending_new_room_events
@@ -258,7 +282,12 @@ class Notifier(object):
else:
self._on_new_room_event(event, room_stream_id, extra_users)
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
def _on_new_room_event(
self,
event: EventBase,
room_stream_id: int,
extra_users: Collection[Union[str, UserID]] = [],
):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_as_background_process(
@@ -275,13 +304,19 @@ class Notifier(object):
"room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
)
async def _notify_app_services(self, room_stream_id):
async def _notify_app_services(self, room_stream_id: int):
try:
await self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
def on_new_event(
self,
stream_key: str,
new_token: int,
users: Collection[Union[str, UserID]] = [],
rooms: Collection[str] = [],
):
""" Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
@@ -307,14 +342,19 @@ class Notifier(object):
self.notify_replication()
def on_new_replication_data(self):
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
self.notify_replication()
async def wait_for_events(
self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
):
self,
user_id: str,
timeout: int,
callback: Callable[[StreamToken, StreamToken], Awaitable[T]],
room_ids=None,
from_token=StreamToken.START,
) -> T:
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
@@ -377,19 +417,16 @@ class Notifier(object):
async def get_events_for(
self,
user,
pagination_config,
timeout,
only_keys=None,
is_guest=False,
explicit_room_id=None,
):
user: UserID,
pagination_config: PaginationConfig,
timeout: int,
is_guest: bool = False,
explicit_room_id: str = None,
) -> EventStreamResult:
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
new events to happen before returning.
If `only_keys` is not None, events from keys will be sent down.
If explicit_room_id is not set, the user's joined rooms will be polled
for events.
If explicit_room_id is set, that room will be polled for events only if
@@ -404,11 +441,13 @@ class Notifier(object):
room_ids, is_joined = await self._get_room_ids(user, explicit_room_id)
is_peeking = not is_joined
async def check_for_updates(before_token, after_token):
async def check_for_updates(
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
if not after_token.is_after(before_token):
return EventStreamResult([], (from_token, from_token))
events = []
events = [] # type: List[EventBase]
end_token = from_token
for name, source in self.event_sources.sources.items():
@@ -417,8 +456,6 @@ class Notifier(object):
after_id = getattr(after_token, keyname)
if before_id == after_id:
continue
if only_keys and name not in only_keys:
continue
new_events, new_key = await source.get_new_events(
user=user,
@@ -476,7 +513,9 @@ class Notifier(object):
return result
async def _get_room_ids(self, user, explicit_room_id):
async def _get_room_ids(
self, user: UserID, explicit_room_id: Optional[str]
) -> Tuple[Collection[str], bool]:
joined_room_ids = await self.store.get_rooms_for_user(user.to_string())
if explicit_room_id:
if explicit_room_id in joined_room_ids:
@@ -486,7 +525,7 @@ class Notifier(object):
raise AuthError(403, "Non-joined access not allowed")
return joined_room_ids, True
async def _is_world_readable(self, room_id):
async def _is_world_readable(self, room_id: str) -> bool:
state = await self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
@@ -496,7 +535,7 @@ class Notifier(object):
return False
@log_function
def remove_expired_streams(self):
def remove_expired_streams(self) -> None:
time_now_ms = self.clock.time_msec()
expired_streams = []
expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
@@ -510,21 +549,21 @@ class Notifier(object):
expired_stream.remove(self)
@log_function
def _register_with_keys(self, user_stream):
def _register_with_keys(self, user_stream: _NotifierUserStream):
self.user_to_user_stream[user_stream.user_id] = user_stream
for room in user_stream.rooms:
s = self.room_to_user_streams.setdefault(room, set())
s.add(user_stream)
def _user_joined_room(self, user_id, room_id):
def _user_joined_room(self, user_id: str, room_id: str):
new_user_stream = self.user_to_user_stream.get(user_id)
if new_user_stream is not None:
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
new_user_stream.rooms.add(room_id)
def notify_replication(self):
def notify_replication(self) -> None:
"""Notify the any replication listeners that there's a new event"""
for cb in self.replication_callbacks:
cb()
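The typed maps added above (`user_to_user_stream`, `room_to_user_streams`) are the heart of the fan-out; a self-contained sketch of how a room event wakes the right streams (simplified, with a stand-in for `_NotifierUserStream`):

from typing import Dict, Set

class UserStream:
    """Stand-in for _NotifierUserStream: just records that it was woken."""

    def __init__(self) -> None:
        self.woken = False

    def notify(self) -> None:
        self.woken = True

user_to_stream = {}  # type: Dict[str, UserStream]
room_to_streams = {}  # type: Dict[str, Set[UserStream]]

def wake_for_room_event(room_id, extra_users):
    # fan out to every stream watching the room, plus any explicitly named users
    streams = set(room_to_streams.get(room_id, set()))
    for user_id in extra_users:
        stream = user_to_stream.get(user_id)
        if stream is not None:
            streams.add(stream)
    for stream in streams:
        stream.notify()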
+208 -8
@@ -19,11 +19,13 @@ import copy
from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
def list_with_base_rules(rawrules):
def list_with_base_rules(rawrules, use_new_defaults=False):
"""Combine the list of rules set by the user with the default push rules
Args:
rawrules(list): The rules the user has modified or set.
use_new_defaults(bool): Whether to use the new experimental default rules when
appending or prepending default rules.
Returns:
A new list with the rules set by the user combined with the defaults.
@@ -43,7 +45,9 @@ def list_with_base_rules(rawrules):
ruleslist.extend(
make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
modified_base_rules,
use_new_defaults,
)
)
@@ -54,6 +58,7 @@ def list_with_base_rules(rawrules):
make_base_append_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
modified_base_rules,
use_new_defaults,
)
)
current_prio_class -= 1
@@ -62,6 +67,7 @@ def list_with_base_rules(rawrules):
make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
modified_base_rules,
use_new_defaults,
)
)
@@ -70,27 +76,39 @@ def list_with_base_rules(rawrules):
while current_prio_class > 0:
ruleslist.extend(
make_base_append_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
modified_base_rules,
use_new_defaults,
)
)
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(
make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
modified_base_rules,
use_new_defaults,
)
)
return ruleslist
def make_base_append_rules(kind, modified_base_rules):
def make_base_append_rules(kind, modified_base_rules, use_new_defaults=False):
rules = []
if kind == "override":
rules = BASE_APPEND_OVERRIDE_RULES
rules = (
NEW_APPEND_OVERRIDE_RULES
if use_new_defaults
else BASE_APPEND_OVERRIDE_RULES
)
elif kind == "underride":
rules = BASE_APPEND_UNDERRIDE_RULES
rules = (
NEW_APPEND_UNDERRIDE_RULES
if use_new_defaults
else BASE_APPEND_UNDERRIDE_RULES
)
elif kind == "content":
rules = BASE_APPEND_CONTENT_RULES
@@ -105,7 +123,7 @@ def make_base_append_rules(kind, modified_base_rules):
return rules
def make_base_prepend_rules(kind, modified_base_rules):
def make_base_prepend_rules(kind, modified_base_rules, use_new_defaults=False):
rules = []
if kind == "override":
@@ -270,6 +288,135 @@ BASE_APPEND_OVERRIDE_RULES = [
]
NEW_APPEND_OVERRIDE_RULES = [
{
"rule_id": "global/override/.m.rule.encrypted",
"conditions": [
{
"kind": "event_match",
"key": "type",
"pattern": "m.room.encrypted",
"_id": "_encrypted",
}
],
"actions": ["notify"],
},
{
"rule_id": "global/override/.m.rule.suppress_notices",
"conditions": [
{
"kind": "event_match",
"key": "type",
"pattern": "m.room.message",
"_id": "_suppress_notices_type",
},
{
"kind": "event_match",
"key": "content.msgtype",
"pattern": "m.notice",
"_id": "_suppress_notices",
},
],
"actions": [],
},
{
"rule_id": "global/underride/.m.rule.suppress_edits",
"conditions": [
{
"kind": "event_match",
"key": "m.relates_to.m.rel_type",
"pattern": "m.replace",
"_id": "_suppress_edits",
}
],
"actions": [],
},
{
"rule_id": "global/override/.m.rule.invite_for_me",
"conditions": [
{
"kind": "event_match",
"key": "type",
"pattern": "m.room.member",
"_id": "_member",
},
{
"kind": "event_match",
"key": "content.membership",
"pattern": "invite",
"_id": "_invite_member",
},
{"kind": "event_match", "key": "state_key", "pattern_type": "user_id"},
],
"actions": ["notify", {"set_tweak": "sound", "value": "default"}],
},
{
"rule_id": "global/override/.m.rule.contains_display_name",
"conditions": [{"kind": "contains_display_name"}],
"actions": [
"notify",
{"set_tweak": "sound", "value": "default"},
{"set_tweak": "highlight"},
],
},
{
"rule_id": "global/override/.m.rule.tombstone",
"conditions": [
{
"kind": "event_match",
"key": "type",
"pattern": "m.room.tombstone",
"_id": "_tombstone",
},
{
"kind": "event_match",
"key": "state_key",
"pattern": "",
"_id": "_tombstone_statekey",
},
],
"actions": [
"notify",
{"set_tweak": "sound", "value": "default"},
{"set_tweak": "highlight"},
],
},
{
"rule_id": "global/override/.m.rule.roomnotif",
"conditions": [
{
"kind": "event_match",
"key": "content.body",
"pattern": "@room",
"_id": "_roomnotif_content",
},
{
"kind": "sender_notification_permission",
"key": "room",
"_id": "_roomnotif_pl",
},
],
"actions": [
"notify",
{"set_tweak": "highlight"},
{"set_tweak": "sound", "value": "default"},
],
},
{
"rule_id": "global/override/.m.rule.call",
"conditions": [
{
"kind": "event_match",
"key": "type",
"pattern": "m.call.invite",
"_id": "_call",
}
],
"actions": ["notify", {"set_tweak": "sound", "value": "ring"}],
},
]
BASE_APPEND_UNDERRIDE_RULES = [
{
"rule_id": "global/underride/.m.rule.call",
@@ -354,6 +501,36 @@ BASE_APPEND_UNDERRIDE_RULES = [
]
NEW_APPEND_UNDERRIDE_RULES = [
{
"rule_id": "global/underride/.m.rule.room_one_to_one",
"conditions": [
{"kind": "room_member_count", "is": "2", "_id": "member_count"},
{
"kind": "event_match",
"key": "content.body",
"pattern": "*",
"_id": "body",
},
],
"actions": ["notify", {"set_tweak": "sound", "value": "default"}],
},
{
"rule_id": "global/underride/.m.rule.message",
"conditions": [
{
"kind": "event_match",
"key": "content.body",
"pattern": "*",
"_id": "body",
},
],
"actions": ["notify"],
"enabled": False,
},
]
BASE_RULE_IDS = set()
for r in BASE_APPEND_CONTENT_RULES:
@@ -375,3 +552,26 @@ for r in BASE_APPEND_UNDERRIDE_RULES:
r["priority_class"] = PRIORITY_CLASS_MAP["underride"]
r["default"] = True
BASE_RULE_IDS.add(r["rule_id"])
NEW_RULE_IDS = set()
for r in BASE_APPEND_CONTENT_RULES:
r["priority_class"] = PRIORITY_CLASS_MAP["content"]
r["default"] = True
NEW_RULE_IDS.add(r["rule_id"])
for r in BASE_PREPEND_OVERRIDE_RULES:
r["priority_class"] = PRIORITY_CLASS_MAP["override"]
r["default"] = True
NEW_RULE_IDS.add(r["rule_id"])
for r in NEW_APPEND_OVERRIDE_RULES:
r["priority_class"] = PRIORITY_CLASS_MAP["override"]
r["default"] = True
NEW_RULE_IDS.add(r["rule_id"])
for r in NEW_APPEND_UNDERRIDE_RULES:
r["priority_class"] = PRIORITY_CLASS_MAP["underride"]
r["default"] = True
NEW_RULE_IDS.add(r["rule_id"])
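Putting the pieces together: the new `NEW_RULE_IDS` set enables per-user selection between the old and new default rule sets, as the push-rule servlet diff further below does. A sketch with a hypothetical config value:

# Hypothetical config value: the set of users opted in to the new defaults.
users_new_default_push_rules = {"@alice:example.org"}

def valid_default_rule_ids(user_id):
    # mirrors the servlet check below: opted-in users validate rule IDs
    # against NEW_RULE_IDS, everyone else against BASE_RULE_IDS
    return NEW_RULE_IDS if user_id in users_new_default_push_rules else BASE_RULE_IDS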
+1 -1
@@ -120,7 +120,7 @@ class BulkPushRuleEvaluator(object):
pl_event = await self.store.get_event(pl_event_id)
auth_events = {POWER_KEY: pl_event}
else:
auth_events_ids = await self.auth.compute_auth_events(
auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=False
)
auth_events = await self.store.get_events(auth_events_ids)
+13 -4
@@ -21,13 +21,22 @@ async def get_badge_count(store, user_id):
invites = await store.get_invited_rooms_for_local_user(user_id)
joins = await store.get_rooms_for_user(user_id)
my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read")
badge = len(invites)
for room_id in joins:
unread_count = await store.get_unread_message_count_for_user(room_id, user_id)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if unread_count else 0
if room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[room_id]
notifs = await (
store.get_unread_event_push_actions_by_room_for_user(
room_id, user_id, last_unread_event_id
)
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
return badge
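In effect this reverts the badge to one count per conversation with unread *notifications*, rather than per unread message; condensed:

def badge_count(invites, joins, notify_count_for):
    # each invite counts once; each joined room counts at most once, and only
    # if it has any unread notifications past the user's read receipt
    return len(invites) + sum(1 for room_id in joins if notify_count_for(room_id))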
@@ -28,7 +28,7 @@ class SlavedClientIpStore(BaseSlavedStore):
name="client_ip_last_seen", keylen=4, max_entries=50000
)
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
key = (user_id, access_token, ip)
+3 -2
@@ -18,11 +18,12 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import abc
import json
import logging
from typing import Tuple, Type
_json_encoder = json.JSONEncoder()
from canonicaljson import json
from synapse.util import json_encoder as _json_encoder
logger = logging.getLogger(__name__)
+12 -5
@@ -2,10 +2,17 @@
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSO error</title>
<title>SSO login error</title>
</head>
<body>
<p>Oops! Something went wrong during authentication<span id="errormsg"></span>.</p>
{# a 403 means we have actively rejected their login #}
{% if code == 403 %}
<p>You are not allowed to log in here.</p>
{% else %}
<p>
There was an error during authentication:
</p>
<div id="errormsg" style="margin:20px 80px">{{ msg }}</div>
<p>
If you are seeing this page after clicking a link sent to you via email, make
sure you only click the confirmation link once, and that you open the
@@ -37,9 +44,9 @@
// to print one.
let errorDesc = new URLSearchParams(searchStr).get("error_description")
if (errorDesc) {
document.getElementById("errormsg").innerText = ` ("${errorDesc}")`;
document.getElementById("errormsg").innerText = errorDesc;
}
</script>
{% endif %}
</body>
</html>
+1 -1
@@ -89,7 +89,7 @@ class ClientDirectoryServer(RestServlet):
dir_handler = self.handlers.directory_handler
try:
service = await self.auth.get_appservice_by_req(request)
service = self.auth.get_appservice_by_req(request)
room_alias = RoomAlias.from_string(room_alias)
await dir_handler.delete_appservice_association(service, room_alias)
logger.info(
+9 -2
@@ -25,7 +25,7 @@ from synapse.http.servlet import (
parse_json_value_from_request,
parse_string,
)
from synapse.push.baserules import BASE_RULE_IDS
from synapse.push.baserules import BASE_RULE_IDS, NEW_RULE_IDS
from synapse.push.clientformat import format_push_rules_for_user
from synapse.push.rulekinds import PRIORITY_CLASS_MAP
from synapse.rest.client.v2_alpha._base import client_patterns
@@ -45,6 +45,8 @@ class PushRuleRestServlet(RestServlet):
self.notifier = hs.get_notifier()
self._is_worker = hs.config.worker_app is not None
self._users_new_default_push_rules = hs.config.users_new_default_push_rules
async def on_PUT(self, request, path):
if self._is_worker:
raise Exception("Cannot handle PUT /push_rules on worker")
@@ -179,7 +181,12 @@ class PushRuleRestServlet(RestServlet):
rule_id = spec["rule_id"]
is_default_rule = rule_id.startswith(".")
if is_default_rule:
if namespaced_rule_id not in BASE_RULE_IDS:
if user_id in self._users_new_default_push_rules:
rule_ids = NEW_RULE_IDS
else:
rule_ids = BASE_RULE_IDS
if namespaced_rule_id not in rule_ids:
raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,))
return self.store.set_push_rule_actions(
user_id, namespaced_rule_id, actions, is_default_rule
+60 -26
@@ -18,7 +18,12 @@ import logging
from http import HTTPStatus
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
from synapse.api.errors import (
Codes,
InteractiveAuthIncompleteError,
SynapseError,
ThreepidValidationError,
)
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.http.server import finish_request, respond_with_html
from synapse.http.servlet import (
@@ -239,18 +244,12 @@ class PasswordRestServlet(RestServlet):
# we do basic sanity checks here because the auth layer will store these
# in sessions. Pull out the new password provided to us.
if "new_password" in body:
new_password = body.pop("new_password")
new_password = body.pop("new_password", None)
if new_password is not None:
if not isinstance(new_password, str) or len(new_password) > 512:
raise SynapseError(400, "Invalid password")
self.password_policy_handler.validate_password(new_password)
# If the password is valid, hash it and store it back on the body.
# This ensures that only the hashed password is handled everywhere.
if "new_password_hash" in body:
raise SynapseError(400, "Unexpected property: new_password_hash")
body["new_password_hash"] = await self.auth_handler.hash(new_password)
# there are two possibilities here. Either the user does not have an
# access token, and needs to do a password reset; or they have one and
# need to validate their identity.
@@ -263,23 +262,49 @@ class PasswordRestServlet(RestServlet):
if self.auth.has_access_token(request):
requester = await self.auth.get_user_by_req(request)
params = await self.auth_handler.validate_user_via_ui_auth(
requester,
request,
body,
self.hs.get_ip_from_request(request),
"modify your account password",
)
try:
params, session_id = await self.auth_handler.validate_user_via_ui_auth(
requester,
request,
body,
self.hs.get_ip_from_request(request),
"modify your account password",
)
except InteractiveAuthIncompleteError as e:
# The user needs to provide more steps to complete auth, but
# they're not required to provide the password again.
#
# If a password is available now, hash the provided password and
# store it for later.
if new_password:
password_hash = await self.auth_handler.hash(new_password)
await self.auth_handler.set_session_data(
e.session_id, "password_hash", password_hash
)
raise
user_id = requester.user.to_string()
else:
requester = None
result, params, _ = await self.auth_handler.check_auth(
[[LoginType.EMAIL_IDENTITY]],
request,
body,
self.hs.get_ip_from_request(request),
"modify your account password",
)
try:
result, params, session_id = await self.auth_handler.check_ui_auth(
[[LoginType.EMAIL_IDENTITY]],
request,
body,
self.hs.get_ip_from_request(request),
"modify your account password",
)
except InteractiveAuthIncompleteError as e:
# The user needs to provide more steps to complete auth, but
# they're not required to provide the password again.
#
# If a password is available now, hash the provided password and
# store it for later.
if new_password:
password_hash = await self.auth_handler.hash(new_password)
await self.auth_handler.set_session_data(
e.session_id, "password_hash", password_hash
)
raise
if LoginType.EMAIL_IDENTITY in result:
threepid = result[LoginType.EMAIL_IDENTITY]
@@ -304,12 +329,21 @@ class PasswordRestServlet(RestServlet):
logger.error("Auth succeeded but no known type! %r", result.keys())
raise SynapseError(500, "", Codes.UNKNOWN)
assert_params_in_dict(params, ["new_password_hash"])
new_password_hash = params["new_password_hash"]
# If we have a password in this request, prefer it. Otherwise, there
# must be a password hash from an earlier request.
if new_password:
password_hash = await self.auth_handler.hash(new_password)
else:
password_hash = await self.auth_handler.get_session_data(
session_id, "password_hash", None
)
if not password_hash:
raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
logout_devices = params.get("logout_devices", True)
await self._set_password_handler.set_password(
user_id, new_password_hash, logout_devices, requester
user_id, password_hash, logout_devices, requester
)
return 200, {}
+71 -41
@@ -24,6 +24,7 @@ import synapse.types
from synapse.api.constants import LoginType
from synapse.api.errors import (
Codes,
InteractiveAuthIncompleteError,
SynapseError,
ThreepidValidationError,
UnrecognizedRequestError,
@@ -387,6 +388,7 @@ class RegisterRestServlet(RestServlet):
self.ratelimiter = hs.get_registration_ratelimiter()
self.password_policy_handler = hs.get_password_policy_handler()
self.clock = hs.get_clock()
self._registration_enabled = self.hs.config.enable_registration
self._registration_flows = _calculate_registration_flows(
hs.config, self.auth_handler
@@ -412,20 +414,8 @@ class RegisterRestServlet(RestServlet):
"Do not understand membership kind: %s" % (kind.decode("utf8"),)
)
# we do basic sanity checks here because the auth layer will store these
# in sessions. Pull out the username/password provided to us.
if "password" in body:
password = body.pop("password")
if not isinstance(password, str) or len(password) > 512:
raise SynapseError(400, "Invalid password")
self.password_policy_handler.validate_password(password)
# If the password is valid, hash it and store it back on the body.
# This ensures that only the hashed password is handled everywhere.
if "password_hash" in body:
raise SynapseError(400, "Unexpected property: password_hash")
body["password_hash"] = await self.auth_handler.hash(password)
# Pull out the provided username and do basic sanity checks early since
# the auth layer will store these in sessions.
desired_username = None
if "username" in body:
if not isinstance(body["username"], str) or len(body["username"]) > 512:
@@ -434,7 +424,7 @@ class RegisterRestServlet(RestServlet):
appservice = None
if self.auth.has_access_token(request):
appservice = await self.auth.get_appservice_by_req(request)
appservice = self.auth.get_appservice_by_req(request)
# fork off as soon as possible for ASes which have completely
# different registration flows to normal users
@@ -459,22 +449,35 @@ class RegisterRestServlet(RestServlet):
)
return 200, result # we throw for non 200 responses
# for regular registration, downcase the provided username before
# attempting to register it. This should mean
# that people who try to register with upper-case in their usernames
# don't get a nasty surprise. (Note that we treat username
# case-insenstively in login, so they are free to carry on imagining
# that their username is CrAzYh4cKeR if that keeps them happy)
# == Normal User Registration == (everyone else)
if not self._registration_enabled:
raise SynapseError(403, "Registration has been disabled")
# For regular registration, convert the provided username to lowercase
# before attempting to register it. This should mean that people who try
# to register with upper-case in their usernames don't get a nasty surprise.
#
# Note that we treat usernames case-insensitively in login, so they are
# free to carry on imagining that their username is CrAzYh4cKeR if that
# keeps them happy.
if desired_username is not None:
desired_username = desired_username.lower()
# == Normal User Registration == (everyone else)
if not self.hs.config.enable_registration:
raise SynapseError(403, "Registration has been disabled")
# Check if this account is upgrading from a guest account.
guest_access_token = body.get("guest_access_token", None)
if "initial_device_display_name" in body and "password_hash" not in body:
# Pull out the provided password and do basic sanity checks early.
#
# Note that we remove the password from the body since the auth layer
# will store the body in the session and we don't want a plaintext
# password stored there.
password = body.pop("password", None)
if password is not None:
if not isinstance(password, str) or len(password) > 512:
raise SynapseError(400, "Invalid password")
self.password_policy_handler.validate_password(password)
if "initial_device_display_name" in body and password is None:
# ignore 'initial_device_display_name' if sent without
# a password to work around a client bug where it sent
# the 'initial_device_display_name' param alone, wiping out
@@ -484,6 +487,7 @@ class RegisterRestServlet(RestServlet):
session_id = self.auth_handler.get_session_id(body)
registered_user_id = None
password_hash = None
if session_id:
# if we get a registered user id out of here, it means we previously
# registered a user for this session, so we could just return the
@@ -492,7 +496,12 @@ class RegisterRestServlet(RestServlet):
registered_user_id = await self.auth_handler.get_session_data(
session_id, "registered_user_id", None
)
# Extract the previously-hashed password from the session.
password_hash = await self.auth_handler.get_session_data(
session_id, "password_hash", None
)
# Ensure that the username is valid.
if desired_username is not None:
await self.registration_handler.check_username(
desired_username,
@@ -500,20 +509,38 @@ class RegisterRestServlet(RestServlet):
assigned_user_id=registered_user_id,
)
auth_result, params, session_id = await self.auth_handler.check_auth(
self._registration_flows,
request,
body,
self.hs.get_ip_from_request(request),
"register a new account",
)
# Check if the user-interactive authentication flows are complete, if
# not this will raise a user-interactive auth error.
try:
auth_result, params, session_id = await self.auth_handler.check_ui_auth(
self._registration_flows,
request,
body,
self.hs.get_ip_from_request(request),
"register a new account",
)
except InteractiveAuthIncompleteError as e:
# The user needs to provide more steps to complete auth.
#
# Hash the password and store it with the session since the client
# is not required to provide the password again.
#
# If a password hash was previously stored we will not attempt to
# re-hash and store it for efficiency. This assumes the password
# does not change throughout the authentication flow, but this
# should be fine since the data is meant to be consistent.
if not password_hash and password:
password_hash = await self.auth_handler.hash(password)
await self.auth_handler.set_session_data(
e.session_id, "password_hash", password_hash
)
raise
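The hunk above hashes the password at most once per user-interactive auth session: on an incomplete-auth round-trip the hash is stored in the session data, and later requests reuse it rather than re-hashing. A minimal standalone sketch of the same pattern, assuming a toy `sessions` store and `slow_hash` helper (illustrative names, not Synapse APIs):

import hashlib
import os
from typing import Optional

sessions = {}  # session_id -> session data (toy stand-in for the auth layer)

def slow_hash(password: str) -> str:
    # Toy stand-in for bcrypt: salted PBKDF2; real servers use a slow KDF.
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return salt.hex() + ":" + digest.hex()

def on_incomplete_auth(session_id: str, password: Optional[str]) -> None:
    session = sessions.setdefault(session_id, {})
    # Hash at most once; subsequent round-trips reuse the stored hash.
    if not session.get("password_hash") and password:
        session["password_hash"] = slow_hash(password)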
# Check that we're not trying to register a denied 3pid.
#
# the user-facing checks will probably already have happened in
# /register/email/requestToken when we requested a 3pid, but that's not
# guaranteed.
if auth_result:
for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]:
if login_type in auth_result:
@@ -535,12 +562,15 @@ class RegisterRestServlet(RestServlet):
# don't re-register the threepids
registered = False
else:
# NB: This may be from the auth handler and NOT from the POST
assert_params_in_dict(params, ["password_hash"])
# If we have a password in this request, prefer it. Otherwise, there
# might be a password hash from an earlier request.
if password:
password_hash = await self.auth_handler.hash(password)
if not password_hash:
raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
desired_username = params.get("username", None)
guest_access_token = params.get("guest_access_token", None)
new_password_hash = params.get("password_hash", None)
if desired_username is not None:
desired_username = desired_username.lower()
@@ -582,7 +612,7 @@ class RegisterRestServlet(RestServlet):
registered_user_id = await self.registration_handler.register_user(
localpart=desired_username,
password_hash=new_password_hash,
password_hash=password_hash,
guest_access_token=guest_access_token,
threepid=threepid,
address=client_addr,
@@ -595,8 +625,8 @@ class RegisterRestServlet(RestServlet):
):
await self.store.upsert_monthly_active_user(registered_user_id)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
# Remember that the user account has been registered (and the user
# ID it was registered with, since it might not have been specified).
await self.auth_handler.set_session_data(
session_id, "registered_user_id", registered_user_id
)
@@ -635,7 +665,7 @@ class RegisterRestServlet(RestServlet):
(object) params: registration parameters, from which we pull
device_id, initial_device_name and inhibit_login
Returns:
defer.Deferred: (object) dictionary for response from /register
(object) dictionary for response from /register
"""
result = {"user_id": user_id, "home_server": self.hs.hostname}
if not params.get("inhibit_login", False):
-1
View File
@@ -427,7 +427,6 @@ class SyncRestServlet(RestServlet):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
result["org.matrix.msc2654.unread_count"] = room.unread_count
return result
+1 -3
View File
@@ -22,8 +22,6 @@ from os import path
import jinja2
from jinja2 import TemplateNotFound
from twisted.internet import defer
from synapse.api.errors import NotFoundError, StoreError, SynapseError
from synapse.config import ConfigError
from synapse.http.server import DirectServeHtmlResource, respond_with_html
@@ -135,7 +133,7 @@ class ConsentResource(DirectServeHtmlResource):
else:
qualified_user_id = UserID(username, self.hs.hostname).to_string()
u = await defer.maybeDeferred(self.store.get_user_by_id, qualified_user_id)
u = await self.store.get_user_by_id(qualified_user_id)
if u is None:
raise NotFoundError("Unknown user")
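For context on the hunk above: `defer.maybeDeferred` exists to call a function whose return may be either a plain value or a Deferred; once `get_user_by_id` became a coroutine, awaiting it directly is simpler and sufficient. Side by side, assuming the surrounding async method:

# Old style: wrap a call whose return type (value vs. Deferred) is unknown.
u = await defer.maybeDeferred(self.store.get_user_by_id, qualified_user_id)

# New style: the storage method is a coroutine, so await it directly.
u = await self.store.get_user_by_id(qualified_user_id)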
+31
View File
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.web.resource import Resource
class HealthResource(Resource):
"""A resource that does nothing except return a 200 with a body of `OK`,
which can be used as a health check.
Note: `SynapseRequest._should_log_request` ensures that requests to
`/health` do not get logged at INFO.
"""
isLeaf = 1
def render_GET(self, request):
request.setHeader(b"Content-Type", b"text/plain")
return b"OK"
@@ -27,9 +27,7 @@ from typing import Dict, Optional
from urllib import parse as urlparse
import attr
from canonicaljson import json
from twisted.internet import defer
from twisted.internet.error import DNSLookupError
from synapse.api.errors import Codes, SynapseError
@@ -43,6 +41,7 @@ from synapse.http.servlet import parse_integer, parse_string
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.media.v1._base import get_filename_from_headers
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.stringutils import random_string
@@ -228,7 +227,7 @@ class PreviewUrlResource(DirectServeJsonResource):
else:
logger.info("Returning cached response")
og = await make_deferred_yieldable(defer.maybeDeferred(observable.observe))
og = await make_deferred_yieldable(observable.observe())
respond_with_json_bytes(request, 200, og, send_cors=True)
async def _do_preview(self, url: str, user: str, ts: int) -> bytes:
@@ -355,7 +354,7 @@ class PreviewUrlResource(DirectServeJsonResource):
logger.debug("Calculated OG for %s as %s", url, og)
jsonog = json.dumps(og)
jsonog = json_encoder.encode(og)
# store OG in history-aware DB cache
await self.store.store_url_cache(
+6 -2
View File
@@ -25,8 +25,12 @@ import sys
if sys.version_info[0:2] >= (3, 6):
import secrets
def Secrets():
return secrets
class Secrets:
def token_bytes(self, nbytes=32):
return secrets.token_bytes(nbytes)
def token_hex(self, nbytes=32):
return secrets.token_hex(nbytes)
else:
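The replacement turns a factory function that simply returned the `secrets` module into a small wrapper class with the same two methods, keeping one interface shape for the pre-3.6 fallback under the `else:`. Expected usage is unchanged, e.g.:

s = Secrets()
s.token_bytes(16)  # 16 cryptographically random bytes
s.token_hex(16)    # 32-character hex string (16 random bytes, hex-encoded)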
+235 -222
View File
@@ -22,10 +22,14 @@
# Imports required for the default HomeServer() implementation
import abc
import functools
import logging
import os
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar, cast
import twisted
from twisted.mail.smtp import sendmail
from twisted.web.iweb import IPolicyForHTTPS
from synapse.api.auth import Auth
from synapse.api.filtering import Filtering
@@ -93,7 +97,7 @@ from synapse.push.pusherpool import PusherPool
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.resource import ReplicationStreamer
from synapse.replication.tcp.streams import STREAMS_MAP
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
@@ -107,30 +111,72 @@ from synapse.server_notices.worker_server_notices_sender import (
from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import Databases, DataStore, Storage
from synapse.streams.events import EventSources
from synapse.types import DomainSpecificString
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from synapse.handlers.oidc_handler import OidcHandler
from synapse.handlers.saml_handler import SamlHandler
class HomeServer(object):
T = TypeVar("T", bound=Callable[..., Any])
def cache_in_self(builder: T) -> T:
"""Wraps a function called e.g. `get_foo`, checking if `self.foo` exists and
returning if so. If not, calls the given function and sets `self.foo` to it.
Also ensures that dependency cycles throw an exception correctly, rather
than overflowing the stack.
"""
if not builder.__name__.startswith("get_"):
raise Exception(
"@cache_in_self can only be used on functions starting with `get_`"
)
depname = builder.__name__[len("get_") :]
building = [False]
@functools.wraps(builder)
def _get(self):
try:
return getattr(self, depname)
except AttributeError:
pass
# Prevent cyclic dependencies from deadlocking
if building[0]:
raise ValueError("Cyclic dependency while building %s" % (depname,))
building[0] = True
try:
dep = builder(self)
setattr(self, depname, dep)
finally:
building[0] = False
return dep
# We cast here as we need to tell mypy that `_get` has the same signature as
# `builder`.
return cast(T, _get)
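A toy illustration of the decorator outside of HomeServer (the `Car`/`Engine` names are made up for the example):

class Engine:
    pass

class Car:
    @cache_in_self
    def get_engine(self) -> Engine:
        print("building engine")  # runs only on the first call
        return Engine()

car = Car()
assert car.get_engine() is car.get_engine()
# "building engine" is printed once; afterwards the instance is cached on the
# attribute `car.engine`, which is what the AttributeError probe checks for.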
class HomeServer(metaclass=abc.ABCMeta):
"""A basic homeserver object without lazy component builders.
This will need all of the components it requires to either be passed as
constructor arguments, or the relevant methods overridden to create them.
Typically this would only be used for unit tests.
For every dependency in the DEPENDENCIES list below, this class creates one
method,
def get_DEPENDENCY(self)
which returns the value of that dependency. If no value has yet been set
nor was provided to the constructor, it will attempt to call a lazy builder
method called
def build_DEPENDENCY(self)
which must be implemented by the subclass. This code may call any of the
required "get" methods on the instance to obtain the sub-dependencies that
one requires.
Dependencies should be added by creating a `def get_<depname>(self)`
function, wrapping it in `@cache_in_self`.
Attributes:
config (synapse.config.homeserver.HomeserverConfig):
@@ -138,86 +184,6 @@ class HomeServer(object):
we are listening on to provide HTTP services.
"""
__metaclass__ = abc.ABCMeta
DEPENDENCIES = [
"http_client",
"federation_client",
"federation_server",
"handlers",
"auth",
"room_creation_handler",
"room_shutdown_handler",
"state_handler",
"state_resolution_handler",
"presence_handler",
"sync_handler",
"typing_handler",
"room_list_handler",
"acme_handler",
"auth_handler",
"device_handler",
"stats_handler",
"e2e_keys_handler",
"e2e_room_keys_handler",
"event_handler",
"event_stream_handler",
"initial_sync_handler",
"application_service_api",
"application_service_scheduler",
"application_service_handler",
"device_message_handler",
"profile_handler",
"event_creation_handler",
"deactivate_account_handler",
"set_password_handler",
"notifier",
"event_sources",
"keyring",
"pusherpool",
"event_builder_factory",
"filtering",
"http_client_context_factory",
"simple_http_client",
"proxied_http_client",
"media_repository",
"media_repository_resource",
"federation_transport_client",
"federation_sender",
"receipts_handler",
"macaroon_generator",
"tcp_replication",
"read_marker_handler",
"action_generator",
"user_directory_handler",
"groups_local_handler",
"groups_server_handler",
"groups_attestation_signing",
"groups_attestation_renewer",
"secrets",
"spam_checker",
"third_party_event_rules",
"room_member_handler",
"federation_registry",
"server_notices_manager",
"server_notices_sender",
"message_handler",
"pagination_handler",
"room_context_handler",
"sendmail",
"registration_handler",
"account_validity_handler",
"cas_handler",
"saml_handler",
"oidc_handler",
"event_client_serializer",
"password_policy_handler",
"storage",
"replication_streamer",
"replication_data_handler",
"replication_streams",
]
REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
# This is overridden in derived application classes
@@ -232,16 +198,17 @@ class HomeServer(object):
config: The full config for the homeserver.
"""
if not reactor:
from twisted.internet import reactor
from twisted.internet import reactor as _reactor
reactor = _reactor
self._reactor = reactor
self.hostname = hostname
# the key we use to sign events and requests
self.signing_key = config.key.signing_key[0]
self.config = config
self._building = {}
self._listening_services = []
self.start_time = None
self._listening_services = [] # type: List[twisted.internet.tcp.Port]
self.start_time = None # type: Optional[int]
self._instance_id = random_string(5)
self._instance_name = config.worker_name or "master"
@@ -255,13 +222,13 @@ class HomeServer(object):
burst_count=config.rc_registration.burst_count,
)
self.datastores = None
self.datastores = None # type: Optional[Databases]
# Other kwargs are explicit dependencies
for depname in kwargs:
setattr(self, depname, kwargs[depname])
def get_instance_id(self):
def get_instance_id(self) -> str:
"""A unique ID for this synapse process instance.
This is used to distinguish running instances in worker-based
@@ -277,13 +244,13 @@ class HomeServer(object):
"""
return self._instance_name
def setup(self):
def setup(self) -> None:
logger.info("Setting up.")
self.start_time = int(self.get_clock().time())
self.datastores = Databases(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")
def setup_master(self):
def setup_master(self) -> None:
"""
Some handlers have side effects on instantiation (like registering
background updates). This function causes them to be fetched, and
@@ -292,192 +259,242 @@ class HomeServer(object):
for i in self.REQUIRED_ON_MASTER_STARTUP:
getattr(self, "get_" + i)()
def get_reactor(self):
def get_reactor(self) -> twisted.internet.base.ReactorBase:
"""
Fetch the Twisted reactor in use by this HomeServer.
"""
return self._reactor
def get_ip_from_request(self, request):
def get_ip_from_request(self, request) -> str:
# X-Forwarded-For is handled by our custom request type.
return request.getClientIP()
def is_mine(self, domain_specific_string):
def is_mine(self, domain_specific_string: DomainSpecificString) -> bool:
return domain_specific_string.domain == self.hostname
def is_mine_id(self, string):
def is_mine_id(self, string: str) -> bool:
return string.split(":", 1)[1] == self.hostname
def get_clock(self):
def get_clock(self) -> Clock:
return self.clock
def get_datastore(self) -> DataStore:
if not self.datastores:
raise Exception("HomeServer.setup must be called before getting datastores")
return self.datastores.main
def get_datastores(self):
def get_datastores(self) -> Databases:
if not self.datastores:
raise Exception("HomeServer.setup must be called before getting datastores")
return self.datastores
def get_config(self):
def get_config(self) -> HomeServerConfig:
return self.config
def get_distributor(self):
def get_distributor(self) -> Distributor:
return self.distributor
def get_registration_ratelimiter(self) -> Ratelimiter:
return self.registration_ratelimiter
def build_federation_client(self):
@cache_in_self
def get_federation_client(self) -> FederationClient:
return FederationClient(self)
def build_federation_server(self):
@cache_in_self
def get_federation_server(self) -> FederationServer:
return FederationServer(self)
def build_handlers(self):
@cache_in_self
def get_handlers(self) -> Handlers:
return Handlers(self)
def build_notifier(self):
@cache_in_self
def get_notifier(self) -> Notifier:
return Notifier(self)
def build_auth(self):
@cache_in_self
def get_auth(self) -> Auth:
return Auth(self)
def build_http_client_context_factory(self):
@cache_in_self
def get_http_client_context_factory(self) -> IPolicyForHTTPS:
return (
InsecureInterceptableContextFactory()
if self.config.use_insecure_ssl_client_just_for_testing_do_not_use
else RegularPolicyForHTTPS()
)
def build_simple_http_client(self):
@cache_in_self
def get_simple_http_client(self) -> SimpleHttpClient:
return SimpleHttpClient(self)
def build_proxied_http_client(self):
@cache_in_self
def get_proxied_http_client(self) -> SimpleHttpClient:
return SimpleHttpClient(
self,
http_proxy=os.getenvb(b"http_proxy"),
https_proxy=os.getenvb(b"HTTPS_PROXY"),
)
def build_room_creation_handler(self):
@cache_in_self
def get_room_creation_handler(self) -> RoomCreationHandler:
return RoomCreationHandler(self)
def build_room_shutdown_handler(self):
@cache_in_self
def get_room_shutdown_handler(self) -> RoomShutdownHandler:
return RoomShutdownHandler(self)
def build_sendmail(self):
@cache_in_self
def get_sendmail(self) -> sendmail:
return sendmail
def build_state_handler(self):
@cache_in_self
def get_state_handler(self) -> StateHandler:
return StateHandler(self)
def build_state_resolution_handler(self):
@cache_in_self
def get_state_resolution_handler(self) -> StateResolutionHandler:
return StateResolutionHandler(self)
def build_presence_handler(self):
@cache_in_self
def get_presence_handler(self) -> PresenceHandler:
return PresenceHandler(self)
def build_typing_handler(self):
@cache_in_self
def get_typing_handler(self):
if self.config.worker.writers.typing == self.get_instance_name():
return TypingWriterHandler(self)
else:
return FollowerTypingHandler(self)
def build_sync_handler(self):
@cache_in_self
def get_sync_handler(self) -> SyncHandler:
return SyncHandler(self)
def build_room_list_handler(self):
@cache_in_self
def get_room_list_handler(self) -> RoomListHandler:
return RoomListHandler(self)
def build_auth_handler(self):
@cache_in_self
def get_auth_handler(self) -> AuthHandler:
return AuthHandler(self)
def build_macaroon_generator(self):
@cache_in_self
def get_macaroon_generator(self) -> MacaroonGenerator:
return MacaroonGenerator(self)
def build_device_handler(self):
@cache_in_self
def get_device_handler(self):
if self.config.worker_app:
return DeviceWorkerHandler(self)
else:
return DeviceHandler(self)
def build_device_message_handler(self):
@cache_in_self
def get_device_message_handler(self) -> DeviceMessageHandler:
return DeviceMessageHandler(self)
def build_e2e_keys_handler(self):
@cache_in_self
def get_e2e_keys_handler(self) -> E2eKeysHandler:
return E2eKeysHandler(self)
def build_e2e_room_keys_handler(self):
@cache_in_self
def get_e2e_room_keys_handler(self) -> E2eRoomKeysHandler:
return E2eRoomKeysHandler(self)
def build_acme_handler(self):
@cache_in_self
def get_acme_handler(self) -> AcmeHandler:
return AcmeHandler(self)
def build_application_service_api(self):
@cache_in_self
def get_application_service_api(self) -> ApplicationServiceApi:
return ApplicationServiceApi(self)
def build_application_service_scheduler(self):
@cache_in_self
def get_application_service_scheduler(self) -> ApplicationServiceScheduler:
return ApplicationServiceScheduler(self)
def build_application_service_handler(self):
@cache_in_self
def get_application_service_handler(self) -> ApplicationServicesHandler:
return ApplicationServicesHandler(self)
def build_event_handler(self):
@cache_in_self
def get_event_handler(self) -> EventHandler:
return EventHandler(self)
def build_event_stream_handler(self):
@cache_in_self
def get_event_stream_handler(self) -> EventStreamHandler:
return EventStreamHandler(self)
def build_initial_sync_handler(self):
@cache_in_self
def get_initial_sync_handler(self) -> InitialSyncHandler:
return InitialSyncHandler(self)
def build_profile_handler(self):
@cache_in_self
def get_profile_handler(self):
if self.config.worker_app:
return BaseProfileHandler(self)
else:
return MasterProfileHandler(self)
def build_event_creation_handler(self):
@cache_in_self
def get_event_creation_handler(self) -> EventCreationHandler:
return EventCreationHandler(self)
def build_deactivate_account_handler(self):
@cache_in_self
def get_deactivate_account_handler(self) -> DeactivateAccountHandler:
return DeactivateAccountHandler(self)
def build_set_password_handler(self):
@cache_in_self
def get_set_password_handler(self) -> SetPasswordHandler:
return SetPasswordHandler(self)
def build_event_sources(self):
@cache_in_self
def get_event_sources(self) -> EventSources:
return EventSources(self)
def build_keyring(self):
@cache_in_self
def get_keyring(self) -> Keyring:
return Keyring(self)
def build_event_builder_factory(self):
@cache_in_self
def get_event_builder_factory(self) -> EventBuilderFactory:
return EventBuilderFactory(self)
def build_filtering(self):
@cache_in_self
def get_filtering(self) -> Filtering:
return Filtering(self)
def build_pusherpool(self):
@cache_in_self
def get_pusherpool(self) -> PusherPool:
return PusherPool(self)
def build_http_client(self):
@cache_in_self
def get_http_client(self) -> MatrixFederationHttpClient:
tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
self.config
)
return MatrixFederationHttpClient(self, tls_client_options_factory)
def build_media_repository_resource(self):
@cache_in_self
def get_media_repository_resource(self) -> MediaRepositoryResource:
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of the media repository.
return MediaRepositoryResource(self)
def build_media_repository(self):
@cache_in_self
def get_media_repository(self) -> MediaRepository:
return MediaRepository(self)
def build_federation_transport_client(self):
@cache_in_self
def get_federation_transport_client(self) -> TransportLayerClient:
return TransportLayerClient(self)
def build_federation_sender(self):
@cache_in_self
def get_federation_sender(self):
if self.should_send_federation():
return FederationSender(self)
elif not self.config.worker_app:
@@ -485,156 +502,152 @@ class HomeServer(object):
else:
raise Exception("Workers cannot send federation traffic")
def build_receipts_handler(self):
@cache_in_self
def get_receipts_handler(self) -> ReceiptsHandler:
return ReceiptsHandler(self)
def build_read_marker_handler(self):
@cache_in_self
def get_read_marker_handler(self) -> ReadMarkerHandler:
return ReadMarkerHandler(self)
def build_tcp_replication(self):
@cache_in_self
def get_tcp_replication(self) -> ReplicationCommandHandler:
return ReplicationCommandHandler(self)
def build_action_generator(self):
@cache_in_self
def get_action_generator(self) -> ActionGenerator:
return ActionGenerator(self)
def build_user_directory_handler(self):
@cache_in_self
def get_user_directory_handler(self) -> UserDirectoryHandler:
return UserDirectoryHandler(self)
def build_groups_local_handler(self):
@cache_in_self
def get_groups_local_handler(self):
if self.config.worker_app:
return GroupsLocalWorkerHandler(self)
else:
return GroupsLocalHandler(self)
def build_groups_server_handler(self):
@cache_in_self
def get_groups_server_handler(self):
if self.config.worker_app:
return GroupsServerWorkerHandler(self)
else:
return GroupsServerHandler(self)
def build_groups_attestation_signing(self):
@cache_in_self
def get_groups_attestation_signing(self) -> GroupAttestationSigning:
return GroupAttestationSigning(self)
def build_groups_attestation_renewer(self):
@cache_in_self
def get_groups_attestation_renewer(self) -> GroupAttestionRenewer:
return GroupAttestionRenewer(self)
def build_secrets(self):
@cache_in_self
def get_secrets(self) -> Secrets:
return Secrets()
def build_stats_handler(self):
@cache_in_self
def get_stats_handler(self) -> StatsHandler:
return StatsHandler(self)
def build_spam_checker(self):
@cache_in_self
def get_spam_checker(self):
return SpamChecker(self)
def build_third_party_event_rules(self):
@cache_in_self
def get_third_party_event_rules(self) -> ThirdPartyEventRules:
return ThirdPartyEventRules(self)
def build_room_member_handler(self):
@cache_in_self
def get_room_member_handler(self):
if self.config.worker_app:
return RoomMemberWorkerHandler(self)
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
@cache_in_self
def get_federation_registry(self) -> FederationHandlerRegistry:
return FederationHandlerRegistry(self)
def build_server_notices_manager(self):
@cache_in_self
def get_server_notices_manager(self):
if self.config.worker_app:
raise Exception("Workers cannot send server notices")
return ServerNoticesManager(self)
def build_server_notices_sender(self):
@cache_in_self
def get_server_notices_sender(self):
if self.config.worker_app:
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self)
def build_message_handler(self):
@cache_in_self
def get_message_handler(self) -> MessageHandler:
return MessageHandler(self)
def build_pagination_handler(self):
@cache_in_self
def get_pagination_handler(self) -> PaginationHandler:
return PaginationHandler(self)
def build_room_context_handler(self):
@cache_in_self
def get_room_context_handler(self) -> RoomContextHandler:
return RoomContextHandler(self)
def build_registration_handler(self):
@cache_in_self
def get_registration_handler(self) -> RegistrationHandler:
return RegistrationHandler(self)
def build_account_validity_handler(self):
@cache_in_self
def get_account_validity_handler(self) -> AccountValidityHandler:
return AccountValidityHandler(self)
def build_cas_handler(self):
@cache_in_self
def get_cas_handler(self) -> CasHandler:
return CasHandler(self)
def build_saml_handler(self):
@cache_in_self
def get_saml_handler(self) -> "SamlHandler":
from synapse.handlers.saml_handler import SamlHandler
return SamlHandler(self)
def build_oidc_handler(self):
@cache_in_self
def get_oidc_handler(self) -> "OidcHandler":
from synapse.handlers.oidc_handler import OidcHandler
return OidcHandler(self)
def build_event_client_serializer(self):
@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
return EventClientSerializer(self)
def build_password_policy_handler(self):
@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:
return PasswordPolicyHandler(self)
def build_storage(self) -> Storage:
return Storage(self, self.datastores)
@cache_in_self
def get_storage(self) -> Storage:
return Storage(self, self.get_datastores())
def build_replication_streamer(self) -> ReplicationStreamer:
@cache_in_self
def get_replication_streamer(self) -> ReplicationStreamer:
return ReplicationStreamer(self)
def build_replication_data_handler(self):
@cache_in_self
def get_replication_data_handler(self) -> ReplicationDataHandler:
return ReplicationDataHandler(self)
def build_replication_streams(self):
@cache_in_self
def get_replication_streams(self) -> Dict[str, Stream]:
return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
def should_send_federation(self):
def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
return self.config.send_federation and (
not self.config.worker_app
or self.config.worker_app == "synapse.app.federation_sender"
)
def _make_dependency_method(depname):
def _get(hs):
try:
return getattr(hs, depname)
except AttributeError:
pass
try:
builder = getattr(hs, "build_%s" % (depname))
except AttributeError:
raise NotImplementedError(
"%s has no %s nor a builder for it" % (type(hs).__name__, depname)
)
# Prevent cyclic dependencies from deadlocking
if depname in hs._building:
raise ValueError("Cyclic dependency while building %s" % (depname,))
hs._building[depname] = 1
try:
dep = builder()
setattr(hs, depname, dep)
finally:
del hs._building[depname]
return dep
setattr(HomeServer, "get_%s" % (depname), _get)
# Build magic accessors for every dependency
for depname in HomeServer.DEPENDENCIES:
_make_dependency_method(depname)

Some files were not shown because too many files have changed in this diff.