Compare commits
44 Commits
release-v1
...
rei/docker
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e940c625a8 | ||
|
|
ed5d900b9e | ||
|
|
d71674670d | ||
|
|
6d482ba259 | ||
|
|
57e4786e90 | ||
|
|
fd65139714 | ||
|
|
ec07062e31 | ||
|
|
cef0d5d90a | ||
|
|
2d3bd9aa67 | ||
|
|
2897fb6b4f | ||
|
|
d8df8e6c14 | ||
|
|
c5815567a4 | ||
|
|
95b3f952fa | ||
|
|
74e4419eb4 | ||
|
|
b8bf600700 | ||
|
|
6a72c910f1 | ||
|
|
0938f32e93 | ||
|
|
1d5f7b2cc6 | ||
|
|
b59d285f7c | ||
|
|
fc8598bc87 | ||
|
|
4210143f53 | ||
|
|
6911604a0f | ||
|
|
8e45dfbe25 | ||
|
|
b500fcbc0c | ||
|
|
0d6cfea9b8 | ||
|
|
15c2a6a106 | ||
|
|
2d327d25bf | ||
|
|
02d99f044e | ||
|
|
ec2271ac50 | ||
|
|
807efd26ae | ||
|
|
c3040dd5cc | ||
|
|
36f37acf53 | ||
|
|
df54c8485a | ||
|
|
8ff465d206 | ||
|
|
9006ee36d1 | ||
|
|
f8cf02b200 | ||
|
|
ffc61d1b69 | ||
|
|
2aa37a4250 | ||
|
|
b784299cbc | ||
|
|
9f2016e96e | ||
|
|
2277275485 | ||
|
|
c027bc0e4b | ||
|
|
4c2096599c | ||
|
|
e83520cc42 |
@@ -1,12 +1,14 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# this script is run by GitHub Actions in a plain `bionic` container; it installs the
|
||||
# this script is run by GitHub Actions in a plain `focal` container; it installs the
|
||||
# minimal requirements for tox and hands over to the py3-old tox environment.
|
||||
|
||||
# Prevent tzdata from asking for user input
|
||||
export DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
set -ex
|
||||
|
||||
apt-get update
|
||||
apt-get install -y python3 python3-dev python3-pip libxml2-dev libxslt-dev xmlsec1 zlib1g-dev tox
|
||||
apt-get install -y python3 python3-dev python3-pip libxml2-dev libxslt-dev xmlsec1 zlib1g-dev tox libjpeg-dev libwebp-dev
|
||||
|
||||
export LANG="C.UTF-8"
|
||||
|
||||
|
||||
80
.github/workflows/docker.yml
vendored
80
.github/workflows/docker.yml
vendored
@@ -34,6 +34,8 @@ jobs:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
|
||||
# TODO: consider using https://github.com/docker/metadata-action instead of this
|
||||
# custom magic
|
||||
- name: Calculate docker image tag
|
||||
id: set-tag
|
||||
run: |
|
||||
@@ -53,18 +55,6 @@ jobs:
|
||||
esac
|
||||
echo "::set-output name=tag::$tag"
|
||||
|
||||
# for release builds, we want to get the amd64 image out asap, so first
|
||||
# we do an amd64-only build, before following up with a multiarch build.
|
||||
- name: Build and push amd64
|
||||
uses: docker/build-push-action@v2
|
||||
if: "${{ startsWith(github.ref, 'refs/tags/v') }}"
|
||||
with:
|
||||
push: true
|
||||
labels: "gitsha1=${{ github.sha }}"
|
||||
tags: "matrixdotorg/synapse:${{ steps.set-tag.outputs.tag }}"
|
||||
file: "docker/Dockerfile"
|
||||
platforms: linux/amd64
|
||||
|
||||
- name: Build and push all platforms
|
||||
uses: docker/build-push-action@v2
|
||||
with:
|
||||
@@ -73,3 +63,69 @@ jobs:
|
||||
tags: "matrixdotorg/synapse:${{ steps.set-tag.outputs.tag }}"
|
||||
file: "docker/Dockerfile"
|
||||
platforms: linux/amd64,linux/arm64
|
||||
|
||||
build_workers_test:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
# The worker test image depends on the base image, so we must build the base
|
||||
# first.
|
||||
needs: build
|
||||
|
||||
permissions:
|
||||
packages: write
|
||||
contents: read
|
||||
|
||||
steps:
|
||||
- name: Set up QEMU
|
||||
id: qemu
|
||||
uses: docker/setup-qemu-action@v1
|
||||
with:
|
||||
platforms: arm64
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
id: buildx
|
||||
uses: docker/setup-buildx-action@v1
|
||||
|
||||
- name: Inspect builder
|
||||
run: docker buildx inspect
|
||||
|
||||
- name: Login to GitHub Container Registry (for worker-testing-only image)
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@v1
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
# This must match EXACTLY the one in the `build` pipeline.
|
||||
- name: Calculate docker image tag
|
||||
id: set-tag
|
||||
run: |
|
||||
case "${GITHUB_REF}" in
|
||||
refs/heads/develop)
|
||||
tag=develop
|
||||
;;
|
||||
refs/heads/master|refs/heads/main)
|
||||
tag=latest
|
||||
;;
|
||||
refs/tags/*)
|
||||
tag=${GITHUB_REF#refs/tags/}
|
||||
;;
|
||||
*)
|
||||
tag=${GITHUB_SHA}
|
||||
;;
|
||||
esac
|
||||
echo "::set-output name=tag::$tag"
|
||||
|
||||
# This image is solely intended to be used for automated test tools,
|
||||
# such as mx-tester.
|
||||
- name: Build and push worker-testing-only image for all platforms
|
||||
uses: docker/build-push-action@v2
|
||||
with:
|
||||
push: true
|
||||
build-args: |
|
||||
"base_version=${{ steps.set-tag.outputs.tag }}"
|
||||
labels: "gitsha1=${{ github.sha }}"
|
||||
tags: "ghcr.io/matrix-org/synapse-workers-testing-only:${{ steps.set-tag.outputs.tag }}"
|
||||
file: "docker/Dockerfile-workers"
|
||||
platforms: linux/amd64,linux/arm64
|
||||
|
||||
36
.github/workflows/tests.yml
vendored
36
.github/workflows/tests.yml
vendored
@@ -141,7 +141,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Test with old deps
|
||||
uses: docker://ubuntu:bionic # For old python and sqlite
|
||||
uses: docker://ubuntu:focal # For old python and sqlite
|
||||
with:
|
||||
workdir: /github/workspace
|
||||
entrypoint: .ci/scripts/test_old_deps.sh
|
||||
@@ -213,15 +213,15 @@ jobs:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- sytest-tag: bionic
|
||||
- sytest-tag: focal
|
||||
|
||||
- sytest-tag: bionic
|
||||
- sytest-tag: focal
|
||||
postgres: postgres
|
||||
|
||||
- sytest-tag: testing
|
||||
postgres: postgres
|
||||
|
||||
- sytest-tag: bionic
|
||||
- sytest-tag: focal
|
||||
postgres: multi-postgres
|
||||
workers: workers
|
||||
|
||||
@@ -323,17 +323,22 @@ jobs:
|
||||
if: ${{ !failure() && !cancelled() }}
|
||||
needs: linting-done
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
# https://github.com/matrix-org/complement/blob/master/dockerfiles/ComplementCIBuildkite.Dockerfile
|
||||
image: matrixdotorg/complement:latest
|
||||
env:
|
||||
CI: true
|
||||
ports:
|
||||
- 8448:8448
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
|
||||
steps:
|
||||
# The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement.
|
||||
# See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path
|
||||
- name: "Set Go Version"
|
||||
run: |
|
||||
# Add Go 1.17 to the PATH: see https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md#environment-variables-2
|
||||
echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH
|
||||
# Add the Go path to the PATH: We need this so we can call gotestfmt
|
||||
echo "~/go/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: "Install Complement Dependencies"
|
||||
run: |
|
||||
sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev
|
||||
go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest
|
||||
|
||||
- name: Run actions/checkout@v2 for synapse
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
@@ -376,8 +381,11 @@ jobs:
|
||||
working-directory: complement/dockerfiles
|
||||
|
||||
# Run Complement
|
||||
- run: set -o pipefail && go test -v -json -tags synapse_blacklist,msc2403 ./tests/... 2>&1 | gotestfmt
|
||||
- run: |
|
||||
set -o pipefail
|
||||
go test -v -json -tags synapse_blacklist,msc2403 ./tests/... 2>&1 | gotestfmt
|
||||
shell: bash
|
||||
name: Run Complement Tests
|
||||
env:
|
||||
COMPLEMENT_BASE_IMAGE: complement-synapse:latest
|
||||
working-directory: complement
|
||||
|
||||
2
.github/workflows/twisted_trunk.yml
vendored
2
.github/workflows/twisted_trunk.yml
vendored
@@ -25,7 +25,7 @@ jobs:
|
||||
- run: sudo apt-get -qq install xmlsec1
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.6
|
||||
python-version: 3.7
|
||||
- run: .ci/patch_for_twisted_trunk.sh
|
||||
- run: pip install tox
|
||||
- run: tox -e py
|
||||
|
||||
11
CHANGES.md
11
CHANGES.md
@@ -14,6 +14,17 @@ Bugfixes
|
||||
- Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. ([\#11806](https://github.com/matrix-org/synapse/issues/11806))
|
||||
|
||||
|
||||
Synapse 1.50.2 (2022-01-24)
|
||||
===========================
|
||||
|
||||
This release includes the same bugfix as Synapse 1.51.0rc2.
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. ([\#11806](https://github.com/matrix-org/synapse/issues/11806))
|
||||
|
||||
|
||||
Synapse 1.51.0rc1 (2022-01-21)
|
||||
==============================
|
||||
|
||||
|
||||
1
changelog.d/11612.bugfix
Normal file
1
changelog.d/11612.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Include the bundled aggregations in the `/sync` response, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675).
|
||||
1
changelog.d/11621.feature
Normal file
1
changelog.d/11621.feature
Normal file
@@ -0,0 +1 @@
|
||||
Remove account data (including client config, push rules and ignored users) upon user deactivation.
|
||||
1
changelog.d/11639.feature
Normal file
1
changelog.d/11639.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add admin API to reset connection timeouts for remote server.
|
||||
1
changelog.d/11658.feature
Normal file
1
changelog.d/11658.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add an admin API to get a list of rooms that federate with a given remote homeserver.
|
||||
1
changelog.d/11683.removal
Normal file
1
changelog.d/11683.removal
Normal file
@@ -0,0 +1 @@
|
||||
Drop support for Python 3.6, which is EOL.
|
||||
1
changelog.d/11743.feature
Normal file
1
changelog.d/11743.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add a config flag to inhibit M_USER_IN_USE during registration.
|
||||
1
changelog.d/11767.bugfix
Normal file
1
changelog.d/11767.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a long-standing bug when previewing Reddit URLs which do not contain an image.
|
||||
1
changelog.d/11784.bugfix
Normal file
1
changelog.d/11784.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a long-standing bug that media streams could cause long-lived connections when generating URL previews.
|
||||
1
changelog.d/11788.feature
Normal file
1
changelog.d/11788.feature
Normal file
@@ -0,0 +1 @@
|
||||
Remove account data (including client config, push rules and ignored users) upon user deactivation.
|
||||
1
changelog.d/11789.feature
Normal file
1
changelog.d/11789.feature
Normal file
@@ -0,0 +1 @@
|
||||
Remove account data (including client config, push rules and ignored users) upon user deactivation.
|
||||
1
changelog.d/11790.feature
Normal file
1
changelog.d/11790.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add a module callback to set username at registration.
|
||||
1
changelog.d/11792.misc
Normal file
1
changelog.d/11792.misc
Normal file
@@ -0,0 +1 @@
|
||||
Preparation for database schema simplifications: add `state_key` and `rejection_reason` columns to `events` table.
|
||||
1
changelog.d/11793.misc
Normal file
1
changelog.d/11793.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add `FrozenEvent.get_state_key` and use it in a couple of places.
|
||||
1
changelog.d/11794.misc
Normal file
1
changelog.d/11794.misc
Normal file
@@ -0,0 +1 @@
|
||||
Preparation for database schema simplifications: stop reading from `event_reference_hashes`.
|
||||
1
changelog.d/11795.misc
Normal file
1
changelog.d/11795.misc
Normal file
@@ -0,0 +1 @@
|
||||
Drop unused table `public_room_list_stream`.
|
||||
1
changelog.d/11798.bugfix
Normal file
1
changelog.d/11798.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Include a `prev_content` field in state events sent to Application Services. Contributed by @totallynotvaishnav.
|
||||
1
changelog.d/11799.misc
Normal file
1
changelog.d/11799.misc
Normal file
@@ -0,0 +1 @@
|
||||
Preparation for reducing Postgres serialization errors: allow setting transaction isolation level. Contributed by Nick @ Beeper.
|
||||
1
changelog.d/11810.misc
Normal file
1
changelog.d/11810.misc
Normal file
@@ -0,0 +1 @@
|
||||
Docker: skip the initial amd64-only build and go straight to multiarch.
|
||||
1
changelog.d/11811.misc
Normal file
1
changelog.d/11811.misc
Normal file
@@ -0,0 +1 @@
|
||||
Run Complement on the Github Actions VM and not inside a Docker container.
|
||||
1
changelog.d/11813.misc
Normal file
1
changelog.d/11813.misc
Normal file
@@ -0,0 +1 @@
|
||||
Log module names at startup.
|
||||
1
changelog.d/11815.misc
Normal file
1
changelog.d/11815.misc
Normal file
@@ -0,0 +1 @@
|
||||
Improve type safety of bundled aggregations code.
|
||||
1
changelog.d/11816.misc
Normal file
1
changelog.d/11816.misc
Normal file
@@ -0,0 +1 @@
|
||||
Drop support for Python 3.6, which is EOL.
|
||||
1
changelog.d/11817.misc
Normal file
1
changelog.d/11817.misc
Normal file
@@ -0,0 +1 @@
|
||||
Correct a type annotation in the event validation logic.
|
||||
1
changelog.d/11820.doc
Normal file
1
changelog.d/11820.doc
Normal file
@@ -0,0 +1 @@
|
||||
Update pypi installation docs to indicate that we now support Python 3.10.
|
||||
1
changelog.d/11821.doc
Normal file
1
changelog.d/11821.doc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing steps to the contribution submission process in the documentation. Contributed by @sequentialread.
|
||||
1
changelog.d/11823.misc
Normal file
1
changelog.d/11823.misc
Normal file
@@ -0,0 +1 @@
|
||||
Minor updates and documentation for database schema delta files.
|
||||
1
changelog.d/11827.bugfix
Normal file
1
changelog.d/11827.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a bug introduced in Synapse 0.33.3 causing requests to sometimes log strings such as `HTTPStatus.OK` instead of integer status codes.
|
||||
1
changelog.d/11830.misc
Normal file
1
changelog.d/11830.misc
Normal file
@@ -0,0 +1 @@
|
||||
Correct a type annotation in the event validation logic.
|
||||
1
changelog.d/11834.misc
Normal file
1
changelog.d/11834.misc
Normal file
@@ -0,0 +1 @@
|
||||
Workaround a type annotation problem in `prometheus_client` 0.13.0.
|
||||
1
changelog.d/11836.misc
Normal file
1
changelog.d/11836.misc
Normal file
@@ -0,0 +1 @@
|
||||
Minor performance improvement in room state lookup.
|
||||
1
changelog.d/11838.misc
Normal file
1
changelog.d/11838.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fix some indentation inconsistencies in the sample config.
|
||||
1
changelog.d/11847.misc
Normal file
1
changelog.d/11847.misc
Normal file
@@ -0,0 +1 @@
|
||||
Preparation for reducing Postgres serialization errors: allow setting transaction isolation level. Contributed by Nick @ Beeper.
|
||||
1
changelog.d/11852.misc
Normal file
1
changelog.d/11852.misc
Normal file
@@ -0,0 +1 @@
|
||||
Build Docker images for using worker-mode Synapse in automated test tools.
|
||||
6
debian/changelog
vendored
6
debian/changelog
vendored
@@ -16,6 +16,12 @@ matrix-synapse-py3 (1.51.0~rc1) stable; urgency=medium
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Fri, 21 Jan 2022 10:46:02 +0000
|
||||
|
||||
matrix-synapse-py3 (1.50.2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.50.2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Mon, 24 Jan 2022 13:37:11 +0000
|
||||
|
||||
matrix-synapse-py3 (1.50.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.50.1.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Use the Sytest image that comes with a lot of the build dependencies
|
||||
# pre-installed
|
||||
FROM matrixdotorg/sytest:bionic
|
||||
FROM matrixdotorg/sytest:focal
|
||||
|
||||
# The Sytest image doesn't come with python, so install that
|
||||
RUN apt-get update && apt-get -qq install -y python3 python3-dev python3-pip
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
ARG base_version=latest
|
||||
|
||||
# Inherit from the official Synapse docker image
|
||||
FROM matrixdotorg/synapse
|
||||
|
||||
FROM matrixdotorg/synapse:$base_version
|
||||
|
||||
# Install deps
|
||||
RUN apt-get update
|
||||
|
||||
@@ -16,4 +16,4 @@ sudo -u postgres /usr/lib/postgresql/10/bin/pg_ctl -w -D /var/lib/postgresql/dat
|
||||
# Run the tests
|
||||
cd /src
|
||||
export TRIAL_FLAGS="-j 4"
|
||||
tox --workdir=./.tox-pg-container -e py36-postgres "$@"
|
||||
tox --workdir=./.tox-pg-container -e py37-postgres "$@"
|
||||
|
||||
@@ -353,6 +353,11 @@ The following actions are performed when deactivating an user:
|
||||
- Remove the user from the user directory
|
||||
- Reject all pending invites
|
||||
- Remove all account validity information related to the user
|
||||
- Remove the arbitrary data store known as *account data*. For example, this includes:
|
||||
- list of ignored users;
|
||||
- push rules;
|
||||
- secret storage keys; and
|
||||
- cross-signing keys.
|
||||
|
||||
The following additional actions are performed during deactivation if `erase`
|
||||
is set to `true`:
|
||||
@@ -366,7 +371,6 @@ The following actions are **NOT** performed. The list may be incomplete.
|
||||
- Remove mappings of SSO IDs
|
||||
- [Delete media uploaded](#delete-media-uploaded-by-a-user) by user (included avatar images)
|
||||
- Delete sent and received messages
|
||||
- Delete E2E cross-signing keys
|
||||
- Remove the user's creation (registration) timestamp
|
||||
- [Remove rate limit overrides](#override-ratelimiting-for-users)
|
||||
- Remove from monthly active users
|
||||
|
||||
@@ -16,6 +16,6 @@ It returns a JSON body like the following:
|
||||
```json
|
||||
{
|
||||
"server_version": "0.99.2rc1 (b=develop, abcdef123)",
|
||||
"python_version": "3.6.8"
|
||||
"python_version": "3.7.8"
|
||||
}
|
||||
```
|
||||
|
||||
@@ -55,6 +55,7 @@ setup a *virtualenv*, as follows:
|
||||
cd path/where/you/have/cloned/the/repository
|
||||
python3 -m venv ./env
|
||||
source ./env/bin/activate
|
||||
pip install wheel
|
||||
pip install -e ".[all,dev]"
|
||||
pip install tox
|
||||
```
|
||||
@@ -116,7 +117,7 @@ The linters look at your code and do two things:
|
||||
- ensure that your code follows the coding style adopted by the project;
|
||||
- catch a number of errors in your code.
|
||||
|
||||
They're pretty fast, don't hesitate!
|
||||
The linter takes no time at all to run as soon as you've [downloaded the dependencies into your python virtual environment](#4-install-the-dependencies).
|
||||
|
||||
```sh
|
||||
source ./env/bin/activate
|
||||
|
||||
@@ -96,6 +96,60 @@ Ensure postgres is installed, then run:
|
||||
NB at the time of writing, this script predates the split into separate `state`/`main`
|
||||
databases so will require updates to handle that correctly.
|
||||
|
||||
## Delta files
|
||||
|
||||
Delta files define the steps required to upgrade the database from an earlier version.
|
||||
They can be written as either a file containing a series of SQL statements, or a Python
|
||||
module.
|
||||
|
||||
Synapse remembers which delta files it has applied to a database (they are stored in the
|
||||
`applied_schema_deltas` table) and will not re-apply them (even if a given file is
|
||||
subsequently updated).
|
||||
|
||||
Delta files should be placed in a directory named `synapse/storage/schema/<database>/delta/<version>/`.
|
||||
They are applied in alphanumeric order, so by convention the first two characters
|
||||
of the filename should be an integer such as `01`, to put the file in the right order.
|
||||
|
||||
### SQL delta files
|
||||
|
||||
These should be named `*.sql`, or — for changes which should only be applied for a
|
||||
given database engine — `*.sql.posgres` or `*.sql.sqlite`. For example, a delta which
|
||||
adds a new column to the `foo` table might be called `01add_bar_to_foo.sql`.
|
||||
|
||||
Note that our SQL parser is a bit simple - it understands comments (`--` and `/*...*/`),
|
||||
but complex statements which require a `;` in the middle of them (such as `CREATE
|
||||
TRIGGER`) are beyond it and you'll have to use a Python delta file.
|
||||
|
||||
### Python delta files
|
||||
|
||||
For more flexibility, a delta file can take the form of a python module. These should
|
||||
be named `*.py`. Note that database-engine-specific modules are not supported here –
|
||||
instead you can write `if isinstance(database_engine, PostgresEngine)` or similar.
|
||||
|
||||
A Python delta module should define either or both of the following functions:
|
||||
|
||||
```python
|
||||
import synapse.config.homeserver
|
||||
import synapse.storage.engines
|
||||
import synapse.storage.types
|
||||
|
||||
|
||||
def run_create(
|
||||
cur: synapse.storage.types.Cursor,
|
||||
database_engine: synapse.storage.engines.BaseDatabaseEngine,
|
||||
) -> None:
|
||||
"""Called whenever an existing or new database is to be upgraded"""
|
||||
...
|
||||
|
||||
def run_upgrade(
|
||||
cur: synapse.storage.types.Cursor,
|
||||
database_engine: synapse.storage.engines.BaseDatabaseEngine,
|
||||
config: synapse.config.homeserver.HomeServerConfig,
|
||||
) -> None:
|
||||
"""Called whenever an existing database is to be upgraded."""
|
||||
...
|
||||
```
|
||||
|
||||
## Boolean columns
|
||||
|
||||
Boolean columns require special treatment, since SQLite treats booleans the
|
||||
|
||||
@@ -105,6 +105,68 @@ device ID), and the (now deactivated) access token.
|
||||
|
||||
If multiple modules implement this callback, Synapse runs them all in order.
|
||||
|
||||
### `get_username_for_registration`
|
||||
|
||||
_First introduced in Synapse v1.52.0_
|
||||
|
||||
```python
|
||||
async def get_username_for_registration(
|
||||
uia_results: Dict[str, Any],
|
||||
params: Dict[str, Any],
|
||||
) -> Optional[str]
|
||||
```
|
||||
|
||||
Called when registering a new user. The module can return a username to set for the user
|
||||
being registered by returning it as a string, or `None` if it doesn't wish to force a
|
||||
username for this user. If a username is returned, it will be used as the local part of a
|
||||
user's full Matrix ID (e.g. it's `alice` in `@alice:example.com`).
|
||||
|
||||
This callback is called once [User-Interactive Authentication](https://spec.matrix.org/latest/client-server-api/#user-interactive-authentication-api)
|
||||
has been completed by the user. It is not called when registering a user via SSO. It is
|
||||
passed two dictionaries, which include the information that the user has provided during
|
||||
the registration process.
|
||||
|
||||
The first dictionary contains the results of the [User-Interactive Authentication](https://spec.matrix.org/latest/client-server-api/#user-interactive-authentication-api)
|
||||
flow followed by the user. Its keys are the identifiers of every step involved in the flow,
|
||||
associated with either a boolean value indicating whether the step was correctly completed,
|
||||
or additional information (e.g. email address, phone number...). A list of most existing
|
||||
identifiers can be found in the [Matrix specification](https://spec.matrix.org/v1.1/client-server-api/#authentication-types).
|
||||
Here's an example featuring all currently supported keys:
|
||||
|
||||
```python
|
||||
{
|
||||
"m.login.dummy": True, # Dummy authentication
|
||||
"m.login.terms": True, # User has accepted the terms of service for the homeserver
|
||||
"m.login.recaptcha": True, # User has completed the recaptcha challenge
|
||||
"m.login.email.identity": { # User has provided and verified an email address
|
||||
"medium": "email",
|
||||
"address": "alice@example.com",
|
||||
"validated_at": 1642701357084,
|
||||
},
|
||||
"m.login.msisdn": { # User has provided and verified a phone number
|
||||
"medium": "msisdn",
|
||||
"address": "33123456789",
|
||||
"validated_at": 1642701357084,
|
||||
},
|
||||
"org.matrix.msc3231.login.registration_token": "sometoken", # User has registered through the flow described in MSC3231
|
||||
}
|
||||
```
|
||||
|
||||
The second dictionary contains the parameters provided by the user's client in the request
|
||||
to `/_matrix/client/v3/register`. See the [Matrix specification](https://spec.matrix.org/latest/client-server-api/#post_matrixclientv3register)
|
||||
for a complete list of these parameters.
|
||||
|
||||
If the module cannot, or does not wish to, generate a username for this user, it must
|
||||
return `None`.
|
||||
|
||||
If multiple modules implement this callback, they will be considered in order. If a
|
||||
callback returns `None`, Synapse falls through to the next one. The value of the first
|
||||
callback that does not return `None` will be used. If this happens, Synapse will not call
|
||||
any of the subsequent implementations of this callback. If every callback return `None`,
|
||||
the username provided by the user is used, if any (otherwise one is automatically
|
||||
generated).
|
||||
|
||||
|
||||
## Example
|
||||
|
||||
The example module below implements authentication checkers for two different login types:
|
||||
|
||||
@@ -41,11 +41,11 @@
|
||||
# documentation on how to configure or create custom modules for Synapse.
|
||||
#
|
||||
modules:
|
||||
# - module: my_super_module.MySuperClass
|
||||
# config:
|
||||
# do_thing: true
|
||||
# - module: my_other_super_module.SomeClass
|
||||
# config: {}
|
||||
#- module: my_super_module.MySuperClass
|
||||
# config:
|
||||
# do_thing: true
|
||||
#- module: my_other_super_module.SomeClass
|
||||
# config: {}
|
||||
|
||||
|
||||
## Server ##
|
||||
@@ -1428,6 +1428,16 @@ account_threepid_delegates:
|
||||
#
|
||||
#auto_join_rooms_for_guests: false
|
||||
|
||||
# Whether to inhibit errors raised when registering a new account if the user ID
|
||||
# already exists. If turned on, that requests to /register/available will always
|
||||
# show a user ID as available, and Synapse won't raise an error when starting
|
||||
# a registration with a user ID that already exists. However, Synapse will still
|
||||
# raise an error if the registration completes and the username conflicts.
|
||||
#
|
||||
# Defaults to false.
|
||||
#
|
||||
#inhibit_user_in_use_error: true
|
||||
|
||||
|
||||
## Metrics ###
|
||||
|
||||
|
||||
@@ -194,7 +194,7 @@ When following this route please make sure that the [Platform-specific prerequis
|
||||
System requirements:
|
||||
|
||||
- POSIX-compliant system (tested on Linux & OS X)
|
||||
- Python 3.7 or later, up to Python 3.9.
|
||||
- Python 3.7 or later, up to Python 3.10.
|
||||
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
|
||||
|
||||
To install the Synapse homeserver run:
|
||||
|
||||
@@ -86,7 +86,7 @@ The following fields are returned in the JSON response body:
|
||||
- `next_token`: string representing a positive integer - Indication for pagination. See above.
|
||||
- `total` - integer - Total number of destinations.
|
||||
|
||||
# Destination Details API
|
||||
## Destination Details API
|
||||
|
||||
This API gets the retry timing info for a specific remote server.
|
||||
|
||||
@@ -108,7 +108,105 @@ A response body like the following is returned:
|
||||
}
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
The following parameters should be set in the URL:
|
||||
|
||||
- `destination` - Name of the remote server.
|
||||
|
||||
**Response**
|
||||
|
||||
The response fields are the same like in the `destinations` array in
|
||||
[List of destinations](#list-of-destinations) response.
|
||||
|
||||
## Destination rooms
|
||||
|
||||
This API gets the rooms that federate with a specific remote server.
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
GET /_synapse/admin/v1/federation/destinations/<destination>/rooms
|
||||
```
|
||||
|
||||
A response body like the following is returned:
|
||||
|
||||
```json
|
||||
{
|
||||
"rooms":[
|
||||
{
|
||||
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
|
||||
"stream_ordering": 8326
|
||||
},
|
||||
{
|
||||
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
|
||||
"stream_ordering": 93534
|
||||
}
|
||||
],
|
||||
"total": 2
|
||||
}
|
||||
```
|
||||
|
||||
To paginate, check for `next_token` and if present, call the endpoint again
|
||||
with `from` set to the value of `next_token`. This will return a new page.
|
||||
|
||||
If the endpoint does not return a `next_token` then there are no more destinations
|
||||
to paginate through.
|
||||
|
||||
**Parameters**
|
||||
|
||||
The following parameters should be set in the URL:
|
||||
|
||||
- `destination` - Name of the remote server.
|
||||
|
||||
The following query parameters are available:
|
||||
|
||||
- `from` - Offset in the returned list. Defaults to `0`.
|
||||
- `limit` - Maximum amount of destinations to return. Defaults to `100`.
|
||||
- `dir` - Direction of room order by `room_id`. Either `f` for forwards or `b` for
|
||||
backwards. Defaults to `f`.
|
||||
|
||||
**Response**
|
||||
|
||||
The following fields are returned in the JSON response body:
|
||||
|
||||
- `rooms` - An array of objects, each containing information about a room.
|
||||
Room objects contain the following fields:
|
||||
- `room_id` - string - The ID of the room.
|
||||
- `stream_ordering` - integer - The stream ordering of the most recent
|
||||
successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation)
|
||||
to this destination in this room.
|
||||
- `next_token`: string representing a positive integer - Indication for pagination. See above.
|
||||
- `total` - integer - Total number of destinations.
|
||||
|
||||
## Reset connection timeout
|
||||
|
||||
Synapse makes federation requests to other homeservers. If a federation request fails,
|
||||
Synapse will mark the destination homeserver as offline, preventing any future requests
|
||||
to that server for a "cooldown" period. This period grows over time if the server
|
||||
continues to fail its responses
|
||||
([exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff)).
|
||||
|
||||
Admins can cancel the cooldown period with this API.
|
||||
|
||||
This API resets the retry timing for a specific remote server and tries to connect to
|
||||
the remote server again. It does not wait for the next `retry_interval`.
|
||||
The connection must have previously run into an error and `retry_last_ts`
|
||||
([Destination Details API](#destination-details-api)) must not be equal to `0`.
|
||||
|
||||
The connection attempt is carried out in the background and can take a while
|
||||
even if the API already returns the http status 200.
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection
|
||||
|
||||
{}
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
The following parameters should be set in the URL:
|
||||
|
||||
- `destination` - Name of the remote server.
|
||||
|
||||
2
setup.py
2
setup.py
@@ -150,7 +150,7 @@ setup(
|
||||
zip_safe=False,
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/x-rst",
|
||||
python_requires="~=3.6",
|
||||
python_requires="~=3.7",
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"synapse_homeserver = synapse.app.homeserver:main",
|
||||
|
||||
@@ -21,8 +21,8 @@ import os
|
||||
import sys
|
||||
|
||||
# Check that we're not running on an unsupported Python version.
|
||||
if sys.version_info < (3, 6):
|
||||
print("Synapse requires Python 3.6 or above.")
|
||||
if sys.version_info < (3, 7):
|
||||
print("Synapse requires Python 3.7 or above.")
|
||||
sys.exit(1)
|
||||
|
||||
# Twisted and canonicaljson will fail to import when this file is executed to
|
||||
|
||||
@@ -16,7 +16,6 @@ import atexit
|
||||
import gc
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
@@ -436,7 +435,8 @@ async def start(hs: "HomeServer") -> None:
|
||||
# before we start the listeners.
|
||||
module_api = hs.get_module_api()
|
||||
for module, config in hs.config.modules.loaded_modules:
|
||||
module(config=config, api=module_api)
|
||||
m = module(config=config, api=module_api)
|
||||
logger.info("Loaded module %s", m)
|
||||
|
||||
load_legacy_spam_checkers(hs)
|
||||
load_legacy_third_party_event_rules(hs)
|
||||
@@ -468,15 +468,13 @@ async def start(hs: "HomeServer") -> None:
|
||||
# everything currently allocated are things that will be used for the
|
||||
# rest of time. Doing so means less work each GC (hopefully).
|
||||
#
|
||||
# This only works on Python 3.7
|
||||
if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
|
||||
# PyPy does not (yet?) implement gc.freeze()
|
||||
if hasattr(gc, "freeze"):
|
||||
gc.collect()
|
||||
gc.freeze()
|
||||
|
||||
# Speed up shutdowns by freezing all allocated objects. This moves everything
|
||||
# into the permanent generation and excludes them from the final GC.
|
||||
# Unfortunately only works on Python 3.7
|
||||
if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
|
||||
# Speed up shutdowns by freezing all allocated objects. This moves everything
|
||||
# into the permanent generation and excludes them from the final GC.
|
||||
atexit.register(gc.freeze)
|
||||
|
||||
|
||||
|
||||
@@ -41,9 +41,9 @@ class ModulesConfig(Config):
|
||||
# documentation on how to configure or create custom modules for Synapse.
|
||||
#
|
||||
modules:
|
||||
# - module: my_super_module.MySuperClass
|
||||
# config:
|
||||
# do_thing: true
|
||||
# - module: my_other_super_module.SomeClass
|
||||
# config: {}
|
||||
#- module: my_super_module.MySuperClass
|
||||
# config:
|
||||
# do_thing: true
|
||||
#- module: my_other_super_module.SomeClass
|
||||
# config: {}
|
||||
"""
|
||||
|
||||
@@ -190,6 +190,8 @@ class RegistrationConfig(Config):
|
||||
# The success template used during fallback auth.
|
||||
self.fallback_success_template = self.read_template("auth_success.html")
|
||||
|
||||
self.inhibit_user_in_use_error = config.get("inhibit_user_in_use_error", False)
|
||||
|
||||
def generate_config_section(self, generate_secrets=False, **kwargs):
|
||||
if generate_secrets:
|
||||
registration_shared_secret = 'registration_shared_secret: "%s"' % (
|
||||
@@ -446,6 +448,16 @@ class RegistrationConfig(Config):
|
||||
# Defaults to true.
|
||||
#
|
||||
#auto_join_rooms_for_guests: false
|
||||
|
||||
# Whether to inhibit errors raised when registering a new account if the user ID
|
||||
# already exists. If turned on, that requests to /register/available will always
|
||||
# show a user ID as available, and Synapse won't raise an error when starting
|
||||
# a registration with a user ID that already exists. However, Synapse will still
|
||||
# raise an error if the registration completes and the username conflicts.
|
||||
#
|
||||
# Defaults to false.
|
||||
#
|
||||
#inhibit_user_in_use_error: true
|
||||
"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
@@ -315,10 +315,11 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
redacts: DefaultDictProperty[Optional[str]] = DefaultDictProperty("redacts", None)
|
||||
room_id: DictProperty[str] = DictProperty("room_id")
|
||||
sender: DictProperty[str] = DictProperty("sender")
|
||||
# TODO state_key should be Optional[str], this is generally asserted in Synapse
|
||||
# by calling is_state() first (which ensures this), but it is hard (not possible?)
|
||||
# TODO state_key should be Optional[str]. This is generally asserted in Synapse
|
||||
# by calling is_state() first (which ensures it is not None), but it is hard (not possible?)
|
||||
# to properly annotate that calling is_state() asserts that state_key exists
|
||||
# and is non-None.
|
||||
# and is non-None. It would be better to replace such direct references with
|
||||
# get_state_key() (and a check for None).
|
||||
state_key: DictProperty[str] = DictProperty("state_key")
|
||||
type: DictProperty[str] = DictProperty("type")
|
||||
user_id: DictProperty[str] = DictProperty("sender")
|
||||
@@ -332,7 +333,11 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
return self.content["membership"]
|
||||
|
||||
def is_state(self) -> bool:
|
||||
return hasattr(self, "state_key") and self.state_key is not None
|
||||
return self.get_state_key() is not None
|
||||
|
||||
def get_state_key(self) -> Optional[str]:
|
||||
"""Get the state key of this event, or None if it's not a state event"""
|
||||
return self._dict.get("state_key")
|
||||
|
||||
def get_dict(self) -> JsonDict:
|
||||
d = dict(self._dict)
|
||||
|
||||
@@ -163,7 +163,7 @@ class EventContext:
|
||||
return {
|
||||
"prev_state_id": prev_state_id,
|
||||
"event_type": event.type,
|
||||
"event_state_key": event.state_key if event.is_state() else None,
|
||||
"event_state_key": event.get_state_key(),
|
||||
"state_group": self._state_group,
|
||||
"state_group_before_event": self.state_group_before_event,
|
||||
"rejected": self.rejected,
|
||||
|
||||
@@ -14,7 +14,17 @@
|
||||
# limitations under the License.
|
||||
import collections.abc
|
||||
import re
|
||||
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
from frozendict import frozendict
|
||||
|
||||
@@ -26,6 +36,10 @@ from synapse.util.frozenutils import unfreeze
|
||||
|
||||
from . import EventBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.storage.databases.main.relations import BundledAggregations
|
||||
|
||||
|
||||
# Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
|
||||
# (?<!stuff) matches if the current position in the string is not preceded
|
||||
# by a match for 'stuff'.
|
||||
@@ -376,7 +390,7 @@ class EventClientSerializer:
|
||||
event: Union[JsonDict, EventBase],
|
||||
time_now: int,
|
||||
*,
|
||||
bundle_aggregations: Optional[Dict[str, JsonDict]] = None,
|
||||
bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None,
|
||||
**kwargs: Any,
|
||||
) -> JsonDict:
|
||||
"""Serializes a single event.
|
||||
@@ -415,7 +429,7 @@ class EventClientSerializer:
|
||||
self,
|
||||
event: EventBase,
|
||||
time_now: int,
|
||||
aggregations: JsonDict,
|
||||
aggregations: "BundledAggregations",
|
||||
serialized_event: JsonDict,
|
||||
) -> None:
|
||||
"""Potentially injects bundled aggregations into the unsigned portion of the serialized event.
|
||||
@@ -427,13 +441,18 @@ class EventClientSerializer:
|
||||
serialized_event: The serialized event which may be modified.
|
||||
|
||||
"""
|
||||
# Make a copy in-case the object is cached.
|
||||
aggregations = aggregations.copy()
|
||||
serialized_aggregations = {}
|
||||
|
||||
if RelationTypes.REPLACE in aggregations:
|
||||
if aggregations.annotations:
|
||||
serialized_aggregations[RelationTypes.ANNOTATION] = aggregations.annotations
|
||||
|
||||
if aggregations.references:
|
||||
serialized_aggregations[RelationTypes.REFERENCE] = aggregations.references
|
||||
|
||||
if aggregations.replace:
|
||||
# If there is an edit replace the content, preserving existing
|
||||
# relations.
|
||||
edit = aggregations[RelationTypes.REPLACE]
|
||||
edit = aggregations.replace
|
||||
|
||||
# Ensure we take copies of the edit content, otherwise we risk modifying
|
||||
# the original event.
|
||||
@@ -451,24 +470,28 @@ class EventClientSerializer:
|
||||
else:
|
||||
serialized_event["content"].pop("m.relates_to", None)
|
||||
|
||||
aggregations[RelationTypes.REPLACE] = {
|
||||
serialized_aggregations[RelationTypes.REPLACE] = {
|
||||
"event_id": edit.event_id,
|
||||
"origin_server_ts": edit.origin_server_ts,
|
||||
"sender": edit.sender,
|
||||
}
|
||||
|
||||
# If this event is the start of a thread, include a summary of the replies.
|
||||
if RelationTypes.THREAD in aggregations:
|
||||
# Serialize the latest thread event.
|
||||
latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"]
|
||||
|
||||
# Don't bundle aggregations as this could recurse forever.
|
||||
aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event(
|
||||
latest_thread_event, time_now, bundle_aggregations=None
|
||||
)
|
||||
if aggregations.thread:
|
||||
serialized_aggregations[RelationTypes.THREAD] = {
|
||||
# Don't bundle aggregations as this could recurse forever.
|
||||
"latest_event": self.serialize_event(
|
||||
aggregations.thread.latest_event, time_now, bundle_aggregations=None
|
||||
),
|
||||
"count": aggregations.thread.count,
|
||||
"current_user_participated": aggregations.thread.current_user_participated,
|
||||
}
|
||||
|
||||
# Include the bundled aggregations in the event.
|
||||
serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations)
|
||||
if serialized_aggregations:
|
||||
serialized_event["unsigned"].setdefault("m.relations", {}).update(
|
||||
serialized_aggregations
|
||||
)
|
||||
|
||||
def serialize_events(
|
||||
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import collections.abc
|
||||
from typing import Iterable, Union
|
||||
from typing import Iterable, Type, Union
|
||||
|
||||
import jsonschema
|
||||
|
||||
@@ -246,7 +246,7 @@ POWER_LEVELS_SCHEMA = {
|
||||
|
||||
# This could return something newer than Draft 7, but that's the current "latest"
|
||||
# validator.
|
||||
def _create_power_level_validator() -> jsonschema.Draft7Validator:
|
||||
def _create_power_level_validator() -> Type[jsonschema.Draft7Validator]:
|
||||
validator = jsonschema.validators.validator_for(POWER_LEVELS_SCHEMA)
|
||||
|
||||
# by default jsonschema does not consider a frozendict to be an object so
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Type
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
@@ -36,17 +36,19 @@ from synapse.http.servlet import (
|
||||
parse_integer_from_args,
|
||||
parse_string_from_args,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, ThirdPartyInstanceID
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TransportLayerServer(JsonResource):
|
||||
"""Handles incoming federation HTTP requests"""
|
||||
|
||||
def __init__(self, hs: HomeServer, servlet_groups: Optional[List[str]] = None):
|
||||
def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None):
|
||||
"""Initialize the TransportLayerServer
|
||||
|
||||
Will by default register all servlets. For custom behaviour, pass in
|
||||
@@ -113,7 +115,7 @@ class PublicRoomList(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
@@ -203,7 +205,7 @@ class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
@@ -251,7 +253,7 @@ class OpenIdUserInfo(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
@@ -297,7 +299,7 @@ DEFAULT_SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
|
||||
|
||||
|
||||
def register_servlets(
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
resource: HttpServer,
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import functools
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Awaitable, Callable, Optional, Tuple, cast
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple, cast
|
||||
|
||||
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
||||
from synapse.api.urls import FEDERATION_V1_PREFIX
|
||||
@@ -29,11 +29,13 @@ from synapse.logging.opentracing import (
|
||||
start_active_span_follows_from,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.stringutils import parse_and_validate_server_name
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -46,7 +48,7 @@ class NoAuthenticationError(AuthenticationError):
|
||||
|
||||
|
||||
class Authenticator:
|
||||
def __init__(self, hs: HomeServer):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._clock = hs.get_clock()
|
||||
self.keyring = hs.get_keyring()
|
||||
self.server_name = hs.hostname
|
||||
@@ -114,11 +116,11 @@ class Authenticator:
|
||||
# alive
|
||||
retry_timings = await self.store.get_destination_retry_timings(origin)
|
||||
if retry_timings and retry_timings.retry_last_ts:
|
||||
run_in_background(self._reset_retry_timings, origin)
|
||||
run_in_background(self.reset_retry_timings, origin)
|
||||
|
||||
return origin
|
||||
|
||||
async def _reset_retry_timings(self, origin: str) -> None:
|
||||
async def reset_retry_timings(self, origin: str) -> None:
|
||||
try:
|
||||
logger.info("Marking origin %r as up", origin)
|
||||
await self.store.set_destination_retry_timings(origin, None, 0, 0)
|
||||
@@ -227,7 +229,7 @@ class BaseFederationServlet:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
|
||||
@@ -12,7 +12,17 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Type, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
)
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
@@ -30,11 +40,13 @@ from synapse.http.servlet import (
|
||||
parse_string_from_args,
|
||||
parse_strings_from_args,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
issue_8631_logger = logging.getLogger("synapse.8631_debug")
|
||||
|
||||
@@ -47,7 +59,7 @@ class BaseFederationServerServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
@@ -596,7 +608,7 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
@@ -670,7 +682,7 @@ class FederationRoomHierarchyServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
@@ -706,7 +718,7 @@ class RoomComplexityServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Dict, List, Tuple, Type
|
||||
from typing import TYPE_CHECKING, Dict, List, Tuple, Type
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.federation.transport.server._base import (
|
||||
@@ -19,10 +19,12 @@ from synapse.federation.transport.server._base import (
|
||||
BaseFederationServlet,
|
||||
)
|
||||
from synapse.handlers.groups_local import GroupsLocalHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
class BaseGroupsLocalServlet(BaseFederationServlet):
|
||||
"""Abstract base class for federation servlet classes which provides a groups local handler.
|
||||
@@ -32,7 +34,7 @@ class BaseGroupsLocalServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Dict, List, Tuple, Type
|
||||
from typing import TYPE_CHECKING, Dict, List, Tuple, Type
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
@@ -22,10 +22,12 @@ from synapse.federation.transport.server._base import (
|
||||
BaseFederationServlet,
|
||||
)
|
||||
from synapse.http.servlet import parse_string_from_args
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
class BaseGroupsServerServlet(BaseFederationServlet):
|
||||
"""Abstract base class for federation servlet classes which provides a groups server handler.
|
||||
@@ -35,7 +37,7 @@ class BaseGroupsServerServlet(BaseFederationServlet):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: HomeServer,
|
||||
hs: "HomeServer",
|
||||
authenticator: Authenticator,
|
||||
ratelimiter: FederationRateLimiter,
|
||||
server_name: str,
|
||||
|
||||
@@ -2060,6 +2060,10 @@ CHECK_AUTH_CALLBACK = Callable[
|
||||
Optional[Tuple[str, Optional[Callable[["LoginResponse"], Awaitable[None]]]]]
|
||||
],
|
||||
]
|
||||
GET_USERNAME_FOR_REGISTRATION_CALLBACK = Callable[
|
||||
[JsonDict, JsonDict],
|
||||
Awaitable[Optional[str]],
|
||||
]
|
||||
|
||||
|
||||
class PasswordAuthProvider:
|
||||
@@ -2072,6 +2076,9 @@ class PasswordAuthProvider:
|
||||
# lists of callbacks
|
||||
self.check_3pid_auth_callbacks: List[CHECK_3PID_AUTH_CALLBACK] = []
|
||||
self.on_logged_out_callbacks: List[ON_LOGGED_OUT_CALLBACK] = []
|
||||
self.get_username_for_registration_callbacks: List[
|
||||
GET_USERNAME_FOR_REGISTRATION_CALLBACK
|
||||
] = []
|
||||
|
||||
# Mapping from login type to login parameters
|
||||
self._supported_login_types: Dict[str, Iterable[str]] = {}
|
||||
@@ -2086,6 +2093,9 @@ class PasswordAuthProvider:
|
||||
auth_checkers: Optional[
|
||||
Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
|
||||
] = None,
|
||||
get_username_for_registration: Optional[
|
||||
GET_USERNAME_FOR_REGISTRATION_CALLBACK
|
||||
] = None,
|
||||
) -> None:
|
||||
# Register check_3pid_auth callback
|
||||
if check_3pid_auth is not None:
|
||||
@@ -2130,6 +2140,11 @@ class PasswordAuthProvider:
|
||||
# Add the new method to the list of auth_checker_callbacks for this login type
|
||||
self.auth_checker_callbacks.setdefault(login_type, []).append(callback)
|
||||
|
||||
if get_username_for_registration is not None:
|
||||
self.get_username_for_registration_callbacks.append(
|
||||
get_username_for_registration,
|
||||
)
|
||||
|
||||
def get_supported_login_types(self) -> Mapping[str, Iterable[str]]:
|
||||
"""Get the login types supported by this password provider
|
||||
|
||||
@@ -2285,3 +2300,46 @@ class PasswordAuthProvider:
|
||||
except Exception as e:
|
||||
logger.warning("Failed to run module API callback %s: %s", callback, e)
|
||||
continue
|
||||
|
||||
async def get_username_for_registration(
|
||||
self,
|
||||
uia_results: JsonDict,
|
||||
params: JsonDict,
|
||||
) -> Optional[str]:
|
||||
"""Defines the username to use when registering the user, using the credentials
|
||||
and parameters provided during the UIA flow.
|
||||
|
||||
Stops at the first callback that returns a string.
|
||||
|
||||
Args:
|
||||
uia_results: The credentials provided during the UIA flow.
|
||||
params: The parameters provided by the registration request.
|
||||
|
||||
Returns:
|
||||
The localpart to use when registering this user, or None if no module
|
||||
returned a localpart.
|
||||
"""
|
||||
for callback in self.get_username_for_registration_callbacks:
|
||||
try:
|
||||
res = await callback(uia_results, params)
|
||||
|
||||
if isinstance(res, str):
|
||||
return res
|
||||
elif res is not None:
|
||||
# mypy complains that this line is unreachable because it assumes the
|
||||
# data returned by the module fits the expected type. We just want
|
||||
# to make sure this is the case.
|
||||
logger.warning( # type: ignore[unreachable]
|
||||
"Ignoring non-string value returned by"
|
||||
" get_username_for_registration callback %s: %s",
|
||||
callback,
|
||||
res,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Module raised an exception in get_username_for_registration: %s",
|
||||
e,
|
||||
)
|
||||
raise SynapseError(code=500, msg="Internal Server Error")
|
||||
|
||||
return None
|
||||
|
||||
@@ -157,6 +157,9 @@ class DeactivateAccountHandler:
|
||||
# Mark the user as deactivated.
|
||||
await self.store.set_user_deactivated_status(user_id, True)
|
||||
|
||||
# Remove account data (including ignored users and push rules).
|
||||
await self.store.purge_account_data_for_user(user_id)
|
||||
|
||||
return identity_server_supports_unbinding
|
||||
|
||||
async def _reject_pending_invites_for_user(self, user_id: str) -> None:
|
||||
|
||||
@@ -132,6 +132,7 @@ class RegistrationHandler:
|
||||
localpart: str,
|
||||
guest_access_token: Optional[str] = None,
|
||||
assigned_user_id: Optional[str] = None,
|
||||
inhibit_user_in_use_error: bool = False,
|
||||
) -> None:
|
||||
if types.contains_invalid_mxid_characters(localpart):
|
||||
raise SynapseError(
|
||||
@@ -171,21 +172,22 @@ class RegistrationHandler:
|
||||
|
||||
users = await self.store.get_users_by_id_case_insensitive(user_id)
|
||||
if users:
|
||||
if not guest_access_token:
|
||||
if not inhibit_user_in_use_error and not guest_access_token:
|
||||
raise SynapseError(
|
||||
400, "User ID already taken.", errcode=Codes.USER_IN_USE
|
||||
)
|
||||
user_data = await self.auth.get_user_by_access_token(guest_access_token)
|
||||
if (
|
||||
not user_data.is_guest
|
||||
or UserID.from_string(user_data.user_id).localpart != localpart
|
||||
):
|
||||
raise AuthError(
|
||||
403,
|
||||
"Cannot register taken user ID without valid guest "
|
||||
"credentials for that user.",
|
||||
errcode=Codes.FORBIDDEN,
|
||||
)
|
||||
if guest_access_token:
|
||||
user_data = await self.auth.get_user_by_access_token(guest_access_token)
|
||||
if (
|
||||
not user_data.is_guest
|
||||
or UserID.from_string(user_data.user_id).localpart != localpart
|
||||
):
|
||||
raise AuthError(
|
||||
403,
|
||||
"Cannot register taken user ID without valid guest "
|
||||
"credentials for that user.",
|
||||
errcode=Codes.FORBIDDEN,
|
||||
)
|
||||
|
||||
if guest_access_token is None:
|
||||
try:
|
||||
|
||||
@@ -30,6 +30,7 @@ from typing import (
|
||||
Tuple,
|
||||
)
|
||||
|
||||
import attr
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from synapse.api.constants import (
|
||||
@@ -60,6 +61,7 @@ from synapse.events.utils import copy_power_levels_contents
|
||||
from synapse.federation.federation_client import InvalidResponseError
|
||||
from synapse.handlers.federation import get_domains_from_state
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
from synapse.storage.databases.main.relations import BundledAggregations
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.streams import EventSource
|
||||
from synapse.types import (
|
||||
@@ -90,6 +92,17 @@ id_server_scheme = "https://"
|
||||
FIVE_MINUTES_IN_MS = 5 * 60 * 1000
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class EventContext:
|
||||
events_before: List[EventBase]
|
||||
event: EventBase
|
||||
events_after: List[EventBase]
|
||||
state: List[EventBase]
|
||||
aggregations: Dict[str, BundledAggregations]
|
||||
start: str
|
||||
end: str
|
||||
|
||||
|
||||
class RoomCreationHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -1119,7 +1132,7 @@ class RoomContextHandler:
|
||||
limit: int,
|
||||
event_filter: Optional[Filter],
|
||||
use_admin_priviledge: bool = False,
|
||||
) -> Optional[JsonDict]:
|
||||
) -> Optional[EventContext]:
|
||||
"""Retrieves events, pagination tokens and state around a given event
|
||||
in a room.
|
||||
|
||||
@@ -1167,38 +1180,28 @@ class RoomContextHandler:
|
||||
results = await self.store.get_events_around(
|
||||
room_id, event_id, before_limit, after_limit, event_filter
|
||||
)
|
||||
events_before = results.events_before
|
||||
events_after = results.events_after
|
||||
|
||||
if event_filter:
|
||||
results["events_before"] = await event_filter.filter(
|
||||
results["events_before"]
|
||||
)
|
||||
results["events_after"] = await event_filter.filter(results["events_after"])
|
||||
events_before = await event_filter.filter(events_before)
|
||||
events_after = await event_filter.filter(events_after)
|
||||
|
||||
results["events_before"] = await filter_evts(results["events_before"])
|
||||
results["events_after"] = await filter_evts(results["events_after"])
|
||||
events_before = await filter_evts(events_before)
|
||||
events_after = await filter_evts(events_after)
|
||||
# filter_evts can return a pruned event in case the user is allowed to see that
|
||||
# there's something there but not see the content, so use the event that's in
|
||||
# `filtered` rather than the event we retrieved from the datastore.
|
||||
results["event"] = filtered[0]
|
||||
event = filtered[0]
|
||||
|
||||
# Fetch the aggregations.
|
||||
aggregations = await self.store.get_bundled_aggregations(
|
||||
[results["event"]], user.to_string()
|
||||
itertools.chain(events_before, (event,), events_after),
|
||||
user.to_string(),
|
||||
)
|
||||
aggregations.update(
|
||||
await self.store.get_bundled_aggregations(
|
||||
results["events_before"], user.to_string()
|
||||
)
|
||||
)
|
||||
aggregations.update(
|
||||
await self.store.get_bundled_aggregations(
|
||||
results["events_after"], user.to_string()
|
||||
)
|
||||
)
|
||||
results["aggregations"] = aggregations
|
||||
|
||||
if results["events_after"]:
|
||||
last_event_id = results["events_after"][-1].event_id
|
||||
if events_after:
|
||||
last_event_id = events_after[-1].event_id
|
||||
else:
|
||||
last_event_id = event_id
|
||||
|
||||
@@ -1206,9 +1209,9 @@ class RoomContextHandler:
|
||||
state_filter = StateFilter.from_lazy_load_member_list(
|
||||
ev.sender
|
||||
for ev in itertools.chain(
|
||||
results["events_before"],
|
||||
(results["event"],),
|
||||
results["events_after"],
|
||||
events_before,
|
||||
(event,),
|
||||
events_after,
|
||||
)
|
||||
)
|
||||
else:
|
||||
@@ -1226,21 +1229,23 @@ class RoomContextHandler:
|
||||
if event_filter:
|
||||
state_events = await event_filter.filter(state_events)
|
||||
|
||||
results["state"] = await filter_evts(state_events)
|
||||
|
||||
# We use a dummy token here as we only care about the room portion of
|
||||
# the token, which we replace.
|
||||
token = StreamToken.START
|
||||
|
||||
results["start"] = await token.copy_and_replace(
|
||||
"room_key", results["start"]
|
||||
).to_string(self.store)
|
||||
|
||||
results["end"] = await token.copy_and_replace(
|
||||
"room_key", results["end"]
|
||||
).to_string(self.store)
|
||||
|
||||
return results
|
||||
return EventContext(
|
||||
events_before=events_before,
|
||||
event=event,
|
||||
events_after=events_after,
|
||||
state=await filter_evts(state_events),
|
||||
aggregations=aggregations,
|
||||
start=await token.copy_and_replace("room_key", results.start).to_string(
|
||||
self.store
|
||||
),
|
||||
end=await token.copy_and_replace("room_key", results.end).to_string(
|
||||
self.store
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class TimestampLookupHandler:
|
||||
|
||||
@@ -361,36 +361,37 @@ class SearchHandler:
|
||||
|
||||
logger.info(
|
||||
"Context for search returned %d and %d events",
|
||||
len(res["events_before"]),
|
||||
len(res["events_after"]),
|
||||
len(res.events_before),
|
||||
len(res.events_after),
|
||||
)
|
||||
|
||||
res["events_before"] = await filter_events_for_client(
|
||||
self.storage, user.to_string(), res["events_before"]
|
||||
events_before = await filter_events_for_client(
|
||||
self.storage, user.to_string(), res.events_before
|
||||
)
|
||||
|
||||
res["events_after"] = await filter_events_for_client(
|
||||
self.storage, user.to_string(), res["events_after"]
|
||||
events_after = await filter_events_for_client(
|
||||
self.storage, user.to_string(), res.events_after
|
||||
)
|
||||
|
||||
res["start"] = await now_token.copy_and_replace(
|
||||
"room_key", res["start"]
|
||||
).to_string(self.store)
|
||||
|
||||
res["end"] = await now_token.copy_and_replace(
|
||||
"room_key", res["end"]
|
||||
).to_string(self.store)
|
||||
context = {
|
||||
"events_before": events_before,
|
||||
"events_after": events_after,
|
||||
"start": await now_token.copy_and_replace(
|
||||
"room_key", res.start
|
||||
).to_string(self.store),
|
||||
"end": await now_token.copy_and_replace(
|
||||
"room_key", res.end
|
||||
).to_string(self.store),
|
||||
}
|
||||
|
||||
if include_profile:
|
||||
senders = {
|
||||
ev.sender
|
||||
for ev in itertools.chain(
|
||||
res["events_before"], [event], res["events_after"]
|
||||
)
|
||||
for ev in itertools.chain(events_before, [event], events_after)
|
||||
}
|
||||
|
||||
if res["events_after"]:
|
||||
last_event_id = res["events_after"][-1].event_id
|
||||
if events_after:
|
||||
last_event_id = events_after[-1].event_id
|
||||
else:
|
||||
last_event_id = event.event_id
|
||||
|
||||
@@ -402,7 +403,7 @@ class SearchHandler:
|
||||
last_event_id, state_filter
|
||||
)
|
||||
|
||||
res["profile_info"] = {
|
||||
context["profile_info"] = {
|
||||
s.state_key: {
|
||||
"displayname": s.content.get("displayname", None),
|
||||
"avatar_url": s.content.get("avatar_url", None),
|
||||
@@ -411,7 +412,7 @@ class SearchHandler:
|
||||
if s.type == EventTypes.Member and s.state_key in senders
|
||||
}
|
||||
|
||||
contexts[event.event_id] = res
|
||||
contexts[event.event_id] = context
|
||||
else:
|
||||
contexts = {}
|
||||
|
||||
@@ -421,10 +422,10 @@ class SearchHandler:
|
||||
|
||||
for context in contexts.values():
|
||||
context["events_before"] = self._event_serializer.serialize_events(
|
||||
context["events_before"], time_now
|
||||
context["events_before"], time_now # type: ignore[arg-type]
|
||||
)
|
||||
context["events_after"] = self._event_serializer.serialize_events(
|
||||
context["events_after"], time_now
|
||||
context["events_after"], time_now # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
state_results = {}
|
||||
|
||||
@@ -37,6 +37,7 @@ from synapse.logging.context import current_context
|
||||
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.storage.databases.main.event_push_actions import NotifCounts
|
||||
from synapse.storage.databases.main.relations import BundledAggregations
|
||||
from synapse.storage.roommember import MemberSummary
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
@@ -100,7 +101,7 @@ class TimelineBatch:
|
||||
limited: bool
|
||||
# A mapping of event ID to the bundled aggregations for the above events.
|
||||
# This is only calculated if limited is true.
|
||||
bundled_aggregations: Optional[Dict[str, Dict[str, Any]]] = None
|
||||
bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
"""Make the result appear empty if there are no updates. This is used
|
||||
@@ -1619,7 +1620,7 @@ class SyncHandler:
|
||||
# TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
|
||||
ignored_account_data = (
|
||||
await self.store.get_global_account_data_by_type_for_user(
|
||||
AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
|
||||
user_id=user_id, data_type=AccountDataTypes.IGNORED_USER_LIST
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -731,15 +731,24 @@ class SimpleHttpClient:
|
||||
# straight back in again
|
||||
|
||||
try:
|
||||
length = await make_deferred_yieldable(
|
||||
read_body_with_max_size(response, output_stream, max_size)
|
||||
)
|
||||
d = read_body_with_max_size(response, output_stream, max_size)
|
||||
|
||||
# Ensure that the body is not read forever.
|
||||
d = timeout_deferred(d, 30, self.hs.get_reactor())
|
||||
|
||||
length = await make_deferred_yieldable(d)
|
||||
except BodyExceededMaxSize:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_GATEWAY,
|
||||
"Requested file is too large > %r bytes" % (max_size,),
|
||||
Codes.TOO_LARGE,
|
||||
)
|
||||
except defer.TimeoutError:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_GATEWAY,
|
||||
"Requested file took too long to download",
|
||||
Codes.TOO_LARGE,
|
||||
)
|
||||
except Exception as e:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_GATEWAY, ("Failed to download remote body: %s" % e)
|
||||
|
||||
@@ -407,7 +407,10 @@ class SynapseRequest(Request):
|
||||
|
||||
user_agent = get_request_user_agent(self, "-")
|
||||
|
||||
code = str(self.code)
|
||||
# int(self.code) looks redundant, because self.code is already an int.
|
||||
# But self.code might be an HTTPStatus (which inherits from int)---which has
|
||||
# a different string representation. So ensure we really have an integer.
|
||||
code = str(int(self.code))
|
||||
if not self.finished:
|
||||
# we didn't send the full response before we gave up (presumably because
|
||||
# the connection dropped)
|
||||
|
||||
@@ -71,6 +71,7 @@ from synapse.handlers.account_validity import (
|
||||
from synapse.handlers.auth import (
|
||||
CHECK_3PID_AUTH_CALLBACK,
|
||||
CHECK_AUTH_CALLBACK,
|
||||
GET_USERNAME_FOR_REGISTRATION_CALLBACK,
|
||||
ON_LOGGED_OUT_CALLBACK,
|
||||
AuthHandler,
|
||||
)
|
||||
@@ -177,6 +178,7 @@ class ModuleApi:
|
||||
self._presence_stream = hs.get_event_sources().sources.presence
|
||||
self._state = hs.get_state_handler()
|
||||
self._clock: Clock = hs.get_clock()
|
||||
self._registration_handler = hs.get_registration_handler()
|
||||
self._send_email_handler = hs.get_send_email_handler()
|
||||
self.custom_template_dir = hs.config.server.custom_template_directory
|
||||
|
||||
@@ -310,6 +312,9 @@ class ModuleApi:
|
||||
auth_checkers: Optional[
|
||||
Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
|
||||
] = None,
|
||||
get_username_for_registration: Optional[
|
||||
GET_USERNAME_FOR_REGISTRATION_CALLBACK
|
||||
] = None,
|
||||
) -> None:
|
||||
"""Registers callbacks for password auth provider capabilities.
|
||||
|
||||
@@ -319,6 +324,7 @@ class ModuleApi:
|
||||
check_3pid_auth=check_3pid_auth,
|
||||
on_logged_out=on_logged_out,
|
||||
auth_checkers=auth_checkers,
|
||||
get_username_for_registration=get_username_for_registration,
|
||||
)
|
||||
|
||||
def register_background_update_controller_callbacks(
|
||||
@@ -1202,6 +1208,22 @@ class ModuleApi:
|
||||
"""
|
||||
return await defer_to_thread(self._hs.get_reactor(), f, *args, **kwargs)
|
||||
|
||||
async def check_username(self, username: str) -> None:
|
||||
"""Checks if the provided username uses the grammar defined in the Matrix
|
||||
specification, and is already being used by an existing user.
|
||||
|
||||
Added in Synapse v1.52.0.
|
||||
|
||||
Args:
|
||||
username: The username to check. This is the local part of the user's full
|
||||
Matrix user ID, i.e. it's "alice" if the full user ID is "@alice:foo.com".
|
||||
|
||||
Raises:
|
||||
SynapseError with the errcode "M_USER_IN_USE" if the username is already in
|
||||
use.
|
||||
"""
|
||||
await self._registration_handler.check_username(username)
|
||||
|
||||
|
||||
class PublicRoomListManager:
|
||||
"""Contains methods for adding to, removing from and querying whether a room
|
||||
|
||||
@@ -455,7 +455,7 @@ class Mailer:
|
||||
}
|
||||
|
||||
the_events = await filter_events_for_client(
|
||||
self.storage, user_id, results["events_before"]
|
||||
self.storage, user_id, results.events_before
|
||||
)
|
||||
the_events.append(notif_event)
|
||||
|
||||
|
||||
@@ -70,13 +70,14 @@ REQUIREMENTS = [
|
||||
"pyasn1>=0.1.9",
|
||||
"pyasn1-modules>=0.0.7",
|
||||
"bcrypt>=3.1.0",
|
||||
"pillow>=4.3.0",
|
||||
"pillow>=5.4.0",
|
||||
"sortedcontainers>=1.4.4",
|
||||
"pymacaroons>=0.13.0",
|
||||
"msgpack>=0.5.2",
|
||||
"phonenumbers>=8.2.0",
|
||||
# we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
|
||||
"prometheus_client>=0.4.0",
|
||||
# 0.13.0 has an incorrect type annotation, see #11832.
|
||||
"prometheus_client>=0.4.0,<0.13.0",
|
||||
# we use `order`, which arrived in attrs 19.2.0.
|
||||
# Note: 21.1.0 broke `/sync`, see #9936
|
||||
"attrs>=19.2.0,!=21.1.0",
|
||||
@@ -107,7 +108,7 @@ CONDITIONAL_REQUIREMENTS = {
|
||||
# `systemd.journal.JournalHandler`, as is documented in
|
||||
# `contrib/systemd/log_config.yaml`.
|
||||
"systemd": ["systemd-python>=231"],
|
||||
"url_preview": ["lxml>=3.5.0"],
|
||||
"url_preview": ["lxml>=4.2.0"],
|
||||
"sentry": ["sentry-sdk>=0.7.2"],
|
||||
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
|
||||
"jwt": ["pyjwt>=1.6.4"],
|
||||
|
||||
@@ -52,8 +52,8 @@ class SlavedEventStore(
|
||||
EventPushActionsWorkerStore,
|
||||
StreamWorkerStore,
|
||||
StateGroupWorkerStore,
|
||||
EventsWorkerStore,
|
||||
SignatureWorkerStore,
|
||||
EventsWorkerStore,
|
||||
UserErasureWorkerStore,
|
||||
RelationsWorkerStore,
|
||||
BaseSlavedStore,
|
||||
|
||||
@@ -41,7 +41,9 @@ from synapse.rest.admin.event_reports import (
|
||||
EventReportsRestServlet,
|
||||
)
|
||||
from synapse.rest.admin.federation import (
|
||||
DestinationsRestServlet,
|
||||
DestinationMembershipRestServlet,
|
||||
DestinationResetConnectionRestServlet,
|
||||
DestinationRestServlet,
|
||||
ListDestinationsRestServlet,
|
||||
)
|
||||
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
|
||||
@@ -267,7 +269,9 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
ListRegistrationTokensRestServlet(hs).register(http_server)
|
||||
NewRegistrationTokenRestServlet(hs).register(http_server)
|
||||
RegistrationTokenRestServlet(hs).register(http_server)
|
||||
DestinationsRestServlet(hs).register(http_server)
|
||||
DestinationMembershipRestServlet(hs).register(http_server)
|
||||
DestinationResetConnectionRestServlet(hs).register(http_server)
|
||||
DestinationRestServlet(hs).register(http_server)
|
||||
ListDestinationsRestServlet(hs).register(http_server)
|
||||
|
||||
# Some servlets only get registered for the main process.
|
||||
|
||||
@@ -16,6 +16,7 @@ from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Tuple
|
||||
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError
|
||||
from synapse.federation.transport.server import Authenticator
|
||||
from synapse.http.servlet import RestServlet, parse_integer, parse_string
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
|
||||
@@ -90,7 +91,7 @@ class ListDestinationsRestServlet(RestServlet):
|
||||
return HTTPStatus.OK, response
|
||||
|
||||
|
||||
class DestinationsRestServlet(RestServlet):
|
||||
class DestinationRestServlet(RestServlet):
|
||||
"""Get details of a destination.
|
||||
This needs user to have administrator access in Synapse.
|
||||
|
||||
@@ -145,3 +146,100 @@ class DestinationsRestServlet(RestServlet):
|
||||
}
|
||||
|
||||
return HTTPStatus.OK, response
|
||||
|
||||
|
||||
class DestinationMembershipRestServlet(RestServlet):
|
||||
"""Get list of rooms of a destination.
|
||||
This needs user to have administrator access in Synapse.
|
||||
|
||||
GET /_synapse/admin/v1/federation/destinations/<destination>/rooms?from=0&limit=10
|
||||
|
||||
returns:
|
||||
200 OK with a list of rooms if success otherwise an error.
|
||||
|
||||
The parameters `from` and `limit` are required only for pagination.
|
||||
By default, a `limit` of 100 is used.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]*)/rooms$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
self._store = hs.get_datastore()
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, destination: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
if not await self._store.is_destination_known(destination):
|
||||
raise NotFoundError("Unknown destination")
|
||||
|
||||
start = parse_integer(request, "from", default=0)
|
||||
limit = parse_integer(request, "limit", default=100)
|
||||
|
||||
if start < 0:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Query parameter from must be a string representing a positive integer.",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
if limit < 0:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Query parameter limit must be a string representing a positive integer.",
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
|
||||
|
||||
rooms, total = await self._store.get_destination_rooms_paginate(
|
||||
destination, start, limit, direction
|
||||
)
|
||||
response = {"rooms": rooms, "total": total}
|
||||
if (start + limit) < total:
|
||||
response["next_token"] = str(start + len(rooms))
|
||||
|
||||
return HTTPStatus.OK, response
|
||||
|
||||
|
||||
class DestinationResetConnectionRestServlet(RestServlet):
|
||||
"""Reset destinations' connection timeouts and wake it up.
|
||||
This needs user to have administrator access in Synapse.
|
||||
|
||||
POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection
|
||||
{}
|
||||
|
||||
returns:
|
||||
200 OK otherwise an error.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns(
|
||||
"/federation/destinations/(?P<destination>[^/]+)/reset_connection$"
|
||||
)
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
self._store = hs.get_datastore()
|
||||
self._authenticator = Authenticator(hs)
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, destination: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
if not await self._store.is_destination_known(destination):
|
||||
raise NotFoundError("Unknown destination")
|
||||
|
||||
retry_timings = await self._store.get_destination_retry_timings(destination)
|
||||
if not (retry_timings and retry_timings.retry_last_ts):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"The retry timing does not need to be reset for this destination.",
|
||||
)
|
||||
|
||||
# reset timings and wake up
|
||||
await self._authenticator.reset_retry_timings(destination)
|
||||
|
||||
return HTTPStatus.OK, {}
|
||||
|
||||
@@ -729,7 +729,7 @@ class RoomEventContextServlet(RestServlet):
|
||||
else:
|
||||
event_filter = None
|
||||
|
||||
results = await self.room_context_handler.get_event_context(
|
||||
event_context = await self.room_context_handler.get_event_context(
|
||||
requester,
|
||||
room_id,
|
||||
event_id,
|
||||
@@ -738,25 +738,34 @@ class RoomEventContextServlet(RestServlet):
|
||||
use_admin_priviledge=True,
|
||||
)
|
||||
|
||||
if not results:
|
||||
if not event_context:
|
||||
raise SynapseError(
|
||||
HTTPStatus.NOT_FOUND, "Event not found.", errcode=Codes.NOT_FOUND
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
aggregations = results.pop("aggregations", None)
|
||||
results["events_before"] = self._event_serializer.serialize_events(
|
||||
results["events_before"], time_now, bundle_aggregations=aggregations
|
||||
)
|
||||
results["event"] = self._event_serializer.serialize_event(
|
||||
results["event"], time_now, bundle_aggregations=aggregations
|
||||
)
|
||||
results["events_after"] = self._event_serializer.serialize_events(
|
||||
results["events_after"], time_now, bundle_aggregations=aggregations
|
||||
)
|
||||
results["state"] = self._event_serializer.serialize_events(
|
||||
results["state"], time_now
|
||||
)
|
||||
results = {
|
||||
"events_before": self._event_serializer.serialize_events(
|
||||
event_context.events_before,
|
||||
time_now,
|
||||
bundle_aggregations=event_context.aggregations,
|
||||
),
|
||||
"event": self._event_serializer.serialize_event(
|
||||
event_context.event,
|
||||
time_now,
|
||||
bundle_aggregations=event_context.aggregations,
|
||||
),
|
||||
"events_after": self._event_serializer.serialize_events(
|
||||
event_context.events_after,
|
||||
time_now,
|
||||
bundle_aggregations=event_context.aggregations,
|
||||
),
|
||||
"state": self._event_serializer.serialize_events(
|
||||
event_context.state, time_now
|
||||
),
|
||||
"start": event_context.start,
|
||||
"end": event_context.end,
|
||||
}
|
||||
|
||||
return HTTPStatus.OK, results
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ class AccountDataServlet(RestServlet):
|
||||
raise AuthError(403, "Cannot get account data for other users.")
|
||||
|
||||
event = await self.store.get_global_account_data_by_type_for_user(
|
||||
account_data_type, user_id
|
||||
user_id, account_data_type
|
||||
)
|
||||
|
||||
if event is None:
|
||||
|
||||
@@ -339,12 +339,19 @@ class UsernameAvailabilityRestServlet(RestServlet):
|
||||
),
|
||||
)
|
||||
|
||||
self.inhibit_user_in_use_error = (
|
||||
hs.config.registration.inhibit_user_in_use_error
|
||||
)
|
||||
|
||||
async def on_GET(self, request: Request) -> Tuple[int, JsonDict]:
|
||||
if not self.hs.config.registration.enable_registration:
|
||||
raise SynapseError(
|
||||
403, "Registration has been disabled", errcode=Codes.FORBIDDEN
|
||||
)
|
||||
|
||||
if self.inhibit_user_in_use_error:
|
||||
return 200, {"available": True}
|
||||
|
||||
ip = request.getClientIP()
|
||||
with self.ratelimiter.ratelimit(ip) as wait_deferred:
|
||||
await wait_deferred
|
||||
@@ -418,10 +425,14 @@ class RegisterRestServlet(RestServlet):
|
||||
self.ratelimiter = hs.get_registration_ratelimiter()
|
||||
self.password_policy_handler = hs.get_password_policy_handler()
|
||||
self.clock = hs.get_clock()
|
||||
self.password_auth_provider = hs.get_password_auth_provider()
|
||||
self._registration_enabled = self.hs.config.registration.enable_registration
|
||||
self._refresh_tokens_enabled = (
|
||||
hs.config.registration.refreshable_access_token_lifetime is not None
|
||||
)
|
||||
self._inhibit_user_in_use_error = (
|
||||
hs.config.registration.inhibit_user_in_use_error
|
||||
)
|
||||
|
||||
self._registration_flows = _calculate_registration_flows(
|
||||
hs.config, self.auth_handler
|
||||
@@ -564,6 +575,7 @@ class RegisterRestServlet(RestServlet):
|
||||
desired_username,
|
||||
guest_access_token=guest_access_token,
|
||||
assigned_user_id=registered_user_id,
|
||||
inhibit_user_in_use_error=self._inhibit_user_in_use_error,
|
||||
)
|
||||
|
||||
# Check if the user-interactive authentication flows are complete, if
|
||||
@@ -627,7 +639,16 @@ class RegisterRestServlet(RestServlet):
|
||||
if not password_hash:
|
||||
raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
|
||||
|
||||
desired_username = params.get("username", None)
|
||||
desired_username = await (
|
||||
self.password_auth_provider.get_username_for_registration(
|
||||
auth_result,
|
||||
params,
|
||||
)
|
||||
)
|
||||
|
||||
if desired_username is None:
|
||||
desired_username = params.get("username", None)
|
||||
|
||||
guest_access_token = params.get("guest_access_token", None)
|
||||
|
||||
if desired_username is not None:
|
||||
|
||||
@@ -706,27 +706,36 @@ class RoomEventContextServlet(RestServlet):
|
||||
else:
|
||||
event_filter = None
|
||||
|
||||
results = await self.room_context_handler.get_event_context(
|
||||
event_context = await self.room_context_handler.get_event_context(
|
||||
requester, room_id, event_id, limit, event_filter
|
||||
)
|
||||
|
||||
if not results:
|
||||
if not event_context:
|
||||
raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
aggregations = results.pop("aggregations", None)
|
||||
results["events_before"] = self._event_serializer.serialize_events(
|
||||
results["events_before"], time_now, bundle_aggregations=aggregations
|
||||
)
|
||||
results["event"] = self._event_serializer.serialize_event(
|
||||
results["event"], time_now, bundle_aggregations=aggregations
|
||||
)
|
||||
results["events_after"] = self._event_serializer.serialize_events(
|
||||
results["events_after"], time_now, bundle_aggregations=aggregations
|
||||
)
|
||||
results["state"] = self._event_serializer.serialize_events(
|
||||
results["state"], time_now
|
||||
)
|
||||
results = {
|
||||
"events_before": self._event_serializer.serialize_events(
|
||||
event_context.events_before,
|
||||
time_now,
|
||||
bundle_aggregations=event_context.aggregations,
|
||||
),
|
||||
"event": self._event_serializer.serialize_event(
|
||||
event_context.event,
|
||||
time_now,
|
||||
bundle_aggregations=event_context.aggregations,
|
||||
),
|
||||
"events_after": self._event_serializer.serialize_events(
|
||||
event_context.events_after,
|
||||
time_now,
|
||||
bundle_aggregations=event_context.aggregations,
|
||||
),
|
||||
"state": self._event_serializer.serialize_events(
|
||||
event_context.state, time_now
|
||||
),
|
||||
"start": event_context.start,
|
||||
"end": event_context.end,
|
||||
}
|
||||
|
||||
return 200, results
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.storage.databases.main.relations import BundledAggregations
|
||||
from synapse.types import JsonDict, StreamToken
|
||||
from synapse.util import json_decoder
|
||||
|
||||
@@ -526,7 +527,7 @@ class SyncRestServlet(RestServlet):
|
||||
|
||||
def serialize(
|
||||
events: Iterable[EventBase],
|
||||
aggregations: Optional[Dict[str, Dict[str, Any]]] = None,
|
||||
aggregations: Optional[Dict[str, BundledAggregations]] = None,
|
||||
) -> List[JsonDict]:
|
||||
return self._event_serializer.serialize_events(
|
||||
events,
|
||||
|
||||
@@ -321,14 +321,33 @@ def _iterate_over_text(
|
||||
|
||||
|
||||
def rebase_url(url: str, base: str) -> str:
|
||||
base_parts = list(urlparse.urlparse(base))
|
||||
"""
|
||||
Resolves a potentially relative `url` against an absolute `base` URL.
|
||||
|
||||
For example:
|
||||
|
||||
>>> rebase_url("subpage", "https://example.com/foo/")
|
||||
'https://example.com/foo/subpage'
|
||||
>>> rebase_url("sibling", "https://example.com/foo")
|
||||
'https://example.com/sibling'
|
||||
>>> rebase_url("/bar", "https://example.com/foo/")
|
||||
'https://example.com/bar'
|
||||
>>> rebase_url("https://alice.com/a/", "https://example.com/foo/")
|
||||
'https://alice.com/a'
|
||||
"""
|
||||
base_parts = urlparse.urlparse(base)
|
||||
# Convert the parsed URL to a list for (potential) modification.
|
||||
url_parts = list(urlparse.urlparse(url))
|
||||
if not url_parts[0]: # fix up schema
|
||||
url_parts[0] = base_parts[0] or "http"
|
||||
if not url_parts[1]: # fix up hostname
|
||||
url_parts[1] = base_parts[1]
|
||||
# Add a scheme, if one does not exist.
|
||||
if not url_parts[0]:
|
||||
url_parts[0] = base_parts.scheme or "http"
|
||||
# Fix up the hostname, if this is not a data URL.
|
||||
if url_parts[0] != "data" and not url_parts[1]:
|
||||
url_parts[1] = base_parts.netloc
|
||||
# If the path does not start with a /, nest it under the base path's last
|
||||
# directory.
|
||||
if not url_parts[2].startswith("/"):
|
||||
url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts[2]) + url_parts[2]
|
||||
url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts.path) + url_parts[2]
|
||||
return urlparse.urlunparse(url_parts)
|
||||
|
||||
|
||||
|
||||
@@ -21,8 +21,9 @@ import re
|
||||
import shutil
|
||||
import sys
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING, Iterable, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, BinaryIO, Iterable, Optional, Tuple
|
||||
from urllib import parse as urlparse
|
||||
from urllib.request import urlopen
|
||||
|
||||
import attr
|
||||
|
||||
@@ -70,6 +71,17 @@ ONE_DAY = 24 * ONE_HOUR
|
||||
IMAGE_CACHE_EXPIRY_MS = 2 * ONE_DAY
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class DownloadResult:
|
||||
length: int
|
||||
uri: str
|
||||
response_code: int
|
||||
media_type: str
|
||||
download_name: Optional[str]
|
||||
expires: int
|
||||
etag: Optional[str]
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class MediaInfo:
|
||||
"""
|
||||
@@ -256,7 +268,7 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
if oembed_url:
|
||||
url_to_download = oembed_url
|
||||
|
||||
media_info = await self._download_url(url_to_download, user)
|
||||
media_info = await self._handle_url(url_to_download, user)
|
||||
|
||||
logger.debug("got media_info of '%s'", media_info)
|
||||
|
||||
@@ -297,7 +309,9 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
oembed_url = self._oembed.autodiscover_from_html(tree)
|
||||
og_from_oembed: JsonDict = {}
|
||||
if oembed_url:
|
||||
oembed_info = await self._download_url(oembed_url, user)
|
||||
oembed_info = await self._handle_url(
|
||||
oembed_url, user, allow_data_urls=True
|
||||
)
|
||||
(
|
||||
og_from_oembed,
|
||||
author_name,
|
||||
@@ -367,7 +381,135 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
|
||||
return jsonog.encode("utf8")
|
||||
|
||||
async def _download_url(self, url: str, user: UserID) -> MediaInfo:
|
||||
async def _download_url(self, url: str, output_stream: BinaryIO) -> DownloadResult:
|
||||
"""
|
||||
Fetches a remote URL and parses the headers.
|
||||
|
||||
Args:
|
||||
url: The URL to fetch.
|
||||
output_stream: The stream to write the content to.
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
Media length, URL downloaded, the HTTP response code,
|
||||
the media type, the downloaded file name, the number of
|
||||
milliseconds the result is valid for, the etag header.
|
||||
"""
|
||||
|
||||
try:
|
||||
logger.debug("Trying to get preview for url '%s'", url)
|
||||
length, headers, uri, code = await self.client.get_file(
|
||||
url,
|
||||
output_stream=output_stream,
|
||||
max_size=self.max_spider_size,
|
||||
headers={"Accept-Language": self.url_preview_accept_language},
|
||||
)
|
||||
except SynapseError:
|
||||
# Pass SynapseErrors through directly, so that the servlet
|
||||
# handler will return a SynapseError to the client instead of
|
||||
# blank data or a 500.
|
||||
raise
|
||||
except DNSLookupError:
|
||||
# DNS lookup returned no results
|
||||
# Note: This will also be the case if one of the resolved IP
|
||||
# addresses is blacklisted
|
||||
raise SynapseError(
|
||||
502,
|
||||
"DNS resolution failure during URL preview generation",
|
||||
Codes.UNKNOWN,
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXME: pass through 404s and other error messages nicely
|
||||
logger.warning("Error downloading %s: %r", url, e)
|
||||
|
||||
raise SynapseError(
|
||||
500,
|
||||
"Failed to download content: %s"
|
||||
% (traceback.format_exception_only(sys.exc_info()[0], e),),
|
||||
Codes.UNKNOWN,
|
||||
)
|
||||
|
||||
if b"Content-Type" in headers:
|
||||
media_type = headers[b"Content-Type"][0].decode("ascii")
|
||||
else:
|
||||
media_type = "application/octet-stream"
|
||||
|
||||
download_name = get_filename_from_headers(headers)
|
||||
|
||||
# FIXME: we should calculate a proper expiration based on the
|
||||
# Cache-Control and Expire headers. But for now, assume 1 hour.
|
||||
expires = ONE_HOUR
|
||||
etag = headers[b"ETag"][0].decode("ascii") if b"ETag" in headers else None
|
||||
|
||||
return DownloadResult(
|
||||
length, uri, code, media_type, download_name, expires, etag
|
||||
)
|
||||
|
||||
async def _parse_data_url(
|
||||
self, url: str, output_stream: BinaryIO
|
||||
) -> DownloadResult:
|
||||
"""
|
||||
Parses a data: URL.
|
||||
|
||||
Args:
|
||||
url: The URL to parse.
|
||||
output_stream: The stream to write the content to.
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
Media length, URL downloaded, the HTTP response code,
|
||||
the media type, the downloaded file name, the number of
|
||||
milliseconds the result is valid for, the etag header.
|
||||
"""
|
||||
|
||||
try:
|
||||
logger.debug("Trying to parse data url '%s'", url)
|
||||
with urlopen(url) as url_info:
|
||||
# TODO Can this be more efficient.
|
||||
output_stream.write(url_info.read())
|
||||
except Exception as e:
|
||||
logger.warning("Error parsing data: URL %s: %r", url, e)
|
||||
|
||||
raise SynapseError(
|
||||
500,
|
||||
"Failed to parse data URL: %s"
|
||||
% (traceback.format_exception_only(sys.exc_info()[0], e),),
|
||||
Codes.UNKNOWN,
|
||||
)
|
||||
|
||||
return DownloadResult(
|
||||
# Read back the length that has been written.
|
||||
length=output_stream.tell(),
|
||||
uri=url,
|
||||
# If it was parsed, consider this a 200 OK.
|
||||
response_code=200,
|
||||
# urlopen shoves the media-type from the data URL into the content type
|
||||
# header object.
|
||||
media_type=url_info.headers.get_content_type(),
|
||||
# Some features are not supported by data: URLs.
|
||||
download_name=None,
|
||||
expires=ONE_HOUR,
|
||||
etag=None,
|
||||
)
|
||||
|
||||
async def _handle_url(
|
||||
self, url: str, user: UserID, allow_data_urls: bool = False
|
||||
) -> MediaInfo:
|
||||
"""
|
||||
Fetches content from a URL and parses the result to generate a MediaInfo.
|
||||
|
||||
It uses the media storage provider to persist the fetched content and
|
||||
stores the mapping into the database.
|
||||
|
||||
Args:
|
||||
url: The URL to fetch.
|
||||
user: The user who ahs requested this URL.
|
||||
allow_data_urls: True if data URLs should be allowed.
|
||||
|
||||
Returns:
|
||||
A MediaInfo object describing the fetched content.
|
||||
"""
|
||||
|
||||
# TODO: we should probably honour robots.txt... except in practice
|
||||
# we're most likely being explicitly triggered by a human rather than a
|
||||
# bot, so are we really a robot?
|
||||
@@ -377,61 +519,27 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
file_info = FileInfo(server_name=None, file_id=file_id, url_cache=True)
|
||||
|
||||
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
|
||||
try:
|
||||
logger.debug("Trying to get preview for url '%s'", url)
|
||||
length, headers, uri, code = await self.client.get_file(
|
||||
url,
|
||||
output_stream=f,
|
||||
max_size=self.max_spider_size,
|
||||
headers={"Accept-Language": self.url_preview_accept_language},
|
||||
)
|
||||
except SynapseError:
|
||||
# Pass SynapseErrors through directly, so that the servlet
|
||||
# handler will return a SynapseError to the client instead of
|
||||
# blank data or a 500.
|
||||
raise
|
||||
except DNSLookupError:
|
||||
# DNS lookup returned no results
|
||||
# Note: This will also be the case if one of the resolved IP
|
||||
# addresses is blacklisted
|
||||
raise SynapseError(
|
||||
502,
|
||||
"DNS resolution failure during URL preview generation",
|
||||
Codes.UNKNOWN,
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXME: pass through 404s and other error messages nicely
|
||||
logger.warning("Error downloading %s: %r", url, e)
|
||||
if url.startswith("data:"):
|
||||
if not allow_data_urls:
|
||||
raise SynapseError(
|
||||
500, "Previewing of data: URLs is forbidden", Codes.UNKNOWN
|
||||
)
|
||||
|
||||
raise SynapseError(
|
||||
500,
|
||||
"Failed to download content: %s"
|
||||
% (traceback.format_exception_only(sys.exc_info()[0], e),),
|
||||
Codes.UNKNOWN,
|
||||
)
|
||||
await finish()
|
||||
|
||||
if b"Content-Type" in headers:
|
||||
media_type = headers[b"Content-Type"][0].decode("ascii")
|
||||
download_result = await self._parse_data_url(url, f)
|
||||
else:
|
||||
media_type = "application/octet-stream"
|
||||
download_result = await self._download_url(url, f)
|
||||
|
||||
download_name = get_filename_from_headers(headers)
|
||||
|
||||
# FIXME: we should calculate a proper expiration based on the
|
||||
# Cache-Control and Expire headers. But for now, assume 1 hour.
|
||||
expires = ONE_HOUR
|
||||
etag = headers[b"ETag"][0].decode("ascii") if b"ETag" in headers else None
|
||||
await finish()
|
||||
|
||||
try:
|
||||
time_now_ms = self.clock.time_msec()
|
||||
|
||||
await self.store.store_local_media(
|
||||
media_id=file_id,
|
||||
media_type=media_type,
|
||||
media_type=download_result.media_type,
|
||||
time_now_ms=time_now_ms,
|
||||
upload_name=download_name,
|
||||
media_length=length,
|
||||
upload_name=download_result.download_name,
|
||||
media_length=download_result.length,
|
||||
user_id=user,
|
||||
url_cache=url,
|
||||
)
|
||||
@@ -444,16 +552,16 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
raise
|
||||
|
||||
return MediaInfo(
|
||||
media_type=media_type,
|
||||
media_length=length,
|
||||
download_name=download_name,
|
||||
media_type=download_result.media_type,
|
||||
media_length=download_result.length,
|
||||
download_name=download_result.download_name,
|
||||
created_ts_ms=time_now_ms,
|
||||
filesystem_id=file_id,
|
||||
filename=fname,
|
||||
uri=uri,
|
||||
response_code=code,
|
||||
expires=expires,
|
||||
etag=etag,
|
||||
uri=download_result.uri,
|
||||
response_code=download_result.response_code,
|
||||
expires=download_result.expires,
|
||||
etag=download_result.etag,
|
||||
)
|
||||
|
||||
async def _precache_image_url(
|
||||
@@ -474,8 +582,8 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
# FIXME: it might be cleaner to use the same flow as the main /preview_url
|
||||
# request itself and benefit from the same caching etc. But for now we
|
||||
# just rely on the caching on the master request to speed things up.
|
||||
image_info = await self._download_url(
|
||||
rebase_url(og["og:image"], media_info.uri), user
|
||||
image_info = await self._handle_url(
|
||||
rebase_url(og["og:image"], media_info.uri), user, allow_data_urls=True
|
||||
)
|
||||
|
||||
if _is_media(image_info.media_type):
|
||||
|
||||
@@ -702,6 +702,7 @@ class DatabasePool:
|
||||
func: Callable[..., R],
|
||||
*args: Any,
|
||||
db_autocommit: bool = False,
|
||||
isolation_level: Optional[int] = None,
|
||||
**kwargs: Any,
|
||||
) -> R:
|
||||
"""Starts a transaction on the database and runs a given function
|
||||
@@ -724,6 +725,7 @@ class DatabasePool:
|
||||
called multiple times if the transaction is retried, so must
|
||||
correctly handle that case.
|
||||
|
||||
isolation_level: Set the server isolation level for this transaction.
|
||||
args: positional args to pass to `func`
|
||||
kwargs: named args to pass to `func`
|
||||
|
||||
@@ -746,6 +748,7 @@ class DatabasePool:
|
||||
func,
|
||||
*args,
|
||||
db_autocommit=db_autocommit,
|
||||
isolation_level=isolation_level,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -763,6 +766,7 @@ class DatabasePool:
|
||||
func: Callable[..., R],
|
||||
*args: Any,
|
||||
db_autocommit: bool = False,
|
||||
isolation_level: Optional[int] = None,
|
||||
**kwargs: Any,
|
||||
) -> R:
|
||||
"""Wraps the .runWithConnection() method on the underlying db_pool.
|
||||
@@ -775,6 +779,7 @@ class DatabasePool:
|
||||
db_autocommit: Whether to run the function in "autocommit" mode,
|
||||
i.e. outside of a transaction. This is useful for transaction
|
||||
that are only a single query. Currently only affects postgres.
|
||||
isolation_level: Set the server isolation level for this transaction.
|
||||
kwargs: named args to pass to `func`
|
||||
|
||||
Returns:
|
||||
@@ -834,6 +839,10 @@ class DatabasePool:
|
||||
try:
|
||||
if db_autocommit:
|
||||
self.engine.attempt_to_set_autocommit(conn, True)
|
||||
if isolation_level is not None:
|
||||
self.engine.attempt_to_set_isolation_level(
|
||||
conn, isolation_level
|
||||
)
|
||||
|
||||
db_conn = LoggingDatabaseConnection(
|
||||
conn, self.engine, "runWithConnection"
|
||||
@@ -842,6 +851,8 @@ class DatabasePool:
|
||||
finally:
|
||||
if db_autocommit:
|
||||
self.engine.attempt_to_set_autocommit(conn, False)
|
||||
if isolation_level:
|
||||
self.engine.attempt_to_set_isolation_level(conn, None)
|
||||
|
||||
return await make_deferred_yieldable(
|
||||
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
||||
|
||||
@@ -26,6 +26,7 @@ from synapse.storage.database import (
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import (
|
||||
AbstractStreamIdGenerator,
|
||||
@@ -44,7 +45,7 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
@@ -158,9 +159,9 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
"get_account_data_for_user", get_account_data_for_user_txn
|
||||
)
|
||||
|
||||
@cached(num_args=2, max_entries=5000)
|
||||
@cached(num_args=2, max_entries=5000, tree=True)
|
||||
async def get_global_account_data_by_type_for_user(
|
||||
self, data_type: str, user_id: str
|
||||
self, user_id: str, data_type: str
|
||||
) -> Optional[JsonDict]:
|
||||
"""
|
||||
Returns:
|
||||
@@ -179,7 +180,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
else:
|
||||
return None
|
||||
|
||||
@cached(num_args=2)
|
||||
@cached(num_args=2, tree=True)
|
||||
async def get_account_data_for_room(
|
||||
self, user_id: str, room_id: str
|
||||
) -> Dict[str, JsonDict]:
|
||||
@@ -210,7 +211,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
"get_account_data_for_room", get_account_data_for_room_txn
|
||||
)
|
||||
|
||||
@cached(num_args=3, max_entries=5000)
|
||||
@cached(num_args=3, max_entries=5000, tree=True)
|
||||
async def get_account_data_for_room_and_type(
|
||||
self, user_id: str, room_id: str, account_data_type: str
|
||||
) -> Optional[JsonDict]:
|
||||
@@ -392,7 +393,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
for row in rows:
|
||||
if not row.room_id:
|
||||
self.get_global_account_data_by_type_for_user.invalidate(
|
||||
(row.data_type, row.user_id)
|
||||
(row.user_id, row.data_type)
|
||||
)
|
||||
self.get_account_data_for_user.invalidate((row.user_id,))
|
||||
self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
|
||||
@@ -476,7 +477,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
|
||||
self.get_account_data_for_user.invalidate((user_id,))
|
||||
self.get_global_account_data_by_type_for_user.invalidate(
|
||||
(account_data_type, user_id)
|
||||
(user_id, account_data_type)
|
||||
)
|
||||
|
||||
return self._account_data_id_gen.get_current_token()
|
||||
@@ -546,6 +547,74 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
|
||||
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
|
||||
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
|
||||
|
||||
async def purge_account_data_for_user(self, user_id: str) -> None:
|
||||
"""
|
||||
Removes the account data for a user.
|
||||
|
||||
This is intended to be used upon user deactivation and also removes any
|
||||
derived information from account data (e.g. push rules and ignored users).
|
||||
|
||||
Args:
|
||||
user_id: The user ID to remove data for.
|
||||
"""
|
||||
|
||||
def purge_account_data_for_user_txn(txn: LoggingTransaction) -> None:
|
||||
# Purge from the primary account_data tables.
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn, table="account_data", keyvalues={"user_id": user_id}
|
||||
)
|
||||
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn, table="room_account_data", keyvalues={"user_id": user_id}
|
||||
)
|
||||
|
||||
# Purge from ignored_users where this user is the ignorer.
|
||||
# N.B. We don't purge where this user is the ignoree, because that
|
||||
# interferes with other users' account data.
|
||||
# It's also not this user's data to delete!
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id}
|
||||
)
|
||||
|
||||
# Remove the push rules
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn, table="push_rules", keyvalues={"user_name": user_id}
|
||||
)
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn, table="push_rules_enable", keyvalues={"user_name": user_id}
|
||||
)
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn, table="push_rules_stream", keyvalues={"user_id": user_id}
|
||||
)
|
||||
|
||||
# Invalidate caches as appropriate
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_account_data_for_room_and_type, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_account_data_for_user, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_global_account_data_by_type_for_user, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_account_data_for_room, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_push_rules_for_user, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_push_rules_enabled_for_user, (user_id,)
|
||||
)
|
||||
# This user might be contained in the ignored_by cache for other users,
|
||||
# so we have to invalidate it all.
|
||||
self._invalidate_all_cache_and_stream(txn, self.ignored_by)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"purge_account_data_for_user_txn",
|
||||
purge_account_data_for_user_txn,
|
||||
)
|
||||
|
||||
|
||||
class AccountDataStore(AccountDataWorkerStore):
|
||||
pass
|
||||
|
||||
@@ -384,7 +384,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
"get_new_events_for_appservice", get_new_events_for_appservice_txn
|
||||
)
|
||||
|
||||
events = await self.get_events_as_list(event_ids)
|
||||
events = await self.get_events_as_list(event_ids, get_prev_content=True)
|
||||
|
||||
return upper_bound, events
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ class _NoChainCoverIndex(Exception):
|
||||
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
|
||||
|
||||
|
||||
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
|
||||
class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
|
||||
@@ -1389,6 +1389,8 @@ class PersistEventsStore:
|
||||
"received_ts",
|
||||
"sender",
|
||||
"contains_url",
|
||||
"state_key",
|
||||
"rejection_reason",
|
||||
),
|
||||
values=(
|
||||
(
|
||||
@@ -1405,8 +1407,10 @@ class PersistEventsStore:
|
||||
self._clock.time_msec(),
|
||||
event.sender,
|
||||
"url" in event.content and isinstance(event.content["url"], str),
|
||||
event.get_state_key(),
|
||||
context.rejected or None,
|
||||
)
|
||||
for event, _ in events_and_contexts
|
||||
for event, context in events_and_contexts
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1456,6 +1460,7 @@ class PersistEventsStore:
|
||||
for event, context in events_and_contexts:
|
||||
if context.rejected:
|
||||
# Insert the event_id into the rejections table
|
||||
# (events.rejection_reason has already been done)
|
||||
self._store_rejections_txn(txn, event.event_id, context.rejected)
|
||||
to_remove.add(event)
|
||||
|
||||
|
||||
@@ -390,7 +390,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
"event_search",
|
||||
"events",
|
||||
"group_rooms",
|
||||
"public_room_list_stream",
|
||||
"receipts_graph",
|
||||
"receipts_linearized",
|
||||
"room_aliases",
|
||||
|
||||
@@ -13,17 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union, cast
|
||||
|
||||
import attr
|
||||
from frozendict import frozendict
|
||||
@@ -43,6 +33,7 @@ from synapse.storage.relations import (
|
||||
PaginationChunk,
|
||||
RelationPaginationToken,
|
||||
)
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -51,6 +42,30 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _ThreadAggregation:
|
||||
latest_event: EventBase
|
||||
count: int
|
||||
current_user_participated: bool
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class BundledAggregations:
|
||||
"""
|
||||
The bundled aggregations for an event.
|
||||
|
||||
Some values require additional processing during serialization.
|
||||
"""
|
||||
|
||||
annotations: Optional[JsonDict] = None
|
||||
references: Optional[JsonDict] = None
|
||||
replace: Optional[EventBase] = None
|
||||
thread: Optional[_ThreadAggregation] = None
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.annotations or self.references or self.replace or self.thread)
|
||||
|
||||
|
||||
class RelationsWorkerStore(SQLBaseStore):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -585,7 +600,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
|
||||
async def _get_bundled_aggregation_for_event(
|
||||
self, event: EventBase, user_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
) -> Optional[BundledAggregations]:
|
||||
"""Generate bundled aggregations for an event.
|
||||
|
||||
Note that this does not use a cache, but depends on cached methods.
|
||||
@@ -616,24 +631,24 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
# The bundled aggregations to include, a mapping of relation type to a
|
||||
# type-specific value. Some types include the direct return type here
|
||||
# while others need more processing during serialization.
|
||||
aggregations: Dict[str, Any] = {}
|
||||
aggregations = BundledAggregations()
|
||||
|
||||
annotations = await self.get_aggregation_groups_for_event(event_id, room_id)
|
||||
if annotations.chunk:
|
||||
aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
|
||||
aggregations.annotations = annotations.to_dict()
|
||||
|
||||
references = await self.get_relations_for_event(
|
||||
event_id, room_id, RelationTypes.REFERENCE, direction="f"
|
||||
)
|
||||
if references.chunk:
|
||||
aggregations[RelationTypes.REFERENCE] = references.to_dict()
|
||||
aggregations.references = references.to_dict()
|
||||
|
||||
edit = None
|
||||
if event.type == EventTypes.Message:
|
||||
edit = await self.get_applicable_edit(event_id, room_id)
|
||||
|
||||
if edit:
|
||||
aggregations[RelationTypes.REPLACE] = edit
|
||||
aggregations.replace = edit
|
||||
|
||||
# If this event is the start of a thread, include a summary of the replies.
|
||||
if self._msc3440_enabled:
|
||||
@@ -644,11 +659,11 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
event_id, room_id, user_id
|
||||
)
|
||||
if latest_thread_event:
|
||||
aggregations[RelationTypes.THREAD] = {
|
||||
"latest_event": latest_thread_event,
|
||||
"count": thread_count,
|
||||
"current_user_participated": participated,
|
||||
}
|
||||
aggregations.thread = _ThreadAggregation(
|
||||
latest_event=latest_thread_event,
|
||||
count=thread_count,
|
||||
current_user_participated=participated,
|
||||
)
|
||||
|
||||
# Store the bundled aggregations in the event metadata for later use.
|
||||
return aggregations
|
||||
@@ -657,7 +672,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
self,
|
||||
events: Iterable[EventBase],
|
||||
user_id: str,
|
||||
) -> Dict[str, Dict[str, Any]]:
|
||||
) -> Dict[str, BundledAggregations]:
|
||||
"""Generate bundled aggregations for events.
|
||||
|
||||
Args:
|
||||
@@ -676,7 +691,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
results = {}
|
||||
for event in events:
|
||||
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
|
||||
if event_result is not None:
|
||||
if event_result:
|
||||
results[event.event_id] = event_result
|
||||
|
||||
return results
|
||||
|
||||
@@ -12,16 +12,19 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import Dict, Iterable, List, Tuple
|
||||
from typing import Collection, Dict, List, Tuple
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.crypto.event_signing import compute_event_reference_hash
|
||||
from synapse.storage.databases.main.events_worker import (
|
||||
EventRedactBehaviour,
|
||||
EventsWorkerStore,
|
||||
)
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
|
||||
class SignatureWorkerStore(SQLBaseStore):
|
||||
class SignatureWorkerStore(EventsWorkerStore):
|
||||
@cached()
|
||||
def get_event_reference_hash(self, event_id):
|
||||
# This is a dummy function to allow get_event_reference_hashes
|
||||
@@ -32,7 +35,7 @@ class SignatureWorkerStore(SQLBaseStore):
|
||||
cached_method_name="get_event_reference_hash", list_name="event_ids", num_args=1
|
||||
)
|
||||
async def get_event_reference_hashes(
|
||||
self, event_ids: Iterable[str]
|
||||
self, event_ids: Collection[str]
|
||||
) -> Dict[str, Dict[str, bytes]]:
|
||||
"""Get all hashes for given events.
|
||||
|
||||
@@ -41,18 +44,27 @@ class SignatureWorkerStore(SQLBaseStore):
|
||||
|
||||
Returns:
|
||||
A mapping of event ID to a mapping of algorithm to hash.
|
||||
Returns an empty dict for a given event id if that event is unknown.
|
||||
"""
|
||||
events = await self.get_events(
|
||||
event_ids,
|
||||
redact_behaviour=EventRedactBehaviour.AS_IS,
|
||||
allow_rejected=True,
|
||||
)
|
||||
|
||||
def f(txn):
|
||||
return {
|
||||
event_id: self._get_event_reference_hashes_txn(txn, event_id)
|
||||
for event_id in event_ids
|
||||
}
|
||||
hashes: Dict[str, Dict[str, bytes]] = {}
|
||||
for event_id in event_ids:
|
||||
event = events.get(event_id)
|
||||
if event is None:
|
||||
hashes[event_id] = {}
|
||||
else:
|
||||
ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
|
||||
hashes[event_id] = {ref_alg: ref_hash_bytes}
|
||||
|
||||
return await self.db_pool.runInteraction("get_event_reference_hashes", f)
|
||||
return hashes
|
||||
|
||||
async def add_event_hashes(
|
||||
self, event_ids: Iterable[str]
|
||||
self, event_ids: Collection[str]
|
||||
) -> List[Tuple[str, Dict[str, str]]]:
|
||||
"""
|
||||
|
||||
@@ -70,24 +82,6 @@ class SignatureWorkerStore(SQLBaseStore):
|
||||
|
||||
return list(encoded_hashes.items())
|
||||
|
||||
def _get_event_reference_hashes_txn(
|
||||
self, txn: Cursor, event_id: str
|
||||
) -> Dict[str, bytes]:
|
||||
"""Get all the hashes for a given PDU.
|
||||
Args:
|
||||
txn:
|
||||
event_id: Id for the Event.
|
||||
Returns:
|
||||
A mapping of algorithm -> hash.
|
||||
"""
|
||||
query = (
|
||||
"SELECT algorithm, hash"
|
||||
" FROM event_reference_hashes"
|
||||
" WHERE event_id = ?"
|
||||
)
|
||||
txn.execute(query, (event_id,))
|
||||
return {k: v for k, v in txn}
|
||||
|
||||
|
||||
class SignatureStore(SignatureWorkerStore):
|
||||
"""Persistence for event signatures and hashes"""
|
||||
|
||||
@@ -81,6 +81,14 @@ class _EventDictReturn:
|
||||
stream_ordering: int
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _EventsAround:
|
||||
events_before: List[EventBase]
|
||||
events_after: List[EventBase]
|
||||
start: RoomStreamToken
|
||||
end: RoomStreamToken
|
||||
|
||||
|
||||
def generate_pagination_where_clause(
|
||||
direction: str,
|
||||
column_names: Tuple[str, str],
|
||||
@@ -846,7 +854,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
before_limit: int,
|
||||
after_limit: int,
|
||||
event_filter: Optional[Filter] = None,
|
||||
) -> dict:
|
||||
) -> _EventsAround:
|
||||
"""Retrieve events and pagination tokens around a given event in a
|
||||
room.
|
||||
"""
|
||||
@@ -869,12 +877,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
list(results["after"]["event_ids"]), get_prev_content=True
|
||||
)
|
||||
|
||||
return {
|
||||
"events_before": events_before,
|
||||
"events_after": events_after,
|
||||
"start": results["before"]["token"],
|
||||
"end": results["after"]["token"],
|
||||
}
|
||||
return _EventsAround(
|
||||
events_before=events_before,
|
||||
events_after=events_after,
|
||||
start=results["before"]["token"],
|
||||
end=results["after"]["token"],
|
||||
)
|
||||
|
||||
def _get_events_around_txn(
|
||||
self,
|
||||
|
||||
@@ -561,6 +561,54 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
|
||||
"get_destinations_paginate_txn", get_destinations_paginate_txn
|
||||
)
|
||||
|
||||
async def get_destination_rooms_paginate(
|
||||
self, destination: str, start: int, limit: int, direction: str = "f"
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
"""Function to retrieve a paginated list of destination's rooms.
|
||||
This will return a json list of rooms and the
|
||||
total number of rooms.
|
||||
|
||||
Args:
|
||||
destination: the destination to query
|
||||
start: start number to begin the query from
|
||||
limit: number of rows to retrieve
|
||||
direction: sort ascending or descending by room_id
|
||||
Returns:
|
||||
A tuple of a dict of rooms and a count of total rooms.
|
||||
"""
|
||||
|
||||
def get_destination_rooms_paginate_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
|
||||
if direction == "b":
|
||||
order = "DESC"
|
||||
else:
|
||||
order = "ASC"
|
||||
|
||||
sql = """
|
||||
SELECT COUNT(*) as total_rooms
|
||||
FROM destination_rooms
|
||||
WHERE destination = ?
|
||||
"""
|
||||
txn.execute(sql, [destination])
|
||||
count = cast(Tuple[int], txn.fetchone())[0]
|
||||
|
||||
rooms = self.db_pool.simple_select_list_paginate_txn(
|
||||
txn=txn,
|
||||
table="destination_rooms",
|
||||
orderby="room_id",
|
||||
start=start,
|
||||
limit=limit,
|
||||
retcols=("room_id", "stream_ordering"),
|
||||
order_direction=order,
|
||||
)
|
||||
return rooms, count
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_destination_rooms_paginate_txn", get_destination_rooms_paginate_txn
|
||||
)
|
||||
|
||||
async def is_destination_known(self, destination: str) -> bool:
|
||||
"""Check if a destination is known to the server."""
|
||||
result = await self.db_pool.simple_select_one_onecol(
|
||||
|
||||
@@ -12,11 +12,18 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import abc
|
||||
from typing import Generic, TypeVar
|
||||
from enum import IntEnum
|
||||
from typing import Generic, Optional, TypeVar
|
||||
|
||||
from synapse.storage.types import Connection
|
||||
|
||||
|
||||
class IsolationLevel(IntEnum):
|
||||
READ_COMMITTED: int = 1
|
||||
REPEATABLE_READ: int = 2
|
||||
SERIALIZABLE: int = 3
|
||||
|
||||
|
||||
class IncorrectDatabaseSetup(RuntimeError):
|
||||
pass
|
||||
|
||||
@@ -109,3 +116,13 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
|
||||
commit/rollback the connections.
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def attempt_to_set_isolation_level(
|
||||
self, conn: Connection, isolation_level: Optional[int]
|
||||
):
|
||||
"""Attempt to set the connections isolation level.
|
||||
|
||||
Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default.
|
||||
"""
|
||||
...
|
||||
|
||||
@@ -13,8 +13,13 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Mapping, Optional
|
||||
|
||||
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
|
||||
from synapse.storage.engines._base import (
|
||||
BaseDatabaseEngine,
|
||||
IncorrectDatabaseSetup,
|
||||
IsolationLevel,
|
||||
)
|
||||
from synapse.storage.types import Connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -34,6 +39,15 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||
self.synchronous_commit = database_config.get("synchronous_commit", True)
|
||||
self._version = None # unknown as yet
|
||||
|
||||
self.isolation_level_map: Mapping[int, int] = {
|
||||
IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED,
|
||||
IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
|
||||
IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE,
|
||||
}
|
||||
self.default_isolation_level = (
|
||||
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
||||
)
|
||||
|
||||
@property
|
||||
def single_threaded(self) -> bool:
|
||||
return False
|
||||
@@ -46,8 +60,8 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||
self._version = db_conn.server_version
|
||||
|
||||
# Are we on a supported PostgreSQL version?
|
||||
if not allow_outdated_version and self._version < 90600:
|
||||
raise RuntimeError("Synapse requires PostgreSQL 9.6 or above.")
|
||||
if not allow_outdated_version and self._version < 100000:
|
||||
raise RuntimeError("Synapse requires PostgreSQL 10 or above.")
|
||||
|
||||
with db_conn.cursor() as txn:
|
||||
txn.execute("SHOW SERVER_ENCODING")
|
||||
@@ -104,9 +118,7 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||
return sql.replace("?", "%s")
|
||||
|
||||
def on_new_connection(self, db_conn):
|
||||
db_conn.set_isolation_level(
|
||||
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
||||
)
|
||||
db_conn.set_isolation_level(self.default_isolation_level)
|
||||
|
||||
# Set the bytea output to escape, vs the default of hex
|
||||
cursor = db_conn.cursor()
|
||||
@@ -175,3 +187,12 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||
|
||||
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
|
||||
return conn.set_session(autocommit=autocommit) # type: ignore
|
||||
|
||||
def attempt_to_set_isolation_level(
|
||||
self, conn: Connection, isolation_level: Optional[int]
|
||||
):
|
||||
if isolation_level is None:
|
||||
isolation_level = self.default_isolation_level
|
||||
else:
|
||||
isolation_level = self.isolation_level_map[isolation_level]
|
||||
return conn.set_isolation_level(isolation_level) # type: ignore
|
||||
|
||||
@@ -15,6 +15,7 @@ import platform
|
||||
import struct
|
||||
import threading
|
||||
import typing
|
||||
from typing import Optional
|
||||
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.types import Connection
|
||||
@@ -122,6 +123,12 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
|
||||
# set the connection to autocommit mode.
|
||||
pass
|
||||
|
||||
def attempt_to_set_isolation_level(
|
||||
self, conn: Connection, isolation_level: Optional[int]
|
||||
):
|
||||
# All transactions are SERIALIZABLE by default in sqllite
|
||||
pass
|
||||
|
||||
|
||||
# Following functions taken from: https://github.com/coleifer/peewee
|
||||
|
||||
|
||||
@@ -499,9 +499,12 @@ def _upgrade_existing_database(
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module) # type: ignore
|
||||
|
||||
logger.info("Running script %s", relative_path)
|
||||
module.run_create(cur, database_engine) # type: ignore
|
||||
if not is_empty:
|
||||
if hasattr(module, "run_create"):
|
||||
logger.info("Running %s:run_create", relative_path)
|
||||
module.run_create(cur, database_engine) # type: ignore
|
||||
|
||||
if not is_empty and hasattr(module, "run_upgrade"):
|
||||
logger.info("Running %s:run_upgrade", relative_path)
|
||||
module.run_upgrade(cur, database_engine, config=config) # type: ignore
|
||||
elif ext == ".pyc" or file_name == "__pycache__":
|
||||
# Sometimes .pyc files turn up anyway even though we've
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
SCHEMA_VERSION = 67 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 68 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
@@ -53,11 +53,18 @@ Changes in SCHEMA_VERSION = 66:
|
||||
|
||||
Changes in SCHEMA_VERSION = 67:
|
||||
- state_events.prev_state is no longer written to.
|
||||
|
||||
Changes in SCHEMA_VERSION = 68:
|
||||
- event_reference_hashes is no longer read.
|
||||
- `events` has `state_key` and `rejection_reason` columns, which are populated for
|
||||
new events.
|
||||
"""
|
||||
|
||||
|
||||
SCHEMA_COMPAT_VERSION = (
|
||||
61 # 61: Remove unused tables `user_stats_historical` and `room_stats_historical`
|
||||
# we now have `state_key` columns in both `events` and `state_events`, so
|
||||
# now incompatible with synapses wth SCHEMA_VERSION < 66.
|
||||
66
|
||||
)
|
||||
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user