Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0dd33f1803 | ||
|
|
e2582a5dd7 | ||
|
|
a195efa1fe |
36
.buildkite/scripts/create_postgres_db.py
Executable file
36
.buildkite/scripts/create_postgres_db.py
Executable file
@@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from synapse.storage.engines import create_engine
|
||||
|
||||
logger = logging.getLogger("create_postgres_db")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Create a PostgresEngine.
|
||||
db_engine = create_engine({"name": "psycopg2", "args": {}})
|
||||
|
||||
# Connect to postgres to create the base database.
|
||||
# We use "postgres" as a database because it's bound to exist and the "synapse" one
|
||||
# doesn't exist yet.
|
||||
db_conn = db_engine.module.connect(
|
||||
user="postgres", host="postgres", password="postgres", dbname="postgres"
|
||||
)
|
||||
db_conn.autocommit = True
|
||||
cur = db_conn.cursor()
|
||||
cur.execute("CREATE DATABASE synapse;")
|
||||
cur.close()
|
||||
db_conn.close()
|
||||
@@ -1,31 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
|
||||
import psycopg2
|
||||
|
||||
# a very simple replacment for `psql`, to make up for the lack of the postgres client
|
||||
# libraries in the synapse docker image.
|
||||
|
||||
# We use "postgres" as a database because it's bound to exist and the "synapse" one
|
||||
# doesn't exist yet.
|
||||
db_conn = psycopg2.connect(
|
||||
user="postgres", host="postgres", password="postgres", dbname="postgres"
|
||||
)
|
||||
db_conn.autocommit = True
|
||||
cur = db_conn.cursor()
|
||||
for c in sys.argv[1:]:
|
||||
cur.execute(c)
|
||||
@@ -1,10 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Test script for 'synapse_port_db'.
|
||||
# - sets up synapse and deps
|
||||
# - runs the port script on a prepopulated test sqlite db
|
||||
# - also runs it against an new sqlite db
|
||||
|
||||
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
|
||||
# with additional dependencies needed for the test (such as coverage or the PostgreSQL
|
||||
# driver), update the schema of the test SQLite database and run background updates on it,
|
||||
# create an empty test database in PostgreSQL, then run the 'synapse_port_db' script to
|
||||
# test porting the SQLite database to the PostgreSQL database (with coverage).
|
||||
|
||||
set -xe
|
||||
cd `dirname $0`/../..
|
||||
@@ -22,32 +22,15 @@ echo "--- Generate the signing key"
|
||||
# Generate the server's signing key.
|
||||
python -m synapse.app.homeserver --generate-keys -c .buildkite/sqlite-config.yaml
|
||||
|
||||
echo "--- Prepare test database"
|
||||
echo "--- Prepare the databases"
|
||||
|
||||
# Make sure the SQLite3 database is using the latest schema and has no pending background update.
|
||||
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
|
||||
|
||||
# Create the PostgreSQL database.
|
||||
./.buildkite/scripts/postgres_exec.py "CREATE DATABASE synapse"
|
||||
./.buildkite/scripts/create_postgres_db.py
|
||||
|
||||
echo "+++ Run synapse_port_db against test database"
|
||||
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml
|
||||
|
||||
#####
|
||||
|
||||
# Now do the same again, on an empty database.
|
||||
|
||||
echo "--- Prepare empty SQLite database"
|
||||
|
||||
# we do this by deleting the sqlite db, and then doing the same again.
|
||||
rm .buildkite/test_db.db
|
||||
|
||||
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
|
||||
|
||||
# re-create the PostgreSQL database.
|
||||
./.buildkite/scripts/postgres_exec.py \
|
||||
"DROP DATABASE synapse" \
|
||||
"CREATE DATABASE synapse"
|
||||
|
||||
echo "+++ Run synapse_port_db against empty database"
|
||||
echo "+++ Run synapse_port_db"
|
||||
|
||||
# Run the script
|
||||
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml
|
||||
|
||||
2
.github/workflows/tests.yml
vendored
2
.github/workflows/tests.yml
vendored
@@ -273,7 +273,7 @@ jobs:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Patch Buildkite-specific test scripts
|
||||
run: |
|
||||
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/postgres_exec.py
|
||||
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/create_postgres_db.py
|
||||
sed -i -e 's/host: postgres/host: localhost/' .buildkite/postgres-config.yaml
|
||||
sed -i -e 's|/src/||' .buildkite/{sqlite,postgres}-config.yaml
|
||||
sed -i -e 's/\$TOP/\$GITHUB_WORKSPACE/' .coveragerc
|
||||
|
||||
125
CHANGES.md
125
CHANGES.md
@@ -1,118 +1,3 @@
|
||||
Synapse 1.34.0 (2021-05-17)
|
||||
===========================
|
||||
|
||||
This release deprecates the `room_invite_state_types` configuration setting. See the [upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.34.0/UPGRADE.rst#upgrading-to-v1340) for instructions on updating your configuration file to use the new `room_prejoin_state` setting.
|
||||
|
||||
This release also deprecates the `POST /_synapse/admin/v1/rooms/<room_id>/delete` admin API route. Server administrators are encouraged to update their scripts to use the new `DELETE /_synapse/admin/v1/rooms/<room_id>` route instead.
|
||||
|
||||
|
||||
No significant changes since v1.34.0rc1.
|
||||
|
||||
|
||||
Synapse 1.34.0rc1 (2021-05-12)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Add experimental option to track memory usage of the caches. ([\#9881](https://github.com/matrix-org/synapse/issues/9881))
|
||||
- Add support for `DELETE /_synapse/admin/v1/rooms/<room_id>`. ([\#9889](https://github.com/matrix-org/synapse/issues/9889))
|
||||
- Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set. ([\#9902](https://github.com/matrix-org/synapse/issues/9902))
|
||||
- Improve performance of sending events for worker-based deployments using Redis. ([\#9905](https://github.com/matrix-org/synapse/issues/9905), [\#9950](https://github.com/matrix-org/synapse/issues/9950), [\#9951](https://github.com/matrix-org/synapse/issues/9951))
|
||||
- Improve performance after joining a large room when presence is enabled. ([\#9910](https://github.com/matrix-org/synapse/issues/9910), [\#9916](https://github.com/matrix-org/synapse/issues/9916))
|
||||
- Support stable identifiers for [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772) Spaces. `m.space.child` events will now be taken into account when populating the experimental spaces summary response. Please see [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.34.0/UPGRADE.rst#upgrading-to-v1340) if you have customised `room_invite_state_types` in your configuration. ([\#9915](https://github.com/matrix-org/synapse/issues/9915), [\#9966](https://github.com/matrix-org/synapse/issues/9966))
|
||||
- Improve performance of backfilling in large rooms. ([\#9935](https://github.com/matrix-org/synapse/issues/9935))
|
||||
- Add a config option to allow you to prevent device display names from being shared over federation. Contributed by @aaronraimist. ([\#9945](https://github.com/matrix-org/synapse/issues/9945))
|
||||
- Update support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946): Spaces Summary. ([\#9947](https://github.com/matrix-org/synapse/issues/9947), [\#9954](https://github.com/matrix-org/synapse/issues/9954))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements. ([\#9895](https://github.com/matrix-org/synapse/issues/9895))
|
||||
- Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`. ([\#9896](https://github.com/matrix-org/synapse/issues/9896))
|
||||
- Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession. ([\#9910](https://github.com/matrix-org/synapse/issues/9910))
|
||||
- Include the `origin_server_ts` property in the experimental [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) support to allow clients to properly sort rooms. ([\#9928](https://github.com/matrix-org/synapse/issues/9928))
|
||||
- Fix bugs introduced in v1.23.0 which made the PostgreSQL port script fail when run with a newly-created SQLite database. ([\#9930](https://github.com/matrix-org/synapse/issues/9930))
|
||||
- Fix a bug introduced in Synapse 1.29.0 which caused `m.room_key_request` to-device messages sent from one user to another to be dropped. ([\#9961](https://github.com/matrix-org/synapse/issues/9961), [\#9965](https://github.com/matrix-org/synapse/issues/9965))
|
||||
- Fix a bug introduced in v1.27.0 preventing users and appservices exempt from ratelimiting from creating rooms with many invitees. ([\#9968](https://github.com/matrix-org/synapse/issues/9968))
|
||||
|
||||
|
||||
Updates to the Docker image
|
||||
---------------------------
|
||||
|
||||
- Add `startup_delay` to docker healthcheck to reduce waiting time for coming online and update the documentation with extra options. Contributed by @Maquis196. ([\#9913](https://github.com/matrix-org/synapse/issues/9913))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Add `port` argument to the Postgres database sample config section. ([\#9911](https://github.com/matrix-org/synapse/issues/9911))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- Mark as deprecated `POST /_synapse/admin/v1/rooms/<room_id>/delete`. ([\#9889](https://github.com/matrix-org/synapse/issues/9889))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Reduce the length of Synapse's access tokens. ([\#5588](https://github.com/matrix-org/synapse/issues/5588))
|
||||
- Export jemalloc stats to Prometheus if it is being used. ([\#9882](https://github.com/matrix-org/synapse/issues/9882))
|
||||
- Add type hints to presence handler. ([\#9885](https://github.com/matrix-org/synapse/issues/9885))
|
||||
- Reduce memory usage of the LRU caches. ([\#9886](https://github.com/matrix-org/synapse/issues/9886))
|
||||
- Add type hints to the `synapse.handlers` module. ([\#9896](https://github.com/matrix-org/synapse/issues/9896))
|
||||
- Time response time for external cache requests. ([\#9904](https://github.com/matrix-org/synapse/issues/9904))
|
||||
- Minor fixes to the `make_full_schema.sh` script. ([\#9931](https://github.com/matrix-org/synapse/issues/9931))
|
||||
- Move database schema files into a common directory. ([\#9932](https://github.com/matrix-org/synapse/issues/9932))
|
||||
- Add debug logging for lost/delayed to-device messages. ([\#9959](https://github.com/matrix-org/synapse/issues/9959))
|
||||
|
||||
|
||||
Synapse 1.33.2 (2021-05-11)
|
||||
===========================
|
||||
|
||||
Due to the security issue highlighted below, server administrators are encouraged to update Synapse. We are not aware of these vulnerabilities being exploited in the wild.
|
||||
|
||||
Security advisory
|
||||
-----------------
|
||||
|
||||
This release fixes a denial of service attack ([CVE-2021-29471](https://github.com/matrix-org/synapse/security/advisories/GHSA-x345-32rc-8h85)) against Synapse's push rules implementation. Server admins are encouraged to upgrade.
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Unpin attrs dependency. ([\#9946](https://github.com/matrix-org/synapse/issues/9946))
|
||||
|
||||
|
||||
Synapse 1.33.1 (2021-05-06)
|
||||
===========================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix bug where `/sync` would break if using the latest version of `attrs` dependency, by pinning to a previous version. ([\#9937](https://github.com/matrix-org/synapse/issues/9937))
|
||||
|
||||
|
||||
Synapse 1.33.0 (2021-05-05)
|
||||
===========================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909))
|
||||
|
||||
|
||||
Synapse 1.33.0rc2 (2021-04-29)
|
||||
==============================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))
|
||||
|
||||
|
||||
Synapse 1.33.0rc1 (2021-04-28)
|
||||
==============================
|
||||
|
||||
@@ -181,7 +66,7 @@ Synapse 1.32.1 (2021-04-21)
|
||||
===========================
|
||||
|
||||
This release fixes [a regression](https://github.com/matrix-org/synapse/issues/9853)
|
||||
in Synapse 1.32.0 that caused connected Prometheus instances to become unstable.
|
||||
in Synapse 1.32.0 that caused connected Prometheus instances to become unstable.
|
||||
|
||||
However, as this release is still subject to the `LoggingContext` change in 1.32.0,
|
||||
it is recommended to remain on or downgrade to 1.31.0.
|
||||
@@ -197,11 +82,11 @@ Synapse 1.32.0 (2021-04-20)
|
||||
|
||||
**Note:** This release introduces [a regression](https://github.com/matrix-org/synapse/issues/9853)
|
||||
that can overwhelm connected Prometheus instances. This issue was not present in
|
||||
1.32.0rc1. If affected, it is recommended to downgrade to 1.31.0 in the meantime, and
|
||||
1.32.0rc1. If affected, it is recommended to downgrade to 1.31.0 in the meantime, and
|
||||
follow [these instructions](https://github.com/matrix-org/synapse/pull/9854#issuecomment-823472183)
|
||||
to clean up any excess writeahead logs.
|
||||
|
||||
**Note:** This release also mistakenly included a change that may affected Synapse
|
||||
**Note:** This release also mistakenly included a change that may affected Synapse
|
||||
modules that import `synapse.logging.context.LoggingContext`, such as
|
||||
[synapse-s3-storage-provider](https://github.com/matrix-org/synapse-s3-storage-provider).
|
||||
This will be fixed in a later Synapse version.
|
||||
@@ -212,8 +97,8 @@ This release removes the deprecated `GET /_synapse/admin/v1/users/<user_id>` adm
|
||||
|
||||
This release requires Application Services to use type `m.login.application_service` when registering users via the `/_matrix/client/r0/register` endpoint to comply with the spec. Please ensure your Application Services are up to date.
|
||||
|
||||
If you are using the `packages.matrix.org` Debian repository for Synapse packages,
|
||||
note that we have recently updated the expiry date on the gpg signing key. If you see an
|
||||
If you are using the `packages.matrix.org` Debian repository for Synapse packages,
|
||||
note that we have recently updated the expiry date on the gpg signing key. If you see an
|
||||
error similar to `The following signatures were invalid: EXPKEYSIG F473DD4473365DE1`, you
|
||||
will need to get a fresh copy of the keys. You can do so with:
|
||||
|
||||
|
||||
26
UPGRADE.rst
26
UPGRADE.rst
@@ -85,32 +85,6 @@ for example:
|
||||
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
|
||||
Upgrading to v1.34.0
|
||||
====================
|
||||
|
||||
``room_invite_state_types`` configuration setting
|
||||
-----------------------------------------------
|
||||
|
||||
The ``room_invite_state_types`` configuration setting has been deprecated and
|
||||
replaced with ``room_prejoin_state``. See the `sample configuration file <https://github.com/matrix-org/synapse/blob/v1.34.0/docs/sample_config.yaml#L1515>`_.
|
||||
|
||||
If you have set ``room_invite_state_types`` to the default value you should simply
|
||||
remove it from your configuration file. The default value used to be:
|
||||
|
||||
.. code:: yaml
|
||||
|
||||
room_invite_state_types:
|
||||
- "m.room.join_rules"
|
||||
- "m.room.canonical_alias"
|
||||
- "m.room.avatar"
|
||||
- "m.room.encryption"
|
||||
- "m.room.name"
|
||||
|
||||
If you have customised this value, you should remove ``room_invite_state_types`` and
|
||||
configure ``room_prejoin_state`` instead.
|
||||
|
||||
|
||||
|
||||
Upgrading to v1.33.0
|
||||
====================
|
||||
|
||||
|
||||
1
changelog.d/9885.misc
Normal file
1
changelog.d/9885.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add type hints to presence handler.
|
||||
1
changelog.d/9886.misc
Normal file
1
changelog.d/9886.misc
Normal file
@@ -0,0 +1 @@
|
||||
Reduce memory usage of the LRU caches.
|
||||
1
changelog.d/9895.bugfix
Normal file
1
changelog.d/9895.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements.
|
||||
1
changelog.d/9900.bugfix
Normal file
1
changelog.d/9900.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.
|
||||
24
debian/changelog
vendored
24
debian/changelog
vendored
@@ -1,27 +1,3 @@
|
||||
matrix-synapse-py3 (1.34.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.34.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Mon, 17 May 2021 11:34:18 +0100
|
||||
|
||||
matrix-synapse-py3 (1.33.2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 11 May 2021 11:17:59 +0100
|
||||
|
||||
matrix-synapse-py3 (1.33.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 06 May 2021 14:06:33 +0100
|
||||
|
||||
matrix-synapse-py3 (1.33.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 05 May 2021 14:15:27 +0100
|
||||
|
||||
matrix-synapse-py3 (1.32.2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.32.2.
|
||||
|
||||
@@ -88,5 +88,5 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
||||
|
||||
ENTRYPOINT ["/start.py"]
|
||||
|
||||
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
|
||||
HEALTHCHECK --interval=1m --timeout=5s \
|
||||
CMD curl -fSs http://localhost:8008/health || exit 1
|
||||
|
||||
@@ -191,16 +191,6 @@ whilst running the above `docker run` commands.
|
||||
```
|
||||
--no-healthcheck
|
||||
```
|
||||
|
||||
## Disabling the healthcheck in docker-compose file
|
||||
|
||||
If you wish to disable the healthcheck via docker-compose, append the following to your service configuration.
|
||||
|
||||
```
|
||||
healthcheck:
|
||||
disable: true
|
||||
```
|
||||
|
||||
## Setting custom healthcheck on docker run
|
||||
|
||||
If you wish to point the healthcheck at a different port with docker command, add the following
|
||||
@@ -212,15 +202,14 @@ If you wish to point the healthcheck at a different port with docker command, ad
|
||||
## Setting the healthcheck in docker-compose file
|
||||
|
||||
You can add the following to set a custom healthcheck in a docker compose file.
|
||||
You will need docker-compose version >2.1 for this to work.
|
||||
You will need version >2.1 for this to work.
|
||||
|
||||
```
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fSs", "http://localhost:8008/health"]
|
||||
interval: 15s
|
||||
timeout: 5s
|
||||
interval: 1m
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 5s
|
||||
```
|
||||
|
||||
## Using jemalloc
|
||||
|
||||
@@ -427,7 +427,7 @@ the new room. Users on other servers will be unaffected.
|
||||
The API is:
|
||||
|
||||
```
|
||||
DELETE /_synapse/admin/v1/rooms/<room_id>
|
||||
POST /_synapse/admin/v1/rooms/<room_id>/delete
|
||||
```
|
||||
|
||||
with a body of:
|
||||
@@ -528,15 +528,6 @@ You will have to manually handle, if you so choose, the following:
|
||||
* Users that would have been booted from the room (and will have been force-joined to the Content Violation room).
|
||||
* Removal of the Content Violation room if desired.
|
||||
|
||||
## Deprecated endpoint
|
||||
|
||||
The previous deprecated API will be removed in a future release, it was:
|
||||
|
||||
```
|
||||
POST /_synapse/admin/v1/rooms/<room_id>/delete
|
||||
```
|
||||
|
||||
It behaves the same way than the current endpoint except the path and the method.
|
||||
|
||||
# Make Room Admin API
|
||||
|
||||
|
||||
@@ -152,16 +152,6 @@ presence:
|
||||
#
|
||||
#gc_thresholds: [700, 10, 10]
|
||||
|
||||
# The minimum time in seconds between each GC for a generation, regardless of
|
||||
# the GC thresholds. This ensures that we don't do GC too frequently.
|
||||
#
|
||||
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
|
||||
# generation 0 GCs, etc.
|
||||
#
|
||||
# Defaults to `[1s, 10s, 30s]`.
|
||||
#
|
||||
#gc_min_interval: [0.5s, 30s, 1m]
|
||||
|
||||
# Set the limit on the returned events in the timeline in the get
|
||||
# and sync operations. The default value is 100. -1 means no upper limit.
|
||||
#
|
||||
@@ -741,12 +731,6 @@ acme:
|
||||
#
|
||||
#allow_profile_lookup_over_federation: false
|
||||
|
||||
# Uncomment to disable device display name lookup over federation. By default, the
|
||||
# Federation API allows other homeservers to obtain device display names of any user
|
||||
# on this homeserver. Defaults to 'true'.
|
||||
#
|
||||
#allow_device_name_lookup_over_federation: false
|
||||
|
||||
|
||||
## Caching ##
|
||||
|
||||
@@ -826,7 +810,6 @@ caches:
|
||||
# password: secretpassword
|
||||
# database: synapse
|
||||
# host: localhost
|
||||
# port: 5432
|
||||
# cp_min: 5
|
||||
# cp_max: 10
|
||||
#
|
||||
@@ -1521,7 +1504,6 @@ room_prejoin_state:
|
||||
# - m.room.avatar
|
||||
# - m.room.encryption
|
||||
# - m.room.name
|
||||
# - m.room.create
|
||||
#
|
||||
# Uncomment the following to disable these defaults (so that only the event
|
||||
# types listed in 'additional_event_types' are shared). Defaults to 'false'.
|
||||
|
||||
3
mypy.ini
3
mypy.ini
@@ -171,6 +171,3 @@ ignore_missing_imports = True
|
||||
|
||||
[mypy-txacme.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-pympler.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
@@ -21,10 +21,9 @@ DISTS = (
|
||||
"debian:buster",
|
||||
"debian:bullseye",
|
||||
"debian:sid",
|
||||
"ubuntu:bionic", # 18.04 LTS (our EOL forced by Py36 on 2021-12-23)
|
||||
"ubuntu:focal", # 20.04 LTS (our EOL forced by Py38 on 2024-10-14)
|
||||
"ubuntu:groovy", # 20.10 (EOL 2021-07-07)
|
||||
"ubuntu:hirsute", # 21.04 (EOL 2022-01-05)
|
||||
"ubuntu:bionic",
|
||||
"ubuntu:focal",
|
||||
"ubuntu:groovy",
|
||||
)
|
||||
|
||||
DESC = '''\
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python2
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
# It does so by having Synapse generate an up-to-date SQLite DB, then running
|
||||
# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
|
||||
|
||||
export PGHOST="localhost"
|
||||
POSTGRES_HOST="localhost"
|
||||
POSTGRES_DB_NAME="synapse_full_schema.$$"
|
||||
|
||||
SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
|
||||
@@ -32,7 +32,7 @@ usage() {
|
||||
while getopts "p:co:h" opt; do
|
||||
case $opt in
|
||||
p)
|
||||
export PGUSER=$OPTARG
|
||||
POSTGRES_USERNAME=$OPTARG
|
||||
;;
|
||||
c)
|
||||
# Print all commands that are being executed
|
||||
@@ -69,7 +69,7 @@ if [ ${#unsatisfied_requirements} -ne 0 ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -z "$PGUSER" ]; then
|
||||
if [ -z "$POSTGRES_USERNAME" ]; then
|
||||
echo "No postgres username supplied"
|
||||
usage
|
||||
exit 1
|
||||
@@ -84,9 +84,8 @@ fi
|
||||
# Create the output directory if it doesn't exist
|
||||
mkdir -p "$OUTPUT_DIR"
|
||||
|
||||
read -rsp "Postgres password for '$PGUSER': " PGPASSWORD
|
||||
read -rsp "Postgres password for '$POSTGRES_USERNAME': " POSTGRES_PASSWORD
|
||||
echo ""
|
||||
export PGPASSWORD
|
||||
|
||||
# Exit immediately if a command fails
|
||||
set -e
|
||||
@@ -132,9 +131,9 @@ report_stats: false
|
||||
database:
|
||||
name: "psycopg2"
|
||||
args:
|
||||
user: "$PGUSER"
|
||||
host: "$PGHOST"
|
||||
password: "$PGPASSWORD"
|
||||
user: "$POSTGRES_USERNAME"
|
||||
host: "$POSTGRES_HOST"
|
||||
password: "$POSTGRES_PASSWORD"
|
||||
database: "$POSTGRES_DB_NAME"
|
||||
|
||||
# Suppress the key server warning.
|
||||
@@ -151,7 +150,7 @@ scripts-dev/update_database --database-config "$SQLITE_CONFIG"
|
||||
|
||||
# Create the PostgreSQL database.
|
||||
echo "Creating postgres database..."
|
||||
createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_DB_NAME"
|
||||
createdb $POSTGRES_DB_NAME
|
||||
|
||||
echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
|
||||
if [ -z "$COVERAGE" ]; then
|
||||
@@ -182,7 +181,7 @@ DROP TABLE user_directory_search_docsize;
|
||||
DROP TABLE user_directory_search_stat;
|
||||
"
|
||||
sqlite3 "$SQLITE_DB" <<< "$SQL"
|
||||
psql "$POSTGRES_DB_NAME" -w <<< "$SQL"
|
||||
psql $POSTGRES_DB_NAME -U "$POSTGRES_USERNAME" -w <<< "$SQL"
|
||||
|
||||
echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
|
||||
sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"
|
||||
|
||||
@@ -913,11 +913,10 @@ class Porter(object):
|
||||
(curr_forward_id + 1,),
|
||||
)
|
||||
|
||||
if curr_backward_id:
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
|
||||
(curr_backward_id + 1,),
|
||||
)
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
|
||||
(curr_backward_id + 1,),
|
||||
)
|
||||
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
|
||||
@@ -955,11 +954,10 @@ class Porter(object):
|
||||
(curr_chain_id,),
|
||||
)
|
||||
|
||||
if curr_chain_id is not None:
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_event_auth_chain_id",
|
||||
r,
|
||||
)
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_event_auth_chain_id", r,
|
||||
)
|
||||
|
||||
|
||||
|
||||
##############################################
|
||||
|
||||
@@ -47,7 +47,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.34.0"
|
||||
__version__ = "1.33.0rc1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -110,18 +110,13 @@ class EventTypes:
|
||||
|
||||
Dummy = "org.matrix.dummy_event"
|
||||
|
||||
SpaceChild = "m.space.child"
|
||||
SpaceParent = "m.space.parent"
|
||||
MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child"
|
||||
MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent"
|
||||
|
||||
|
||||
class ToDeviceEventTypes:
|
||||
RoomKeyRequest = "m.room_key_request"
|
||||
|
||||
|
||||
class EduTypes:
|
||||
Presence = "m.presence"
|
||||
RoomKeyRequest = "m.room_key_request"
|
||||
|
||||
|
||||
class RejectedReason:
|
||||
@@ -179,7 +174,6 @@ class EventContentFields:
|
||||
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
|
||||
|
||||
# cf https://github.com/matrix-org/matrix-doc/pull/1772
|
||||
ROOM_TYPE = "type"
|
||||
MSC1772_ROOM_TYPE = "org.matrix.msc1772.type"
|
||||
|
||||
|
||||
|
||||
@@ -57,7 +57,6 @@ class Ratelimiter:
|
||||
rate_hz: Optional[float] = None,
|
||||
burst_count: Optional[int] = None,
|
||||
update: bool = True,
|
||||
n_actions: int = 1,
|
||||
_time_now_s: Optional[int] = None,
|
||||
) -> Tuple[bool, float]:
|
||||
"""Can the entity (e.g. user or IP address) perform the action?
|
||||
@@ -77,9 +76,6 @@ class Ratelimiter:
|
||||
burst_count: How many actions that can be performed before being limited.
|
||||
Overrides the value set during instantiation if set.
|
||||
update: Whether to count this check as performing the action
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
|
||||
@@ -128,20 +124,17 @@ class Ratelimiter:
|
||||
time_delta = time_now_s - time_start
|
||||
performed_count = action_count - time_delta * rate_hz
|
||||
if performed_count < 0:
|
||||
performed_count = 0
|
||||
# Allow, reset back to count 1
|
||||
allowed = True
|
||||
time_start = time_now_s
|
||||
|
||||
# This check would be easier read as performed_count + n_actions > burst_count,
|
||||
# but performed_count might be a very precise float (with lots of numbers
|
||||
# following the point) in which case Python might round it up when adding it to
|
||||
# n_actions. Writing it this way ensures it doesn't happen.
|
||||
if performed_count > burst_count - n_actions:
|
||||
action_count = 1.0
|
||||
elif performed_count > burst_count - 1.0:
|
||||
# Deny, we have exceeded our burst count
|
||||
allowed = False
|
||||
else:
|
||||
# We haven't reached our limit yet
|
||||
allowed = True
|
||||
action_count = performed_count + n_actions
|
||||
action_count += 1.0
|
||||
|
||||
if update:
|
||||
self.actions[key] = (action_count, time_start, rate_hz)
|
||||
@@ -189,7 +182,6 @@ class Ratelimiter:
|
||||
rate_hz: Optional[float] = None,
|
||||
burst_count: Optional[int] = None,
|
||||
update: bool = True,
|
||||
n_actions: int = 1,
|
||||
_time_now_s: Optional[int] = None,
|
||||
):
|
||||
"""Checks if an action can be performed. If not, raises a LimitExceededError
|
||||
@@ -209,9 +201,6 @@ class Ratelimiter:
|
||||
burst_count: How many actions that can be performed before being limited.
|
||||
Overrides the value set during instantiation if set.
|
||||
update: Whether to count this check as performing the action
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
|
||||
@@ -227,7 +216,6 @@ class Ratelimiter:
|
||||
rate_hz=rate_hz,
|
||||
burst_count=burst_count,
|
||||
update=update,
|
||||
n_actions=n_actions,
|
||||
_time_now_s=time_now_s,
|
||||
)
|
||||
|
||||
|
||||
@@ -37,7 +37,6 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.crypto import context_factory
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.metrics.jemalloc import setup_jemalloc_stats
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.daemonize import daemonize_process
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
@@ -116,7 +115,6 @@ def start_reactor(
|
||||
|
||||
def run():
|
||||
logger.info("Running")
|
||||
setup_jemalloc_stats()
|
||||
change_resource_limit(soft_file_limit)
|
||||
if gc_thresholds:
|
||||
gc.set_threshold(*gc_thresholds)
|
||||
|
||||
@@ -454,10 +454,6 @@ def start(config_options):
|
||||
config.server.update_user_directory = False
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
|
||||
|
||||
if config.server.gc_seconds:
|
||||
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
|
||||
|
||||
hs = GenericWorkerServer(
|
||||
config.server_name,
|
||||
|
||||
@@ -341,10 +341,6 @@ def setup(config_options):
|
||||
sys.exit(0)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
|
||||
|
||||
if config.server.gc_seconds:
|
||||
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
|
||||
|
||||
hs = SynapseHomeServer(
|
||||
config.server_name,
|
||||
|
||||
@@ -88,6 +88,10 @@ class ApiConfig(Config):
|
||||
if not room_prejoin_state_config.get("disable_default_event_types"):
|
||||
yield from _DEFAULT_PREJOIN_STATE_TYPES
|
||||
|
||||
if self.spaces_enabled:
|
||||
# MSC1772 suggests adding m.room.create to the prejoin state
|
||||
yield EventTypes.Create
|
||||
|
||||
yield from room_prejoin_state_config.get("additional_event_types", [])
|
||||
|
||||
|
||||
@@ -105,8 +109,6 @@ _DEFAULT_PREJOIN_STATE_TYPES = [
|
||||
EventTypes.RoomAvatar,
|
||||
EventTypes.RoomEncryption,
|
||||
EventTypes.Name,
|
||||
# Per MSC1772.
|
||||
EventTypes.Create,
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -17,8 +17,6 @@ import re
|
||||
import threading
|
||||
from typing import Callable, Dict
|
||||
|
||||
from synapse.python_dependencies import DependencyException, check_requirements
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
# The prefix for all cache factor-related environment variables
|
||||
@@ -191,15 +189,6 @@ class CacheConfig(Config):
|
||||
)
|
||||
self.cache_factors[cache] = factor
|
||||
|
||||
self.track_memory_usage = cache_config.get("track_memory_usage", False)
|
||||
if self.track_memory_usage:
|
||||
try:
|
||||
check_requirements("cache_memory")
|
||||
except DependencyException as e:
|
||||
raise ConfigError(
|
||||
e.message # noqa: B306, DependencyException.message is a property
|
||||
)
|
||||
|
||||
# Resize all caches (if necessary) with the new factors we've loaded
|
||||
self.resize_all_caches()
|
||||
|
||||
|
||||
@@ -58,7 +58,6 @@ DEFAULT_CONFIG = """\
|
||||
# password: secretpassword
|
||||
# database: synapse
|
||||
# host: localhost
|
||||
# port: 5432
|
||||
# cp_min: 5
|
||||
# cp_max: 10
|
||||
#
|
||||
|
||||
@@ -44,10 +44,6 @@ class FederationConfig(Config):
|
||||
"allow_profile_lookup_over_federation", True
|
||||
)
|
||||
|
||||
self.allow_device_name_lookup_over_federation = config.get(
|
||||
"allow_device_name_lookup_over_federation", True
|
||||
)
|
||||
|
||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||
return """\
|
||||
## Federation ##
|
||||
@@ -79,12 +75,6 @@ class FederationConfig(Config):
|
||||
# on this homeserver. Defaults to 'true'.
|
||||
#
|
||||
#allow_profile_lookup_over_federation: false
|
||||
|
||||
# Uncomment to disable device display name lookup over federation. By default, the
|
||||
# Federation API allows other homeservers to obtain device display names of any user
|
||||
# on this homeserver. Defaults to 'true'.
|
||||
#
|
||||
#allow_device_name_lookup_over_federation: false
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ import logging
|
||||
import os.path
|
||||
import re
|
||||
from textwrap import indent
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set
|
||||
|
||||
import attr
|
||||
import yaml
|
||||
@@ -572,7 +572,6 @@ class ServerConfig(Config):
|
||||
_warn_if_webclient_configured(self.listeners)
|
||||
|
||||
self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
|
||||
self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None))
|
||||
|
||||
@attr.s
|
||||
class LimitRemoteRoomsConfig:
|
||||
@@ -918,16 +917,6 @@ class ServerConfig(Config):
|
||||
#
|
||||
#gc_thresholds: [700, 10, 10]
|
||||
|
||||
# The minimum time in seconds between each GC for a generation, regardless of
|
||||
# the GC thresholds. This ensures that we don't do GC too frequently.
|
||||
#
|
||||
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
|
||||
# generation 0 GCs, etc.
|
||||
#
|
||||
# Defaults to `[1s, 10s, 30s]`.
|
||||
#
|
||||
#gc_min_interval: [0.5s, 30s, 1m]
|
||||
|
||||
# Set the limit on the returned events in the timeline in the get
|
||||
# and sync operations. The default value is 100. -1 means no upper limit.
|
||||
#
|
||||
@@ -1316,24 +1305,6 @@ class ServerConfig(Config):
|
||||
help="Turn on the twisted telnet manhole service on the given port.",
|
||||
)
|
||||
|
||||
def read_gc_intervals(self, durations) -> Optional[Tuple[float, float, float]]:
|
||||
"""Reads the three durations for the GC min interval option, returning seconds."""
|
||||
if durations is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
if len(durations) != 3:
|
||||
raise ValueError()
|
||||
return (
|
||||
self.parse_duration(durations[0]) / 1000,
|
||||
self.parse_duration(durations[1]) / 1000,
|
||||
self.parse_duration(durations[2]) / 1000,
|
||||
)
|
||||
except Exception:
|
||||
raise ConfigError(
|
||||
"Value of `gc_min_interval` must be a list of three durations if set"
|
||||
)
|
||||
|
||||
|
||||
def is_threepid_reserved(reserved_threepids, threepid):
|
||||
"""Check the threepid against the reserved threepid config
|
||||
|
||||
@@ -17,7 +17,7 @@ import os
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
from typing import List, Optional, Pattern
|
||||
from typing import List, Optional
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
@@ -124,7 +124,7 @@ class TlsConfig(Config):
|
||||
fed_whitelist_entries = []
|
||||
|
||||
# Support globs (*) in whitelist values
|
||||
self.federation_certificate_verification_whitelist = [] # type: List[Pattern]
|
||||
self.federation_certificate_verification_whitelist = [] # type: List[str]
|
||||
for entry in fed_whitelist_entries:
|
||||
try:
|
||||
entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii"))
|
||||
|
||||
@@ -20,7 +20,6 @@ from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple,
|
||||
from synapse.rest.media.v1._base import FileInfo
|
||||
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
|
||||
from synapse.spam_checker_api import RegistrationBehaviour
|
||||
from synapse.types import RoomAlias
|
||||
from synapse.util.async_helpers import maybe_awaitable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -114,9 +113,7 @@ class SpamChecker:
|
||||
|
||||
return True
|
||||
|
||||
async def user_may_create_room_alias(
|
||||
self, userid: str, room_alias: RoomAlias
|
||||
) -> bool:
|
||||
async def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool:
|
||||
"""Checks if a given user may create a room alias
|
||||
|
||||
If this method returns false, the association request will be rejected.
|
||||
|
||||
@@ -44,6 +44,7 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
UnsupportedRoomVersionError,
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import EventBase
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
@@ -864,6 +865,14 @@ class FederationHandlerRegistry:
|
||||
# EDU received.
|
||||
self._edu_type_to_instance = {} # type: Dict[str, List[str]]
|
||||
|
||||
# A rate limiter for incoming room key requests per origin.
|
||||
self._room_key_request_rate_limiter = Ratelimiter(
|
||||
store=hs.get_datastore(),
|
||||
clock=self.clock,
|
||||
rate_hz=self.config.rc_key_requests.per_second,
|
||||
burst_count=self.config.rc_key_requests.burst_count,
|
||||
)
|
||||
|
||||
def register_edu_handler(
|
||||
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
|
||||
) -> None:
|
||||
@@ -917,6 +926,16 @@ class FederationHandlerRegistry:
|
||||
if not self.config.use_presence and edu_type == EduTypes.Presence:
|
||||
return
|
||||
|
||||
# If the incoming room key requests from a particular origin are over
|
||||
# the limit, drop them.
|
||||
if (
|
||||
edu_type == EduTypes.RoomKeyRequest
|
||||
and not await self._room_key_request_rate_limiter.can_do_action(
|
||||
None, origin
|
||||
)
|
||||
):
|
||||
return
|
||||
|
||||
# Check if we have a handler on this instance
|
||||
handler = self.edu_handlers.get(edu_type)
|
||||
if handler:
|
||||
|
||||
@@ -28,7 +28,6 @@ from synapse.api.presence import UserPresenceState
|
||||
from synapse.events import EventBase
|
||||
from synapse.federation.units import Edu
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import SynapseTags, set_tag
|
||||
from synapse.metrics import sent_transactions_counter
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
@@ -575,14 +574,6 @@ class PerDestinationQueue:
|
||||
for content in contents
|
||||
]
|
||||
|
||||
if edus:
|
||||
issue9533_logger.debug(
|
||||
"Sending %i to-device messages to %s, up to stream id %i",
|
||||
len(edus),
|
||||
self._destination,
|
||||
stream_id,
|
||||
)
|
||||
|
||||
return (edus, stream_id)
|
||||
|
||||
def _start_catching_up(self) -> None:
|
||||
|
||||
@@ -995,7 +995,6 @@ class TransportLayerClient:
|
||||
returned per space
|
||||
exclude_rooms: a list of any rooms we can skip
|
||||
"""
|
||||
# TODO When switching to the stable endpoint, use GET instead of POST.
|
||||
path = _create_path(
|
||||
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
|
||||
)
|
||||
|
||||
@@ -1376,32 +1376,6 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
|
||||
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
|
||||
PATH = "/spaces/(?P<room_id>[^/]*)"
|
||||
|
||||
async def on_GET(
|
||||
self,
|
||||
origin: str,
|
||||
content: JsonDict,
|
||||
query: Mapping[bytes, Sequence[bytes]],
|
||||
room_id: str,
|
||||
) -> Tuple[int, JsonDict]:
|
||||
suggested_only = parse_boolean_from_args(query, "suggested_only", default=False)
|
||||
max_rooms_per_space = parse_integer_from_args(query, "max_rooms_per_space")
|
||||
|
||||
exclude_rooms = []
|
||||
if b"exclude_rooms" in query:
|
||||
try:
|
||||
exclude_rooms = [
|
||||
room_id.decode("ascii") for room_id in query[b"exclude_rooms"]
|
||||
]
|
||||
except Exception:
|
||||
raise SynapseError(
|
||||
400, "Bad query parameter for exclude_rooms", Codes.INVALID_PARAM
|
||||
)
|
||||
|
||||
return 200, await self.handler.federation_space_summary(
|
||||
room_id, suggested_only, max_rooms_per_space, exclude_rooms
|
||||
)
|
||||
|
||||
# TODO When switching to the stable endpoint, remove the POST handler.
|
||||
async def on_POST(
|
||||
self,
|
||||
origin: str,
|
||||
|
||||
@@ -17,7 +17,6 @@ import logging
|
||||
import time
|
||||
import unicodedata
|
||||
import urllib.parse
|
||||
from binascii import crc32
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
@@ -35,7 +34,6 @@ from typing import (
|
||||
import attr
|
||||
import bcrypt
|
||||
import pymacaroons
|
||||
import unpaddedbase64
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
@@ -68,7 +66,6 @@ from synapse.util import stringutils as stringutils
|
||||
from synapse.util.async_helpers import maybe_awaitable
|
||||
from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
|
||||
from synapse.util.msisdn import phone_number_to_msisdn
|
||||
from synapse.util.stringutils import base62_encode
|
||||
from synapse.util.threepids import canonicalise_email
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -811,12 +808,10 @@ class AuthHandler(BaseHandler):
|
||||
logger.info(
|
||||
"Logging in user %s as %s%s", user_id, puppets_user_id, fmt_expiry
|
||||
)
|
||||
target_user_id_obj = UserID.from_string(puppets_user_id)
|
||||
else:
|
||||
logger.info(
|
||||
"Logging in user %s on device %s%s", user_id, device_id, fmt_expiry
|
||||
)
|
||||
target_user_id_obj = UserID.from_string(user_id)
|
||||
|
||||
if (
|
||||
not is_appservice_ghost
|
||||
@@ -824,7 +819,7 @@ class AuthHandler(BaseHandler):
|
||||
):
|
||||
await self.auth.check_auth_blocking(user_id)
|
||||
|
||||
access_token = self.generate_access_token(target_user_id_obj)
|
||||
access_token = self.macaroon_gen.generate_access_token(user_id)
|
||||
await self.store.add_access_token_to_user(
|
||||
user_id=user_id,
|
||||
token=access_token,
|
||||
@@ -1197,19 +1192,6 @@ class AuthHandler(BaseHandler):
|
||||
return None
|
||||
return user_id
|
||||
|
||||
def generate_access_token(self, for_user: UserID) -> str:
|
||||
"""Generates an opaque string, for use as an access token"""
|
||||
|
||||
# we use the following format for access tokens:
|
||||
# syt_<base64 local part>_<random string>_<base62 crc check>
|
||||
|
||||
b64local = unpaddedbase64.encode_base64(for_user.localpart.encode("utf-8"))
|
||||
random_string = stringutils.random_string(20)
|
||||
base = f"syt_{b64local}_{random_string}"
|
||||
|
||||
crc = base62_encode(crc32(base.encode("ascii")), minwidth=6)
|
||||
return f"{base}_{crc}"
|
||||
|
||||
async def validate_short_term_login_token(
|
||||
self, login_token: str
|
||||
) -> LoginTokenAttributes:
|
||||
@@ -1603,7 +1585,10 @@ class MacaroonGenerator:
|
||||
|
||||
hs = attr.ib()
|
||||
|
||||
def generate_guest_access_token(self, user_id: str) -> str:
|
||||
def generate_access_token(
|
||||
self, user_id: str, extra_caveats: Optional[List[str]] = None
|
||||
) -> str:
|
||||
extra_caveats = extra_caveats or []
|
||||
macaroon = self._generate_base_macaroon(user_id)
|
||||
macaroon.add_first_party_caveat("type = access")
|
||||
# Include a nonce, to make sure that each login gets a different
|
||||
@@ -1611,7 +1596,8 @@ class MacaroonGenerator:
|
||||
macaroon.add_first_party_caveat(
|
||||
"nonce = %s" % (stringutils.random_string_with_symbols(16),)
|
||||
)
|
||||
macaroon.add_first_party_caveat("guest = true")
|
||||
for caveat in extra_caveats:
|
||||
macaroon.add_first_party_caveat(caveat)
|
||||
return macaroon.serialize()
|
||||
|
||||
def generate_short_term_login_token(
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict
|
||||
|
||||
from synapse.api.constants import ToDeviceEventTypes
|
||||
from synapse.api.constants import EduTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.logging.context import run_in_background
|
||||
@@ -79,8 +79,6 @@ class DeviceMessageHandler:
|
||||
ReplicationUserDevicesResyncRestServlet.make_client(hs)
|
||||
)
|
||||
|
||||
# a rate limiter for room key requests. The keys are
|
||||
# (sending_user_id, sending_device_id).
|
||||
self._ratelimiter = Ratelimiter(
|
||||
store=self.store,
|
||||
clock=hs.get_clock(),
|
||||
@@ -102,25 +100,12 @@ class DeviceMessageHandler:
|
||||
for user_id, by_device in content["messages"].items():
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
logger.warning("To-device message to non-local user %s", user_id)
|
||||
logger.warning("Request for keys for non-local user %s", user_id)
|
||||
raise SynapseError(400, "Not a user here")
|
||||
|
||||
if not by_device:
|
||||
continue
|
||||
|
||||
# Ratelimit key requests by the sending user.
|
||||
if message_type == ToDeviceEventTypes.RoomKeyRequest:
|
||||
allowed, _ = await self._ratelimiter.can_do_action(
|
||||
None, (sender_user_id, None)
|
||||
)
|
||||
if not allowed:
|
||||
logger.info(
|
||||
"Dropping room_key_request from %s to %s due to rate limit",
|
||||
sender_user_id,
|
||||
user_id,
|
||||
)
|
||||
continue
|
||||
|
||||
messages_by_device = {
|
||||
device_id: {
|
||||
"content": message_content,
|
||||
@@ -207,19 +192,13 @@ class DeviceMessageHandler:
|
||||
for user_id, by_device in messages.items():
|
||||
# Ratelimit local cross-user key requests by the sending device.
|
||||
if (
|
||||
message_type == ToDeviceEventTypes.RoomKeyRequest
|
||||
message_type == EduTypes.RoomKeyRequest
|
||||
and user_id != sender_user_id
|
||||
):
|
||||
allowed, _ = await self._ratelimiter.can_do_action(
|
||||
and await self._ratelimiter.can_do_action(
|
||||
requester, (sender_user_id, requester.device_id)
|
||||
)
|
||||
if not allowed:
|
||||
logger.info(
|
||||
"Dropping room_key_request from %s to %s due to rate limit",
|
||||
sender_user_id,
|
||||
user_id,
|
||||
)
|
||||
continue
|
||||
):
|
||||
continue
|
||||
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if self.is_mine(UserID.from_string(user_id)):
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
import logging
|
||||
import string
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional
|
||||
from typing import Iterable, List, Optional
|
||||
|
||||
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
|
||||
from synapse.api.errors import (
|
||||
@@ -27,19 +27,15 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.storage.databases.main.directory import RoomAliasMapping
|
||||
from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
|
||||
from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DirectoryHandler(BaseHandler):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
|
||||
self.state = hs.get_state_handler()
|
||||
@@ -64,7 +60,7 @@ class DirectoryHandler(BaseHandler):
|
||||
room_id: str,
|
||||
servers: Optional[Iterable[str]] = None,
|
||||
creator: Optional[str] = None,
|
||||
) -> None:
|
||||
):
|
||||
# general association creation for both human users and app services
|
||||
|
||||
for wchar in string.whitespace:
|
||||
@@ -78,7 +74,7 @@ class DirectoryHandler(BaseHandler):
|
||||
# TODO(erikj): Add transactions.
|
||||
# TODO(erikj): Check if there is a current association.
|
||||
if not servers:
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
servers = {get_domain_from_id(u) for u in users}
|
||||
|
||||
if not servers:
|
||||
@@ -108,9 +104,8 @@ class DirectoryHandler(BaseHandler):
|
||||
"""
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
room_alias_str = room_alias.to_string()
|
||||
|
||||
if len(room_alias_str) > MAX_ALIAS_LENGTH:
|
||||
if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
|
||||
@@ -119,7 +114,7 @@ class DirectoryHandler(BaseHandler):
|
||||
|
||||
service = requester.app_service
|
||||
if service:
|
||||
if not service.is_interested_in_alias(room_alias_str):
|
||||
if not service.is_interested_in_alias(room_alias.to_string()):
|
||||
raise SynapseError(
|
||||
400,
|
||||
"This application service has not reserved this kind of alias.",
|
||||
@@ -143,7 +138,7 @@ class DirectoryHandler(BaseHandler):
|
||||
raise AuthError(403, "This user is not permitted to create this alias")
|
||||
|
||||
if not self.config.is_alias_creation_allowed(
|
||||
user_id, room_id, room_alias_str
|
||||
user_id, room_id, room_alias.to_string()
|
||||
):
|
||||
# Lets just return a generic message, as there may be all sorts of
|
||||
# reasons why we said no. TODO: Allow configurable error messages
|
||||
@@ -216,7 +211,7 @@ class DirectoryHandler(BaseHandler):
|
||||
|
||||
async def delete_appservice_association(
|
||||
self, service: ApplicationService, room_alias: RoomAlias
|
||||
) -> None:
|
||||
):
|
||||
if not service.is_interested_in_alias(room_alias.to_string()):
|
||||
raise SynapseError(
|
||||
400,
|
||||
@@ -225,7 +220,7 @@ class DirectoryHandler(BaseHandler):
|
||||
)
|
||||
await self._delete_association(room_alias)
|
||||
|
||||
async def _delete_association(self, room_alias: RoomAlias) -> str:
|
||||
async def _delete_association(self, room_alias: RoomAlias):
|
||||
if not self.hs.is_mine(room_alias):
|
||||
raise SynapseError(400, "Room alias must be local")
|
||||
|
||||
@@ -233,19 +228,17 @@ class DirectoryHandler(BaseHandler):
|
||||
|
||||
return room_id
|
||||
|
||||
async def get_association(self, room_alias: RoomAlias) -> JsonDict:
|
||||
async def get_association(self, room_alias: RoomAlias):
|
||||
room_id = None
|
||||
if self.hs.is_mine(room_alias):
|
||||
result = await self.get_association_from_room_alias(
|
||||
room_alias
|
||||
) # type: Optional[RoomAliasMapping]
|
||||
result = await self.get_association_from_room_alias(room_alias)
|
||||
|
||||
if result:
|
||||
room_id = result.room_id
|
||||
servers = result.servers
|
||||
else:
|
||||
try:
|
||||
fed_result = await self.federation.make_query(
|
||||
result = await self.federation.make_query(
|
||||
destination=room_alias.domain,
|
||||
query_type="directory",
|
||||
args={"room_alias": room_alias.to_string()},
|
||||
@@ -255,13 +248,13 @@ class DirectoryHandler(BaseHandler):
|
||||
except CodeMessageException as e:
|
||||
logging.warning("Error retrieving alias")
|
||||
if e.code == 404:
|
||||
fed_result = None
|
||||
result = None
|
||||
else:
|
||||
raise
|
||||
|
||||
if fed_result and "room_id" in fed_result and "servers" in fed_result:
|
||||
room_id = fed_result["room_id"]
|
||||
servers = fed_result["servers"]
|
||||
if result and "room_id" in result and "servers" in result:
|
||||
room_id = result["room_id"]
|
||||
servers = result["servers"]
|
||||
|
||||
if not room_id:
|
||||
raise SynapseError(
|
||||
@@ -270,7 +263,7 @@ class DirectoryHandler(BaseHandler):
|
||||
Codes.NOT_FOUND,
|
||||
)
|
||||
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
extra_servers = {get_domain_from_id(u) for u in users}
|
||||
servers = set(extra_servers) | set(servers)
|
||||
|
||||
@@ -282,7 +275,7 @@ class DirectoryHandler(BaseHandler):
|
||||
|
||||
return {"room_id": room_id, "servers": servers}
|
||||
|
||||
async def on_directory_query(self, args: JsonDict) -> JsonDict:
|
||||
async def on_directory_query(self, args):
|
||||
room_alias = RoomAlias.from_string(args["room_alias"])
|
||||
if not self.hs.is_mine(room_alias):
|
||||
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
|
||||
@@ -300,7 +293,7 @@ class DirectoryHandler(BaseHandler):
|
||||
|
||||
async def _update_canonical_alias(
|
||||
self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias
|
||||
) -> None:
|
||||
):
|
||||
"""
|
||||
Send an updated canonical alias event if the removed alias was set as
|
||||
the canonical alias or listed in the alt_aliases field.
|
||||
@@ -351,9 +344,7 @@ class DirectoryHandler(BaseHandler):
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
async def get_association_from_room_alias(
|
||||
self, room_alias: RoomAlias
|
||||
) -> Optional[RoomAliasMapping]:
|
||||
async def get_association_from_room_alias(self, room_alias: RoomAlias):
|
||||
result = await self.store.get_association_from_room_alias(room_alias)
|
||||
if not result:
|
||||
# Query AS to see if it exists
|
||||
@@ -381,7 +372,7 @@ class DirectoryHandler(BaseHandler):
|
||||
# either no interested services, or no service with an exclusive lock
|
||||
return True
|
||||
|
||||
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
|
||||
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
|
||||
"""Determine whether a user can delete an alias.
|
||||
|
||||
One of the following must be true:
|
||||
@@ -403,13 +394,14 @@ class DirectoryHandler(BaseHandler):
|
||||
if not room_id:
|
||||
return False
|
||||
|
||||
return await self.auth.check_can_change_room_list(
|
||||
res = await self.auth.check_can_change_room_list(
|
||||
room_id, UserID.from_string(user_id)
|
||||
)
|
||||
return res
|
||||
|
||||
async def edit_published_room_list(
|
||||
self, requester: Requester, room_id: str, visibility: str
|
||||
) -> None:
|
||||
):
|
||||
"""Edit the entry of the room in the published room list.
|
||||
|
||||
requester
|
||||
@@ -477,7 +469,7 @@ class DirectoryHandler(BaseHandler):
|
||||
|
||||
async def edit_published_appservice_room_list(
|
||||
self, appservice_id: str, network_id: str, room_id: str, visibility: str
|
||||
) -> None:
|
||||
):
|
||||
"""Add or remove a room from the appservice/network specific public
|
||||
room list.
|
||||
|
||||
@@ -507,4 +499,5 @@ class DirectoryHandler(BaseHandler):
|
||||
room_id, requester.user.to_string()
|
||||
)
|
||||
|
||||
return await self.store.get_aliases_for_room(room_id)
|
||||
aliases = await self.store.get_aliases_for_room(room_id)
|
||||
return aliases
|
||||
|
||||
@@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler):
|
||||
# Send down presence.
|
||||
if event.state_key == auth_user_id:
|
||||
# Send down presence for everyone in the room.
|
||||
users = await self.store.get_users_in_room(
|
||||
users = await self.state.get_current_users_in_room(
|
||||
event.room_id
|
||||
) # type: Iterable[str]
|
||||
else:
|
||||
|
||||
@@ -552,12 +552,8 @@ class FederationHandler(BaseHandler):
|
||||
destination: str,
|
||||
room_id: str,
|
||||
event_id: str,
|
||||
) -> List[EventBase]:
|
||||
"""Requests all of the room state at a given event from a remote
|
||||
homeserver.
|
||||
|
||||
Will also fetch any missing events reported in the `auth_chain_ids`
|
||||
section of `/state_ids`.
|
||||
) -> Tuple[List[EventBase], List[EventBase]]:
|
||||
"""Requests all of the room state at a given event from a remote homeserver.
|
||||
|
||||
Args:
|
||||
destination: The remote homeserver to query for the state.
|
||||
@@ -565,7 +561,8 @@ class FederationHandler(BaseHandler):
|
||||
event_id: The id of the event we want the state at.
|
||||
|
||||
Returns:
|
||||
A list of events in the state, not including the event itself.
|
||||
A list of events in the state, not including the event itself, and
|
||||
a list of events in the auth chain for the given event.
|
||||
"""
|
||||
(
|
||||
state_event_ids,
|
||||
@@ -574,53 +571,68 @@ class FederationHandler(BaseHandler):
|
||||
destination, room_id, event_id=event_id
|
||||
)
|
||||
|
||||
# Fetch the state events from the DB, and check we have the auth events.
|
||||
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
|
||||
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
|
||||
desired_events = set(state_event_ids + auth_event_ids)
|
||||
|
||||
# Check for missing events. We handle state and auth event seperately,
|
||||
# as we want to pull the state from the DB, but we don't for the auth
|
||||
# events. (Note: we likely won't use the majority of the auth chain, and
|
||||
# it can be *huge* for large rooms, so it's worth ensuring that we don't
|
||||
# unnecessarily pull it from the DB).
|
||||
missing_state_events = set(state_event_ids) - set(event_map)
|
||||
missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
|
||||
if missing_state_events or missing_auth_events:
|
||||
await self._get_events_and_persist(
|
||||
destination=destination,
|
||||
room_id=room_id,
|
||||
events=missing_state_events | missing_auth_events,
|
||||
event_map = await self._get_events_from_store_or_dest(
|
||||
destination, room_id, desired_events
|
||||
)
|
||||
|
||||
failed_to_fetch = desired_events - event_map.keys()
|
||||
if failed_to_fetch:
|
||||
logger.warning(
|
||||
"Failed to fetch missing state/auth events for %s %s",
|
||||
event_id,
|
||||
failed_to_fetch,
|
||||
)
|
||||
|
||||
if missing_state_events:
|
||||
new_events = await self.store.get_events(
|
||||
missing_state_events, allow_rejected=True
|
||||
)
|
||||
event_map.update(new_events)
|
||||
remote_state = [
|
||||
event_map[e_id] for e_id in state_event_ids if e_id in event_map
|
||||
]
|
||||
|
||||
missing_state_events.difference_update(new_events)
|
||||
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
if missing_state_events:
|
||||
logger.warning(
|
||||
"Failed to fetch missing state events for %s %s",
|
||||
event_id,
|
||||
missing_state_events,
|
||||
)
|
||||
return remote_state, auth_chain
|
||||
|
||||
if missing_auth_events:
|
||||
auth_events_in_store = await self.store.have_seen_events(
|
||||
missing_auth_events
|
||||
)
|
||||
missing_auth_events.difference_update(auth_events_in_store)
|
||||
async def _get_events_from_store_or_dest(
|
||||
self, destination: str, room_id: str, event_ids: Iterable[str]
|
||||
) -> Dict[str, EventBase]:
|
||||
"""Fetch events from a remote destination, checking if we already have them.
|
||||
|
||||
if missing_auth_events:
|
||||
logger.warning(
|
||||
"Failed to fetch missing auth events for %s %s",
|
||||
event_id,
|
||||
missing_auth_events,
|
||||
)
|
||||
Persists any events we don't already have as outliers.
|
||||
|
||||
remote_state = list(event_map.values())
|
||||
If we fail to fetch any of the events, a warning will be logged, and the event
|
||||
will be omitted from the result. Likewise, any events which turn out not to
|
||||
be in the given room.
|
||||
|
||||
This function *does not* automatically get missing auth events of the
|
||||
newly fetched events. Callers must include the full auth chain of
|
||||
of the missing events in the `event_ids` argument, to ensure that any
|
||||
missing auth events are correctly fetched.
|
||||
|
||||
Returns:
|
||||
map from event_id to event
|
||||
"""
|
||||
fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
|
||||
|
||||
missing_events = set(event_ids) - fetched_events.keys()
|
||||
|
||||
if missing_events:
|
||||
logger.debug(
|
||||
"Fetching unknown state/auth events %s for room %s",
|
||||
missing_events,
|
||||
room_id,
|
||||
)
|
||||
|
||||
await self._get_events_and_persist(
|
||||
destination=destination, room_id=room_id, events=missing_events
|
||||
)
|
||||
|
||||
# we need to make sure we re-load from the database to get the rejected
|
||||
# state correct.
|
||||
fetched_events.update(
|
||||
(await self.store.get_events(missing_events, allow_rejected=True))
|
||||
)
|
||||
|
||||
# check for events which were in the wrong room.
|
||||
#
|
||||
@@ -628,8 +640,8 @@ class FederationHandler(BaseHandler):
|
||||
# auth_events at an event in room A are actually events in room B
|
||||
|
||||
bad_events = [
|
||||
(event.event_id, event.room_id)
|
||||
for event in remote_state
|
||||
(event_id, event.room_id)
|
||||
for event_id, event in fetched_events.items()
|
||||
if event.room_id != room_id
|
||||
]
|
||||
|
||||
@@ -646,10 +658,9 @@ class FederationHandler(BaseHandler):
|
||||
room_id,
|
||||
)
|
||||
|
||||
if bad_events:
|
||||
remote_state = [e for e in remote_state if e.room_id == room_id]
|
||||
del fetched_events[bad_event_id]
|
||||
|
||||
return remote_state
|
||||
return fetched_events
|
||||
|
||||
async def _get_state_after_missing_prev_event(
|
||||
self,
|
||||
@@ -952,23 +963,27 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
# For each edge get the current state.
|
||||
|
||||
auth_events = {}
|
||||
state_events = {}
|
||||
events_to_state = {}
|
||||
for e_id in edges:
|
||||
state = await self._get_state_for_room(
|
||||
state, auth = await self._get_state_for_room(
|
||||
destination=dest,
|
||||
room_id=room_id,
|
||||
event_id=e_id,
|
||||
)
|
||||
auth_events.update({a.event_id: a for a in auth})
|
||||
auth_events.update({s.event_id: s for s in state})
|
||||
state_events.update({s.event_id: s for s in state})
|
||||
events_to_state[e_id] = state
|
||||
|
||||
required_auth = {
|
||||
a_id
|
||||
for event in events + list(state_events.values())
|
||||
for event in events
|
||||
+ list(state_events.values())
|
||||
+ list(auth_events.values())
|
||||
for a_id in event.auth_event_ids()
|
||||
}
|
||||
auth_events = await self.store.get_events(required_auth, allow_rejected=True)
|
||||
auth_events.update(
|
||||
{e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
|
||||
)
|
||||
@@ -2431,9 +2446,7 @@ class FederationHandler(BaseHandler):
|
||||
# If we are going to send this event over federation we precaclculate
|
||||
# the joined hosts.
|
||||
if event.internal_metadata.get_send_on_behalf_of():
|
||||
await self.event_creation_handler.cache_joined_hosts_for_event(
|
||||
event, context
|
||||
)
|
||||
await self.event_creation_handler.cache_joined_hosts_for_event(event)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
"""Utilities for interacting with Identity Servers"""
|
||||
import logging
|
||||
import urllib.parse
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
@@ -41,16 +41,13 @@ from synapse.util.stringutils import (
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
id_server_scheme = "https://"
|
||||
|
||||
|
||||
class IdentityHandler(BaseHandler):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
|
||||
# An HTTP client for contacting trusted URLs.
|
||||
@@ -83,7 +80,7 @@ class IdentityHandler(BaseHandler):
|
||||
request: SynapseRequest,
|
||||
medium: str,
|
||||
address: str,
|
||||
) -> None:
|
||||
):
|
||||
"""Used to ratelimit requests to `/requestToken` by IP and address.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -15,11 +15,10 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import random
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse import event_auth
|
||||
@@ -44,15 +43,14 @@ from synapse.events import EventBase
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
||||
from synapse.util import json_decoder, json_encoder, log_failure
|
||||
from synapse.util.async_helpers import Linearizer, unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@@ -68,7 +66,7 @@ logger = logging.getLogger(__name__)
|
||||
class MessageHandler:
|
||||
"""Contains some read only APIs to get state about a room"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
self.auth = hs.get_auth()
|
||||
self.clock = hs.get_clock()
|
||||
self.state = hs.get_state_handler()
|
||||
@@ -93,7 +91,7 @@ class MessageHandler:
|
||||
room_id: str,
|
||||
event_type: str,
|
||||
state_key: str,
|
||||
) -> Optional[EventBase]:
|
||||
) -> dict:
|
||||
"""Get data from a room.
|
||||
|
||||
Args:
|
||||
@@ -117,10 +115,6 @@ class MessageHandler:
|
||||
data = await self.state.get_current_state(room_id, event_type, state_key)
|
||||
elif membership == Membership.LEAVE:
|
||||
key = (event_type, state_key)
|
||||
# If the membership is not JOIN, then the event ID should exist.
|
||||
assert (
|
||||
membership_event_id is not None
|
||||
), "check_user_in_room_or_world_readable returned invalid data"
|
||||
room_state = await self.state_store.get_state_for_events(
|
||||
[membership_event_id], StateFilter.from_types([key])
|
||||
)
|
||||
@@ -192,12 +186,10 @@ class MessageHandler:
|
||||
|
||||
event = last_events[0]
|
||||
if visible_events:
|
||||
room_state_events = await self.state_store.get_state_for_events(
|
||||
room_state = await self.state_store.get_state_for_events(
|
||||
[event.event_id], state_filter=state_filter
|
||||
)
|
||||
room_state = room_state_events[
|
||||
event.event_id
|
||||
] # type: Mapping[Any, EventBase]
|
||||
room_state = room_state[event.event_id]
|
||||
else:
|
||||
raise AuthError(
|
||||
403,
|
||||
@@ -218,14 +210,10 @@ class MessageHandler:
|
||||
)
|
||||
room_state = await self.store.get_events(state_ids.values())
|
||||
elif membership == Membership.LEAVE:
|
||||
# If the membership is not JOIN, then the event ID should exist.
|
||||
assert (
|
||||
membership_event_id is not None
|
||||
), "check_user_in_room_or_world_readable returned invalid data"
|
||||
room_state_events = await self.state_store.get_state_for_events(
|
||||
room_state = await self.state_store.get_state_for_events(
|
||||
[membership_event_id], state_filter=state_filter
|
||||
)
|
||||
room_state = room_state_events[membership_event_id]
|
||||
room_state = room_state[membership_event_id]
|
||||
|
||||
now = self.clock.time_msec()
|
||||
events = await self._event_serializer.serialize_events(
|
||||
@@ -260,7 +248,7 @@ class MessageHandler:
|
||||
"Getting joined members after leaving is not implemented"
|
||||
)
|
||||
|
||||
users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
|
||||
users_with_profile = await self.state.get_current_users_in_room(room_id)
|
||||
|
||||
# If this is an AS, double check that they are allowed to see the members.
|
||||
# This can either be because the AS user is in the room or because there
|
||||
@@ -459,19 +447,6 @@ class EventCreationHandler:
|
||||
|
||||
self._external_cache = hs.get_external_cache()
|
||||
|
||||
# Stores the state groups we've recently added to the joined hosts
|
||||
# external cache. Note that the timeout must be significantly less than
|
||||
# the TTL on the external cache.
|
||||
self._external_cache_joined_hosts_updates = (
|
||||
None
|
||||
) # type: Optional[ExpiringCache]
|
||||
if self._external_cache.is_enabled():
|
||||
self._external_cache_joined_hosts_updates = ExpiringCache(
|
||||
"_external_cache_joined_hosts_updates",
|
||||
self.clock,
|
||||
expiry_ms=30 * 60 * 1000,
|
||||
)
|
||||
|
||||
async def create_event(
|
||||
self,
|
||||
requester: Requester,
|
||||
@@ -980,44 +955,10 @@ class EventCreationHandler:
|
||||
logger.exception("Failed to encode content: %r", event.content)
|
||||
raise
|
||||
|
||||
# We now persist the event (and update the cache in parallel, since we
|
||||
# don't want to block on it).
|
||||
result = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
self._persist_event,
|
||||
requester=requester,
|
||||
event=event,
|
||||
context=context,
|
||||
ratelimit=ratelimit,
|
||||
extra_users=extra_users,
|
||||
),
|
||||
run_in_background(
|
||||
self.cache_joined_hosts_for_event, event, context
|
||||
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
return result[0]
|
||||
|
||||
async def _persist_event(
|
||||
self,
|
||||
requester: Requester,
|
||||
event: EventBase,
|
||||
context: EventContext,
|
||||
ratelimit: bool = True,
|
||||
extra_users: Optional[List[UserID]] = None,
|
||||
) -> EventBase:
|
||||
"""Actually persists the event. Should only be called by
|
||||
`handle_new_client_event`, and see its docstring for documentation of
|
||||
the arguments.
|
||||
"""
|
||||
|
||||
await self.action_generator.handle_push_actions_for_event(event, context)
|
||||
|
||||
await self.cache_joined_hosts_for_event(event)
|
||||
|
||||
try:
|
||||
# If we're a worker we need to hit out to the master.
|
||||
writer_instance = self._events_shard_config.get_instance(event.room_id)
|
||||
@@ -1057,9 +998,7 @@ class EventCreationHandler:
|
||||
await self.store.remove_push_actions_from_staging(event.event_id)
|
||||
raise
|
||||
|
||||
async def cache_joined_hosts_for_event(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
|
||||
"""Precalculate the joined hosts at the event, when using Redis, so that
|
||||
external federation senders don't have to recalculate it themselves.
|
||||
"""
|
||||
@@ -1067,9 +1006,6 @@ class EventCreationHandler:
|
||||
if not self._external_cache.is_enabled():
|
||||
return
|
||||
|
||||
# If external cache is enabled we should always have this.
|
||||
assert self._external_cache_joined_hosts_updates is not None
|
||||
|
||||
# We actually store two mappings, event ID -> prev state group,
|
||||
# state group -> joined hosts, which is much more space efficient
|
||||
# than event ID -> joined hosts.
|
||||
@@ -1077,28 +1013,22 @@ class EventCreationHandler:
|
||||
# Note: We have to cache event ID -> prev state group, as we don't
|
||||
# store that in the DB.
|
||||
#
|
||||
# Note: We set the state group -> joined hosts cache if it hasn't been
|
||||
# set for a while, so that the expiry time is reset.
|
||||
# Note: We always set the state group -> joined hosts cache, even if
|
||||
# we already set it, so that the expiry time is reset.
|
||||
|
||||
state_entry = await self.state.resolve_state_groups_for_events(
|
||||
event.room_id, event_ids=event.prev_event_ids()
|
||||
)
|
||||
|
||||
if state_entry.state_group:
|
||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||
|
||||
await self._external_cache.set(
|
||||
"event_to_prev_state_group",
|
||||
event.event_id,
|
||||
state_entry.state_group,
|
||||
expiry_ms=60 * 60 * 1000,
|
||||
)
|
||||
|
||||
if state_entry.state_group in self._external_cache_joined_hosts_updates:
|
||||
return
|
||||
|
||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||
|
||||
# Note that the expiry times must be larger than the expiry time in
|
||||
# _external_cache_joined_hosts_updates.
|
||||
await self._external_cache.set(
|
||||
"get_joined_hosts",
|
||||
str(state_entry.state_group),
|
||||
@@ -1106,8 +1036,6 @@ class EventCreationHandler:
|
||||
expiry_ms=60 * 60 * 1000,
|
||||
)
|
||||
|
||||
self._external_cache_joined_hosts_updates[state_entry.state_group] = None
|
||||
|
||||
async def _validate_canonical_alias(
|
||||
self, directory_handler, room_alias_str: str, expected_room_id: str
|
||||
) -> None:
|
||||
|
||||
@@ -1183,16 +1183,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
max_pos, deltas = await self.store.get_current_state_deltas(
|
||||
self._event_pos, room_max_stream_ordering
|
||||
)
|
||||
|
||||
# We may get multiple deltas for different rooms, but we want to
|
||||
# handle them on a room by room basis, so we batch them up by
|
||||
# room.
|
||||
deltas_by_room: Dict[str, List[JsonDict]] = {}
|
||||
for delta in deltas:
|
||||
deltas_by_room.setdefault(delta["room_id"], []).append(delta)
|
||||
|
||||
for room_id, deltas_for_room in deltas_by_room.items():
|
||||
await self._handle_state_delta(room_id, deltas_for_room)
|
||||
await self._handle_state_delta(deltas)
|
||||
|
||||
self._event_pos = max_pos
|
||||
|
||||
@@ -1201,21 +1192,17 @@ class PresenceHandler(BasePresenceHandler):
|
||||
max_pos
|
||||
)
|
||||
|
||||
async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
|
||||
"""Process current state deltas for the room to find new joins that need
|
||||
to be handled.
|
||||
async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
|
||||
"""Process current state deltas to find new joins that need to be
|
||||
handled.
|
||||
"""
|
||||
|
||||
# Sets of newly joined users. Note that if the local server is
|
||||
# joining a remote room for the first time we'll see both the joining
|
||||
# user and all remote users as newly joined.
|
||||
newly_joined_users = set()
|
||||
# A map of destination to a set of user state that they should receive
|
||||
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
|
||||
|
||||
for delta in deltas:
|
||||
assert room_id == delta["room_id"]
|
||||
|
||||
typ = delta["type"]
|
||||
state_key = delta["state_key"]
|
||||
room_id = delta["room_id"]
|
||||
event_id = delta["event_id"]
|
||||
prev_event_id = delta["prev_event_id"]
|
||||
|
||||
@@ -1244,55 +1231,72 @@ class PresenceHandler(BasePresenceHandler):
|
||||
# Ignore changes to join events.
|
||||
continue
|
||||
|
||||
newly_joined_users.add(state_key)
|
||||
# Retrieve any user presence state updates that need to be sent as a result,
|
||||
# and the destinations that need to receive it
|
||||
destinations, user_presence_states = await self._on_user_joined_room(
|
||||
room_id, state_key
|
||||
)
|
||||
|
||||
if not newly_joined_users:
|
||||
# If nobody has joined then there's nothing to do.
|
||||
return
|
||||
# Insert the destinations and respective updates into our destinations dict
|
||||
for destination in destinations:
|
||||
presence_destinations.setdefault(destination, set()).update(
|
||||
user_presence_states
|
||||
)
|
||||
|
||||
# We want to send:
|
||||
# 1. presence states of all local users in the room to newly joined
|
||||
# remote servers
|
||||
# 2. presence states of newly joined users to all remote servers in
|
||||
# the room.
|
||||
#
|
||||
# TODO: Only send presence states to remote hosts that don't already
|
||||
# have them (because they already share rooms).
|
||||
# Send out user presence updates for each destination
|
||||
for destination, user_state_set in presence_destinations.items():
|
||||
self._federation_queue.send_presence_to_destinations(
|
||||
destinations=[destination], states=user_state_set
|
||||
)
|
||||
|
||||
# Get all the users who were already in the room, by fetching the
|
||||
# current users in the room and removing the newly joined users.
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
prev_users = set(users) - newly_joined_users
|
||||
async def _on_user_joined_room(
|
||||
self, room_id: str, user_id: str
|
||||
) -> Tuple[List[str], List[UserPresenceState]]:
|
||||
"""Called when we detect a user joining the room via the current state
|
||||
delta stream. Returns the destinations that need to be updated and the
|
||||
presence updates to send to them.
|
||||
|
||||
# Construct sets for all the local users and remote hosts that were
|
||||
# already in the room
|
||||
prev_local_users = []
|
||||
prev_remote_hosts = set()
|
||||
for user_id in prev_users:
|
||||
if self.is_mine_id(user_id):
|
||||
prev_local_users.append(user_id)
|
||||
else:
|
||||
prev_remote_hosts.add(get_domain_from_id(user_id))
|
||||
Args:
|
||||
room_id: The ID of the room that the user has joined.
|
||||
user_id: The ID of the user that has joined the room.
|
||||
|
||||
# Similarly, construct sets for all the local users and remote hosts
|
||||
# that were *not* already in the room. Care needs to be taken with the
|
||||
# calculating the remote hosts, as a host may have already been in the
|
||||
# room even if there is a newly joined user from that host.
|
||||
newly_joined_local_users = []
|
||||
newly_joined_remote_hosts = set()
|
||||
for user_id in newly_joined_users:
|
||||
if self.is_mine_id(user_id):
|
||||
newly_joined_local_users.append(user_id)
|
||||
else:
|
||||
host = get_domain_from_id(user_id)
|
||||
if host not in prev_remote_hosts:
|
||||
newly_joined_remote_hosts.add(host)
|
||||
Returns:
|
||||
A tuple of destinations and presence updates to send to them.
|
||||
"""
|
||||
if self.is_mine_id(user_id):
|
||||
# If this is a local user then we need to send their presence
|
||||
# out to hosts in the room (who don't already have it)
|
||||
|
||||
# Send presence states of all local users in the room to newly joined
|
||||
# remote servers. (We actually only send states for local users already
|
||||
# in the room, as we'll send states for newly joined local users below.)
|
||||
if prev_local_users and newly_joined_remote_hosts:
|
||||
local_states = await self.current_state_for_users(prev_local_users)
|
||||
# TODO: We should be able to filter the hosts down to those that
|
||||
# haven't previously seen the user
|
||||
|
||||
remote_hosts = await self.state.get_current_hosts_in_room(room_id)
|
||||
|
||||
# Filter out ourselves.
|
||||
filtered_remote_hosts = [
|
||||
host for host in remote_hosts if host != self.server_name
|
||||
]
|
||||
|
||||
state = await self.current_state_for_user(user_id)
|
||||
return filtered_remote_hosts, [state]
|
||||
else:
|
||||
# A remote user has joined the room, so we need to:
|
||||
# 1. Check if this is a new server in the room
|
||||
# 2. If so send any presence they don't already have for
|
||||
# local users in the room.
|
||||
|
||||
# TODO: We should be able to filter the users down to those that
|
||||
# the server hasn't previously seen
|
||||
|
||||
# TODO: Check that this is actually a new server joining the
|
||||
# room.
|
||||
|
||||
remote_host = get_domain_from_id(user_id)
|
||||
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
user_ids = list(filter(self.is_mine_id, users))
|
||||
|
||||
states_d = await self.current_state_for_users(user_ids)
|
||||
|
||||
# Filter out old presence, i.e. offline presence states where
|
||||
# the user hasn't been active for a week. We can change this
|
||||
@@ -1302,27 +1306,13 @@ class PresenceHandler(BasePresenceHandler):
|
||||
now = self.clock.time_msec()
|
||||
states = [
|
||||
state
|
||||
for state in local_states.values()
|
||||
for state in states_d.values()
|
||||
if state.state != PresenceState.OFFLINE
|
||||
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
|
||||
or state.status_msg is not None
|
||||
]
|
||||
|
||||
self._federation_queue.send_presence_to_destinations(
|
||||
destinations=newly_joined_remote_hosts,
|
||||
states=states,
|
||||
)
|
||||
|
||||
# Send presence states of newly joined users to all remote servers in
|
||||
# the room
|
||||
if newly_joined_local_users and (
|
||||
prev_remote_hosts or newly_joined_remote_hosts
|
||||
):
|
||||
local_states = await self.current_state_for_users(newly_joined_local_users)
|
||||
self._federation_queue.send_presence_to_destinations(
|
||||
destinations=prev_remote_hosts | newly_joined_remote_hosts,
|
||||
states=list(local_states.values()),
|
||||
)
|
||||
return [remote_host], states
|
||||
|
||||
|
||||
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
|
||||
|
||||
@@ -722,7 +722,9 @@ class RegistrationHandler(BaseHandler):
|
||||
)
|
||||
if is_guest:
|
||||
assert valid_until_ms is None
|
||||
access_token = self.macaroon_gen.generate_guest_access_token(user_id)
|
||||
access_token = self.macaroon_gen.generate_access_token(
|
||||
user_id, ["guest = true"]
|
||||
)
|
||||
else:
|
||||
access_token = await self._auth_handler.get_access_token_for_user_id(
|
||||
user_id,
|
||||
|
||||
@@ -32,14 +32,7 @@ from synapse.api.constants import (
|
||||
RoomCreationPreset,
|
||||
RoomEncryptionAlgorithms,
|
||||
)
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
LimitExceededError,
|
||||
NotFoundError,
|
||||
StoreError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||
from synapse.events import EventBase
|
||||
@@ -133,6 +126,10 @@ class RoomCreationHandler(BaseHandler):
|
||||
|
||||
self.third_party_event_rules = hs.get_third_party_event_rules()
|
||||
|
||||
self._invite_burst_count = (
|
||||
hs.config.ratelimiting.rc_invites_per_room.burst_count
|
||||
)
|
||||
|
||||
async def upgrade_room(
|
||||
self, requester: Requester, old_room_id: str, new_version: RoomVersion
|
||||
) -> str:
|
||||
@@ -679,18 +676,8 @@ class RoomCreationHandler(BaseHandler):
|
||||
invite_3pid_list = []
|
||||
invite_list = []
|
||||
|
||||
if invite_list or invite_3pid_list:
|
||||
try:
|
||||
# If there are invites in the request, see if the ratelimiting settings
|
||||
# allow that number of invites to be sent from the current user.
|
||||
await self.room_member_handler.ratelimit_multiple_invites(
|
||||
requester,
|
||||
room_id=None,
|
||||
n_invites=len(invite_list) + len(invite_3pid_list),
|
||||
update=False,
|
||||
)
|
||||
except LimitExceededError:
|
||||
raise SynapseError(400, "Cannot invite so many users at once")
|
||||
if len(invite_list) + len(invite_3pid_list) > self._invite_burst_count:
|
||||
raise SynapseError(400, "Cannot invite so many users at once")
|
||||
|
||||
await self.event_creation_handler.assert_accepted_privacy_policy(requester)
|
||||
|
||||
@@ -1340,7 +1327,7 @@ class RoomShutdownHandler:
|
||||
new_room_id = None
|
||||
logger.info("Shutting down room %r", room_id)
|
||||
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
kicked_users = []
|
||||
failed_to_kick_users = []
|
||||
for user_id in users:
|
||||
|
||||
@@ -163,31 +163,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
async def forget(self, user: UserID, room_id: str) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
async def ratelimit_multiple_invites(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
room_id: Optional[str],
|
||||
n_invites: int,
|
||||
update: bool = True,
|
||||
):
|
||||
"""Ratelimit more than one invite sent by the given requester in the given room.
|
||||
|
||||
Args:
|
||||
requester: The requester sending the invites.
|
||||
room_id: The room the invites are being sent in.
|
||||
n_invites: The amount of invites to ratelimit for.
|
||||
update: Whether to update the ratelimiter's cache.
|
||||
|
||||
Raises:
|
||||
LimitExceededError: The requester can't send that many invites in the room.
|
||||
"""
|
||||
await self._invites_per_room_limiter.ratelimit(
|
||||
requester,
|
||||
room_id,
|
||||
update=update,
|
||||
n_actions=n_invites,
|
||||
)
|
||||
|
||||
async def ratelimit_invite(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
@@ -1069,7 +1044,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
|
||||
class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
|
||||
self.distributor = hs.get_distributor()
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import re
|
||||
from collections import deque
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast
|
||||
|
||||
@@ -227,23 +226,6 @@ class SpaceSummaryHandler:
|
||||
suggested_only: bool,
|
||||
max_children: Optional[int],
|
||||
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
|
||||
"""
|
||||
Generate a room entry and a list of event entries for a given room.
|
||||
|
||||
Args:
|
||||
requester: The requesting user, or None if this is over federation.
|
||||
room_id: The room ID to summarize.
|
||||
suggested_only: True if only suggested children should be returned.
|
||||
Otherwise, all children are returned.
|
||||
max_children: The maximum number of children to return for this node.
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
An iterable of a single value of the room.
|
||||
|
||||
An iterable of the sorted children events. This may be limited
|
||||
to a maximum size or may include all children.
|
||||
"""
|
||||
if not await self._is_room_accessible(room_id, requester):
|
||||
return (), ()
|
||||
|
||||
@@ -306,7 +288,6 @@ class SpaceSummaryHandler:
|
||||
ev.data
|
||||
for ev in res.events
|
||||
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
|
||||
or ev.event_type == EventTypes.SpaceChild
|
||||
)
|
||||
|
||||
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
|
||||
@@ -350,9 +331,7 @@ class SpaceSummaryHandler:
|
||||
)
|
||||
|
||||
# TODO: update once MSC1772 lands
|
||||
room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
|
||||
if not room_type:
|
||||
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
|
||||
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
|
||||
|
||||
entry = {
|
||||
"room_id": stats["room_id"],
|
||||
@@ -365,7 +344,6 @@ class SpaceSummaryHandler:
|
||||
stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
|
||||
),
|
||||
"guest_can_join": stats["guest_access"] == "can_join",
|
||||
"creation_ts": create_event.origin_server_ts,
|
||||
"room_type": room_type,
|
||||
}
|
||||
|
||||
@@ -375,18 +353,6 @@ class SpaceSummaryHandler:
|
||||
return room_entry
|
||||
|
||||
async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
|
||||
"""
|
||||
Get the child events for a given room.
|
||||
|
||||
The returned results are sorted for stability.
|
||||
|
||||
Args:
|
||||
room_id: The room id to get the children of.
|
||||
|
||||
Returns:
|
||||
An iterable of sorted child events.
|
||||
"""
|
||||
|
||||
# look for child rooms/spaces.
|
||||
current_state_ids = await self._store.get_current_state_ids(room_id)
|
||||
|
||||
@@ -394,15 +360,13 @@ class SpaceSummaryHandler:
|
||||
[
|
||||
event_id
|
||||
for key, event_id in current_state_ids.items()
|
||||
# TODO: update once MSC1772 has been FCP for a period of time.
|
||||
# TODO: update once MSC1772 lands
|
||||
if key[0] == EventTypes.MSC1772_SPACE_CHILD
|
||||
or key[0] == EventTypes.SpaceChild
|
||||
]
|
||||
)
|
||||
|
||||
# filter out any events without a "via" (which implies it has been redacted),
|
||||
# and order to ensure we return stable results.
|
||||
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
|
||||
# filter out any events without a "via" (which implies it has been redacted)
|
||||
return (e for e in events if _has_valid_via(e))
|
||||
|
||||
|
||||
@attr.s(frozen=True, slots=True)
|
||||
@@ -428,39 +392,3 @@ def _is_suggested_child_event(edge_event: EventBase) -> bool:
|
||||
return True
|
||||
logger.debug("Ignorning not-suggested child %s", edge_event.state_key)
|
||||
return False
|
||||
|
||||
|
||||
# Order may only contain characters in the range of \x20 (space) to \x7F (~).
|
||||
_INVALID_ORDER_CHARS_RE = re.compile(r"[^\x20-\x7F]")
|
||||
|
||||
|
||||
def _child_events_comparison_key(child: EventBase) -> Tuple[bool, Optional[str], str]:
|
||||
"""
|
||||
Generate a value for comparing two child events for ordering.
|
||||
|
||||
The rules for ordering are supposed to be:
|
||||
|
||||
1. The 'order' key, if it is valid.
|
||||
2. The 'origin_server_ts' of the 'm.room.create' event.
|
||||
3. The 'room_id'.
|
||||
|
||||
But we skip step 2 since we may not have any state from the room.
|
||||
|
||||
Args:
|
||||
child: The event for generating a comparison key.
|
||||
|
||||
Returns:
|
||||
The comparison key as a tuple of:
|
||||
False if the ordering is valid.
|
||||
The ordering field.
|
||||
The room ID.
|
||||
"""
|
||||
order = child.content.get("order")
|
||||
# If order is not a string or doesn't meet the requirements, ignore it.
|
||||
if not isinstance(order, str):
|
||||
order = None
|
||||
elif len(order) > 50 or _INVALID_ORDER_CHARS_RE.search(order):
|
||||
order = None
|
||||
|
||||
# Items without an order come last.
|
||||
return (order is None, order, child.room_id)
|
||||
|
||||
@@ -1190,7 +1190,7 @@ class SyncHandler:
|
||||
|
||||
# Step 1b, check for newly joined rooms
|
||||
for room_id in newly_joined_rooms:
|
||||
joined_users = await self.store.get_users_in_room(room_id)
|
||||
joined_users = await self.state.get_current_users_in_room(room_id)
|
||||
newly_joined_or_invited_users.update(joined_users)
|
||||
|
||||
# TODO: Check that these users are actually new, i.e. either they
|
||||
@@ -1206,7 +1206,7 @@ class SyncHandler:
|
||||
|
||||
# Now find users that we no longer track
|
||||
for room_id in newly_left_rooms:
|
||||
left_users = await self.store.get_users_in_room(room_id)
|
||||
left_users = await self.state.get_current_users_in_room(room_id)
|
||||
newly_left_users.update(left_users)
|
||||
|
||||
# Remove any users that we still share a room with.
|
||||
@@ -1361,7 +1361,7 @@ class SyncHandler:
|
||||
|
||||
extra_users_ids = set(newly_joined_or_invited_users)
|
||||
for room_id in newly_joined_rooms:
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
extra_users_ids.update(users)
|
||||
extra_users_ids.discard(user.to_string())
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import Any
|
||||
|
||||
from twisted.web.client import PartialDownloadError
|
||||
|
||||
@@ -22,16 +22,13 @@ from synapse.api.errors import Codes, LoginError, SynapseError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UserInteractiveAuthChecker:
|
||||
"""Abstract base class for an interactive auth checker"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
pass
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
@@ -60,10 +57,10 @@ class UserInteractiveAuthChecker:
|
||||
class DummyAuthChecker(UserInteractiveAuthChecker):
|
||||
AUTH_TYPE = LoginType.DUMMY
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
def is_enabled(self):
|
||||
return True
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
async def check_auth(self, authdict, clientip):
|
||||
return True
|
||||
|
||||
|
||||
@@ -73,24 +70,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker):
|
||||
def is_enabled(self):
|
||||
return True
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
async def check_auth(self, authdict, clientip):
|
||||
return True
|
||||
|
||||
|
||||
class RecaptchaAuthChecker(UserInteractiveAuthChecker):
|
||||
AUTH_TYPE = LoginType.RECAPTCHA
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
self._enabled = bool(hs.config.recaptcha_private_key)
|
||||
self._http_client = hs.get_proxied_http_client()
|
||||
self._url = hs.config.recaptcha_siteverify_api
|
||||
self._secret = hs.config.recaptcha_private_key
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
def is_enabled(self):
|
||||
return self._enabled
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
async def check_auth(self, authdict, clientip):
|
||||
try:
|
||||
user_response = authdict["response"]
|
||||
except KeyError:
|
||||
@@ -135,11 +132,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
|
||||
|
||||
|
||||
class _BaseThreepidAuthChecker:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
async def _check_threepid(self, medium: str, authdict: dict) -> dict:
|
||||
async def _check_threepid(self, medium, authdict):
|
||||
if "threepid_creds" not in authdict:
|
||||
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
|
||||
|
||||
@@ -209,31 +206,31 @@ class _BaseThreepidAuthChecker:
|
||||
class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
|
||||
AUTH_TYPE = LoginType.EMAIL_IDENTITY
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
UserInteractiveAuthChecker.__init__(self, hs)
|
||||
_BaseThreepidAuthChecker.__init__(self, hs)
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
def is_enabled(self):
|
||||
return self.hs.config.threepid_behaviour_email in (
|
||||
ThreepidBehaviour.REMOTE,
|
||||
ThreepidBehaviour.LOCAL,
|
||||
)
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
async def check_auth(self, authdict, clientip):
|
||||
return await self._check_threepid("email", authdict)
|
||||
|
||||
|
||||
class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
|
||||
AUTH_TYPE = LoginType.MSISDN
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
def __init__(self, hs):
|
||||
UserInteractiveAuthChecker.__init__(self, hs)
|
||||
_BaseThreepidAuthChecker.__init__(self, hs)
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
def is_enabled(self):
|
||||
return bool(self.hs.config.account_threepid_delegate_msisdn)
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
async def check_auth(self, authdict, clientip):
|
||||
return await self._check_threepid("msisdn", authdict)
|
||||
|
||||
|
||||
|
||||
@@ -12,13 +12,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
# These are imported to allow for nicer logging configuration files.
|
||||
from synapse.logging._remote import RemoteHandler
|
||||
from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
|
||||
|
||||
# These are imported to allow for nicer logging configuration files.
|
||||
__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
|
||||
|
||||
# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc
|
||||
issue9533_logger = logging.getLogger("synapse.9533_debug")
|
||||
|
||||
@@ -535,13 +535,6 @@ class ReactorLastSeenMetric:
|
||||
|
||||
REGISTRY.register(ReactorLastSeenMetric())
|
||||
|
||||
# The minimum time in seconds between GCs for each generation, regardless of the current GC
|
||||
# thresholds and counts.
|
||||
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
|
||||
|
||||
# The time (in seconds since the epoch) of the last time we did a GC for each generation.
|
||||
_last_gc = [0.0, 0.0, 0.0]
|
||||
|
||||
|
||||
def runUntilCurrentTimer(reactor, func):
|
||||
@functools.wraps(func)
|
||||
@@ -582,16 +575,11 @@ def runUntilCurrentTimer(reactor, func):
|
||||
return ret
|
||||
|
||||
# Check if we need to do a manual GC (since its been disabled), and do
|
||||
# one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
|
||||
# promote an object into gen 2, and we don't want to handle the same
|
||||
# object multiple times.
|
||||
# one if necessary.
|
||||
threshold = gc.get_threshold()
|
||||
counts = gc.get_count()
|
||||
for i in (2, 1, 0):
|
||||
# We check if we need to do one based on a straightforward
|
||||
# comparison between the threshold and count. We also do an extra
|
||||
# check to make sure that we don't a GC too often.
|
||||
if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
|
||||
if threshold[i] < counts[i]:
|
||||
if i == 0:
|
||||
logger.debug("Collecting gc %d", i)
|
||||
else:
|
||||
@@ -601,8 +589,6 @@ def runUntilCurrentTimer(reactor, func):
|
||||
unreachable = gc.collect(i)
|
||||
end = time.time()
|
||||
|
||||
_last_gc[i] = end
|
||||
|
||||
gc_time.labels(i).observe(end - start)
|
||||
gc_unreachable.labels(i).set(unreachable)
|
||||
|
||||
@@ -629,7 +615,6 @@ try:
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
__all__ = [
|
||||
"MetricsResource",
|
||||
"generate_latest",
|
||||
|
||||
@@ -1,196 +0,0 @@
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import ctypes
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
from synapse.metrics import REGISTRY, GaugeMetricFamily
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _setup_jemalloc_stats():
|
||||
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
|
||||
statistics exposed by jemalloc.
|
||||
"""
|
||||
|
||||
# Try to find the loaded jemalloc shared library, if any. We need to
|
||||
# introspect into what is loaded, rather than loading whatever is on the
|
||||
# path, as if we load a *different* jemalloc version things will seg fault.
|
||||
|
||||
# We look in `/proc/self/maps`, which only exists on linux.
|
||||
if not os.path.exists("/proc/self/maps"):
|
||||
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
|
||||
return
|
||||
|
||||
# We're looking for a path at the end of the line that includes
|
||||
# "libjemalloc".
|
||||
regex = re.compile(r"/\S+/libjemalloc.*$")
|
||||
|
||||
jemalloc_path = None
|
||||
with open("/proc/self/maps") as f:
|
||||
for line in f:
|
||||
match = regex.search(line.strip())
|
||||
if match:
|
||||
jemalloc_path = match.group()
|
||||
|
||||
if not jemalloc_path:
|
||||
# No loaded jemalloc was found.
|
||||
logger.debug("jemalloc not found")
|
||||
return
|
||||
|
||||
logger.debug("Found jemalloc at %s", jemalloc_path)
|
||||
|
||||
jemalloc = ctypes.CDLL(jemalloc_path)
|
||||
|
||||
def _mallctl(
|
||||
name: str, read: bool = True, write: Optional[int] = None
|
||||
) -> Optional[int]:
|
||||
"""Wrapper around `mallctl` for reading and writing integers to
|
||||
jemalloc.
|
||||
|
||||
Args:
|
||||
name: The name of the option to read from/write to.
|
||||
read: Whether to try and read the value.
|
||||
write: The value to write, if given.
|
||||
|
||||
Returns:
|
||||
The value read if `read` is True, otherwise None.
|
||||
|
||||
Raises:
|
||||
An exception if `mallctl` returns a non-zero error code.
|
||||
"""
|
||||
|
||||
input_var = None
|
||||
input_var_ref = None
|
||||
input_len_ref = None
|
||||
if read:
|
||||
input_var = ctypes.c_size_t(0)
|
||||
input_len = ctypes.c_size_t(ctypes.sizeof(input_var))
|
||||
|
||||
input_var_ref = ctypes.byref(input_var)
|
||||
input_len_ref = ctypes.byref(input_len)
|
||||
|
||||
write_var_ref = None
|
||||
write_len = ctypes.c_size_t(0)
|
||||
if write is not None:
|
||||
write_var = ctypes.c_size_t(write)
|
||||
write_len = ctypes.c_size_t(ctypes.sizeof(write_var))
|
||||
|
||||
write_var_ref = ctypes.byref(write_var)
|
||||
|
||||
# The interface is:
|
||||
#
|
||||
# int mallctl(
|
||||
# const char *name,
|
||||
# void *oldp,
|
||||
# size_t *oldlenp,
|
||||
# void *newp,
|
||||
# size_t newlen
|
||||
# )
|
||||
#
|
||||
# Where oldp/oldlenp is a buffer where the old value will be written to
|
||||
# (if not null), and newp/newlen is the buffer with the new value to set
|
||||
# (if not null). Note that they're all references *except* newlen.
|
||||
result = jemalloc.mallctl(
|
||||
name.encode("ascii"),
|
||||
input_var_ref,
|
||||
input_len_ref,
|
||||
write_var_ref,
|
||||
write_len,
|
||||
)
|
||||
|
||||
if result != 0:
|
||||
raise Exception("Failed to call mallctl")
|
||||
|
||||
if input_var is None:
|
||||
return None
|
||||
|
||||
return input_var.value
|
||||
|
||||
def _jemalloc_refresh_stats() -> None:
|
||||
"""Request that jemalloc updates its internal statistics. This needs to
|
||||
be called before querying for stats, otherwise it will return stale
|
||||
values.
|
||||
"""
|
||||
try:
|
||||
_mallctl("epoch", read=False, write=1)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to reload jemalloc stats: %s", e)
|
||||
|
||||
class JemallocCollector:
|
||||
"""Metrics for internal jemalloc stats."""
|
||||
|
||||
def collect(self):
|
||||
_jemalloc_refresh_stats()
|
||||
|
||||
g = GaugeMetricFamily(
|
||||
"jemalloc_stats_app_memory_bytes",
|
||||
"The stats reported by jemalloc",
|
||||
labels=["type"],
|
||||
)
|
||||
|
||||
# Read the relevant global stats from jemalloc. Note that these may
|
||||
# not be accurate if python is configured to use its internal small
|
||||
# object allocator (which is on by default, disable by setting the
|
||||
# env `PYTHONMALLOC=malloc`).
|
||||
#
|
||||
# See the jemalloc manpage for details about what each value means,
|
||||
# roughly:
|
||||
# - allocated ─ Total number of bytes allocated by the app
|
||||
# - active ─ Total number of bytes in active pages allocated by
|
||||
# the application, this is bigger than `allocated`.
|
||||
# - resident ─ Maximum number of bytes in physically resident data
|
||||
# pages mapped by the allocator, comprising all pages dedicated
|
||||
# to allocator metadata, pages backing active allocations, and
|
||||
# unused dirty pages. This is bigger than `active`.
|
||||
# - mapped ─ Total number of bytes in active extents mapped by the
|
||||
# allocator.
|
||||
# - metadata ─ Total number of bytes dedicated to jemalloc
|
||||
# metadata.
|
||||
for t in (
|
||||
"allocated",
|
||||
"active",
|
||||
"resident",
|
||||
"mapped",
|
||||
"metadata",
|
||||
):
|
||||
try:
|
||||
value = _mallctl(f"stats.{t}")
|
||||
except Exception as e:
|
||||
# There was an error fetching the value, skip.
|
||||
logger.warning("Failed to read jemalloc stats.%s: %s", t, e)
|
||||
continue
|
||||
|
||||
g.add_metric([t], value=value)
|
||||
|
||||
yield g
|
||||
|
||||
REGISTRY.register(JemallocCollector())
|
||||
|
||||
logger.debug("Added jemalloc stats")
|
||||
|
||||
|
||||
def setup_jemalloc_stats():
|
||||
"""Try to setup jemalloc stats, if jemalloc is loaded."""
|
||||
|
||||
try:
|
||||
_setup_jemalloc_stats()
|
||||
except Exception as e:
|
||||
# This should only happen if we find the loaded jemalloc library, but
|
||||
# fail to load it somehow (e.g. we somehow picked the wrong version).
|
||||
logger.info("Failed to setup collector to record jemalloc stats: %s", e)
|
||||
@@ -38,7 +38,6 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.logging.opentracing import log_kv, start_active_span
|
||||
from synapse.logging.utils import log_function
|
||||
@@ -427,13 +426,6 @@ class Notifier:
|
||||
for room in rooms:
|
||||
user_streams |= self.room_to_user_streams.get(room, set())
|
||||
|
||||
if stream_key == "to_device_key":
|
||||
issue9533_logger.debug(
|
||||
"to-device messages stream id %s, awaking streams for %s",
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
|
||||
time_now_ms = self.clock.time_msec()
|
||||
for user_stream in user_streams:
|
||||
try:
|
||||
|
||||
@@ -19,7 +19,6 @@ from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import UserID
|
||||
from synapse.util import glob_to_regex, re_word_boundary
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -184,7 +183,7 @@ class PushRuleEvaluatorForEvent:
|
||||
r = regex_cache.get((display_name, False, True), None)
|
||||
if not r:
|
||||
r1 = re.escape(display_name)
|
||||
r1 = re_word_boundary(r1)
|
||||
r1 = _re_word_boundary(r1)
|
||||
r = re.compile(r1, flags=re.IGNORECASE)
|
||||
regex_cache[(display_name, False, True)] = r
|
||||
|
||||
@@ -213,7 +212,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
|
||||
try:
|
||||
r = regex_cache.get((glob, True, word_boundary), None)
|
||||
if not r:
|
||||
r = glob_to_regex(glob, word_boundary)
|
||||
r = _glob_to_re(glob, word_boundary)
|
||||
regex_cache[(glob, True, word_boundary)] = r
|
||||
return bool(r.search(value))
|
||||
except re.error:
|
||||
@@ -221,6 +220,56 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _glob_to_re(glob: str, word_boundary: bool) -> Pattern:
|
||||
"""Generates regex for a given glob.
|
||||
|
||||
Args:
|
||||
glob
|
||||
word_boundary: Whether to match against word boundaries or entire string.
|
||||
"""
|
||||
if IS_GLOB.search(glob):
|
||||
r = re.escape(glob)
|
||||
|
||||
r = r.replace(r"\*", ".*?")
|
||||
r = r.replace(r"\?", ".")
|
||||
|
||||
# handle [abc], [a-z] and [!a-z] style ranges.
|
||||
r = GLOB_REGEX.sub(
|
||||
lambda x: (
|
||||
"[%s%s]" % (x.group(1) and "^" or "", x.group(2).replace(r"\\\-", "-"))
|
||||
),
|
||||
r,
|
||||
)
|
||||
if word_boundary:
|
||||
r = _re_word_boundary(r)
|
||||
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
else:
|
||||
r = "^" + r + "$"
|
||||
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
elif word_boundary:
|
||||
r = re.escape(glob)
|
||||
r = _re_word_boundary(r)
|
||||
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
else:
|
||||
r = "^" + re.escape(glob) + "$"
|
||||
return re.compile(r, flags=re.IGNORECASE)
|
||||
|
||||
|
||||
def _re_word_boundary(r: str) -> str:
|
||||
"""
|
||||
Adds word boundary characters to the start and end of an
|
||||
expression to require that the match occur as a whole word,
|
||||
but do so respecting the fact that strings starting or ending
|
||||
with non-word characters will change word boundaries.
|
||||
"""
|
||||
# we can't use \b as it chokes on unicode. however \W seems to be okay
|
||||
# as shorthand for [^0-9A-Za-z_].
|
||||
return r"(^|\W)%s(\W|$)" % (r,)
|
||||
|
||||
|
||||
def _flatten_dict(
|
||||
d: Union[EventBase, dict],
|
||||
prefix: Optional[List[str]] = None,
|
||||
|
||||
@@ -78,8 +78,7 @@ REQUIREMENTS = [
|
||||
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
|
||||
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
|
||||
# is out in November.)
|
||||
# Note: 21.1.0 broke `/sync`, see #9936
|
||||
"attrs>=19.1.0,!=21.1.0",
|
||||
"attrs>=19.1.0",
|
||||
"netaddr>=0.7.18",
|
||||
"Jinja2>=2.9",
|
||||
"bleach>=1.4.3",
|
||||
@@ -117,8 +116,6 @@ CONDITIONAL_REQUIREMENTS = {
|
||||
# hiredis is not a *strict* dependency, but it makes things much faster.
|
||||
# (if it is not installed, we fall back to slow code.)
|
||||
"redis": ["txredisapi>=1.4.7", "hiredis"],
|
||||
# Required to use experimental `caches.track_memory_usage` config option.
|
||||
"cache_memory": ["pympler"],
|
||||
}
|
||||
|
||||
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]
|
||||
|
||||
@@ -51,6 +51,7 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# How long we allow callers to wait for replication updates before timing out.
|
||||
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
from prometheus_client import Counter, Histogram
|
||||
from prometheus_client import Counter
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
@@ -35,20 +35,6 @@ get_counter = Counter(
|
||||
labelnames=["cache_name", "hit"],
|
||||
)
|
||||
|
||||
response_timer = Histogram(
|
||||
"synapse_external_cache_response_time_seconds",
|
||||
"Time taken to get a response from Redis for a cache get/set request",
|
||||
labelnames=["method"],
|
||||
buckets=(
|
||||
0.001,
|
||||
0.002,
|
||||
0.005,
|
||||
0.01,
|
||||
0.02,
|
||||
0.05,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -86,14 +72,13 @@ class ExternalCache:
|
||||
|
||||
logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
|
||||
|
||||
with response_timer.labels("set").time():
|
||||
return await make_deferred_yieldable(
|
||||
self._redis_connection.set(
|
||||
self._get_redis_key(cache_name, key),
|
||||
encoded_value,
|
||||
pexpire=expiry_ms,
|
||||
)
|
||||
return await make_deferred_yieldable(
|
||||
self._redis_connection.set(
|
||||
self._get_redis_key(cache_name, key),
|
||||
encoded_value,
|
||||
pexpire=expiry_ms,
|
||||
)
|
||||
)
|
||||
|
||||
async def get(self, cache_name: str, key: str) -> Optional[Any]:
|
||||
"""Look up a key/value in the named cache."""
|
||||
@@ -101,10 +86,9 @@ class ExternalCache:
|
||||
if self._redis_connection is None:
|
||||
return None
|
||||
|
||||
with response_timer.labels("get").time():
|
||||
result = await make_deferred_yieldable(
|
||||
self._redis_connection.get(self._get_redis_key(cache_name, key))
|
||||
)
|
||||
result = await make_deferred_yieldable(
|
||||
self._redis_connection.get(self._get_redis_key(cache_name, key))
|
||||
)
|
||||
|
||||
logger.debug("Got cache result %s %s: %r", cache_name, key, result)
|
||||
|
||||
|
||||
@@ -37,11 +37,9 @@ from synapse.types import JsonDict, RoomAlias, RoomID, UserID, create_requester
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.handlers.pagination import PaginationHandler
|
||||
from synapse.handlers.room import RoomShutdownHandler
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -148,14 +146,50 @@ class DeleteRoomRestServlet(RestServlet):
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
return await _delete_room(
|
||||
request,
|
||||
room_id,
|
||||
self.auth,
|
||||
self.room_shutdown_handler,
|
||||
self.pagination_handler,
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
block = content.get("block", False)
|
||||
if not isinstance(block, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'block' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
purge = content.get("purge", True)
|
||||
if not isinstance(purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
force_purge = content.get("force_purge", False)
|
||||
if not isinstance(force_purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'force_purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
ret = await self.room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content.get("new_room_user_id"),
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=block,
|
||||
)
|
||||
|
||||
# Purge room
|
||||
if purge:
|
||||
await self.pagination_handler.purge_room(room_id, force=force_purge)
|
||||
|
||||
return (200, ret)
|
||||
|
||||
|
||||
class ListRoomRestServlet(RestServlet):
|
||||
"""
|
||||
@@ -248,22 +282,7 @@ class ListRoomRestServlet(RestServlet):
|
||||
|
||||
|
||||
class RoomRestServlet(RestServlet):
|
||||
"""Manage a room.
|
||||
|
||||
On GET : Get details of a room.
|
||||
|
||||
On DELETE : Delete a room from server.
|
||||
|
||||
It is a combination and improvement of shutdown and purge room.
|
||||
|
||||
Shuts down a room by removing all local users from the room.
|
||||
Blocking all future invites and joins to the room is optional.
|
||||
|
||||
If desired any local aliases will be repointed to a new room
|
||||
created by `new_room_user_id` and kicked users will be auto-
|
||||
joined to the new room.
|
||||
|
||||
If 'purge' is true, it will remove all traces of a room from the database.
|
||||
"""Get room details.
|
||||
|
||||
TODO: Add on_POST to allow room creation without joining the room
|
||||
"""
|
||||
@@ -274,8 +293,6 @@ class RoomRestServlet(RestServlet):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.room_shutdown_handler = hs.get_room_shutdown_handler()
|
||||
self.pagination_handler = hs.get_pagination_handler()
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
@@ -291,17 +308,6 @@ class RoomRestServlet(RestServlet):
|
||||
|
||||
return (200, ret)
|
||||
|
||||
async def on_DELETE(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
return await _delete_room(
|
||||
request,
|
||||
room_id,
|
||||
self.auth,
|
||||
self.room_shutdown_handler,
|
||||
self.pagination_handler,
|
||||
)
|
||||
|
||||
|
||||
class RoomMembersRestServlet(RestServlet):
|
||||
"""
|
||||
@@ -688,55 +694,3 @@ class RoomEventContextServlet(RestServlet):
|
||||
)
|
||||
|
||||
return 200, results
|
||||
|
||||
|
||||
async def _delete_room(
|
||||
request: SynapseRequest,
|
||||
room_id: str,
|
||||
auth: "Auth",
|
||||
room_shutdown_handler: "RoomShutdownHandler",
|
||||
pagination_handler: "PaginationHandler",
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(auth, requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
block = content.get("block", False)
|
||||
if not isinstance(block, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'block' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
purge = content.get("purge", True)
|
||||
if not isinstance(purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
force_purge = content.get("force_purge", False)
|
||||
if not isinstance(force_purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'force_purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
ret = await room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content.get("new_room_user_id"),
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=block,
|
||||
)
|
||||
|
||||
# Purge room
|
||||
if purge:
|
||||
await pagination_handler.purge_room(room_id, force=force_purge)
|
||||
|
||||
return (200, ret)
|
||||
|
||||
@@ -1020,7 +1020,6 @@ class RoomSpaceSummaryRestServlet(RestServlet):
|
||||
max_rooms_per_space=parse_integer(request, "max_rooms_per_space"),
|
||||
)
|
||||
|
||||
# TODO When switching to the stable endpoint, remove the POST handler.
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
|
||||
@@ -39,6 +39,13 @@ class WellKnownBuilder:
|
||||
|
||||
result = {"m.homeserver": {"base_url": self._config.public_baseurl}}
|
||||
|
||||
# Point to BigBlueButton widget for calls
|
||||
result.update({
|
||||
"io.element.call_behaviour": {
|
||||
"widget_build_url": "http://localhost:8184/api/v1/dimension/bigbluebutton/widget_state",
|
||||
}
|
||||
})
|
||||
|
||||
if self._config.default_identity_server:
|
||||
result["m.identity_server"] = {
|
||||
"base_url": self._config.default_identity_server
|
||||
|
||||
@@ -213,23 +213,19 @@ class StateHandler:
|
||||
return ret.state
|
||||
|
||||
async def get_current_users_in_room(
|
||||
self, room_id: str, latest_event_ids: List[str]
|
||||
self, room_id: str, latest_event_ids: Optional[List[str]] = None
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
"""
|
||||
Get the users who are currently in a room.
|
||||
|
||||
Note: This is much slower than using the equivalent method
|
||||
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
|
||||
so this should only be used when wanting the users at a particular point
|
||||
in the room.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room.
|
||||
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
|
||||
Returns:
|
||||
Dictionary of user IDs to their profileinfo.
|
||||
"""
|
||||
|
||||
if not latest_event_ids:
|
||||
latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id)
|
||||
assert latest_event_ids is not None
|
||||
|
||||
logger.debug("calling resolve_state_groups from get_current_users_in_room")
|
||||
|
||||
@@ -69,7 +69,6 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
|
||||
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
import logging
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
@@ -405,13 +404,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
if remote_messages_by_destination:
|
||||
issue9533_logger.debug(
|
||||
"Queued outgoing to-device messages with stream_id %i for %s",
|
||||
stream_id,
|
||||
list(remote_messages_by_destination.keys()),
|
||||
)
|
||||
|
||||
async with self._device_inbox_id_gen.get_next() as stream_id:
|
||||
now_ms = self.clock.time_msec()
|
||||
await self.db_pool.runInteraction(
|
||||
@@ -541,16 +533,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
issue9533_logger.debug(
|
||||
"Stored to-device messages with stream_id %i for %s",
|
||||
stream_id,
|
||||
[
|
||||
(user_id, device_id)
|
||||
for (user_id, messages_by_device) in local_by_user_then_device.items()
|
||||
for device_id in messages_by_device.keys()
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
|
||||
@@ -84,9 +84,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
|
||||
if keys:
|
||||
result["keys"] = keys
|
||||
|
||||
device_display_name = None
|
||||
if self.hs.config.allow_device_name_lookup_over_federation:
|
||||
device_display_name = device.display_name
|
||||
device_display_name = device.display_name
|
||||
if device_display_name:
|
||||
result["device_display_name"] = device_display_name
|
||||
|
||||
|
||||
@@ -205,12 +205,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
|
||||
sql = """
|
||||
SELECT state_key, display_name, avatar_url FROM room_memberships as m
|
||||
INNER JOIN current_state_events as c
|
||||
ON m.event_id = c.event_id
|
||||
AND m.room_id = c.room_id
|
||||
AND m.user_id = c.state_key
|
||||
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
|
||||
SELECT user_id, display_name, avatar_url FROM room_memberships
|
||||
WHERE room_id = ? AND membership = ?
|
||||
"""
|
||||
txn.execute(sql, (room_id, Membership.JOIN))
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user