1
0

Merge branch 'madlittlemods/13356-messages-investigation-scratch-v1' into maddlittlemods/msc2716-many-batches-optimization

Conflicts:
	synapse/handlers/federation.py
	synapse/storage/databases/main/cache.py
	synapse/storage/databases/main/event_federation.py
This commit is contained in:
Eric Eastwood
2022-09-26 15:28:14 -05:00
76 changed files with 3735 additions and 306 deletions

View File

@@ -8,6 +8,7 @@
!README.rst
!pyproject.toml
!poetry.lock
!Cargo.lock
!build_rust.py
rust/target

View File

@@ -94,7 +94,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
toolchain: 1.58.1
override: true
components: clippy
- uses: Swatinem/rust-cache@v2
@@ -112,7 +112,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
toolchain: 1.58.1
override: true
components: rustfmt
- uses: Swatinem/rust-cache@v2
@@ -204,7 +204,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
toolchain: 1.58.1
override: true
- uses: Swatinem/rust-cache@v2
@@ -320,7 +320,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
toolchain: 1.58.1
override: true
- uses: Swatinem/rust-cache@v2
@@ -452,7 +452,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
toolchain: 1.58.1
override: true
- uses: Swatinem/rust-cache@v2
@@ -478,7 +478,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.61.0
toolchain: 1.58.1
override: true
- uses: Swatinem/rust-cache@v2

3
.gitignore vendored
View File

@@ -15,8 +15,9 @@ _trial_temp*/
.DS_Store
__pycache__/
# We do want the poetry lockfile.
# We do want the poetry and cargo lockfile.
!poetry.lock
!Cargo.lock
# stuff that is likely to exist when you run a server locally
/*.db

View File

@@ -1,4 +1,4 @@
Synapse 1.68.0rc1 (2022-09-20)
Synapse 1.68.0rc2 (2022-09-23)
==============================
Please note that Synapse will now refuse to start if configured to use a version of SQLite earlier than 3.27.
@@ -8,6 +8,23 @@ Those using packages will not be affected. On most platforms, installing with `p
See the [upgrade notes](https://matrix-org.github.io/synapse/v1.68/upgrade.html#upgrading-to-v1670).
Bugfixes
--------
- Fix building from packaged sdist. Broke in v1.68.0rc1. ([\#13866](https://github.com/matrix-org/synapse/issues/13866))
Internal Changes
----------------
- Fix the release script not publishing binary wheels. ([\#13850](https://github.com/matrix-org/synapse/issues/13850))
- Lower minimum supported rustc version to 1.58.1. ([\#13857](https://github.com/matrix-org/synapse/issues/13857))
- Lock Rust dependencies versions. ([\#13858](https://github.com/matrix-org/synapse/issues/13858))
Synapse 1.68.0rc1 (2022-09-20)
==============================
Features
--------

466
Cargo.lock generated Normal file
View File

@@ -0,0 +1,466 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e"
dependencies = [
"memchr",
]
[[package]]
name = "anyhow"
version = "1.0.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602"
[[package]]
name = "arc-swap"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake2"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388"
dependencies = [
"digest",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e"
dependencies = [
"generic-array",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "digest"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [
"block-buffer",
"crypto-common",
"subtle",
]
[[package]]
name = "generic-array"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "indoc"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adab1eaa3408fb7f0c777a73e7465fd5656136fc93b670eb6df3c88c2c1344e3"
[[package]]
name = "itoa"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.132"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
[[package]]
name = "lock_api"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
]
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]]
name = "proc-macro2"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
dependencies = [
"unicode-ident",
]
[[package]]
name = "pyo3"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12f72538a0230791398a0986a6518ebd88abc3fded89007b506ed072acc831e1"
dependencies = [
"anyhow",
"cfg-if",
"indoc",
"libc",
"memoffset",
"parking_lot",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
"unindent",
]
[[package]]
name = "pyo3-build-config"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4cf18c20f4f09995f3554e6bcf9b09bd5e4d6b67c562fdfaafa644526ba479"
dependencies = [
"once_cell",
"target-lexicon",
]
[[package]]
name = "pyo3-ffi"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a41877f28d8ebd600b6aa21a17b40c3b0fc4dfe73a27b6e81ab3d895e401b0e9"
dependencies = [
"libc",
"pyo3-build-config",
]
[[package]]
name = "pyo3-log"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5695ccff5060c13ca1751cf8c857a12da9b0bf0378cb071c5e0326f7c7e4c1b"
dependencies = [
"arc-swap",
"log",
"pyo3",
]
[[package]]
name = "pyo3-macros"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e81c8d4bcc2f216dc1b665412df35e46d12ee8d3d046b381aad05f1fcf30547"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85752a767ee19399a78272cc2ab625cd7d373b2e112b4b13db28de71fa892784"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pythonize"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7f0c136f5fbc01868185eef462800e49659eb23acca83b9e884367a006acb6"
dependencies = [
"pyo3",
"serde",
]
[[package]]
name = "quote"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
[[package]]
name = "ryu"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "smallvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "synapse"
version = "0.1.0"
dependencies = [
"anyhow",
"blake2",
"hex",
"lazy_static",
"log",
"pyo3",
"pyo3-log",
"pythonize",
"regex",
"serde",
"serde_json",
]
[[package]]
name = "target-lexicon"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1"
[[package]]
name = "typenum"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicode-ident"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
[[package]]
name = "unindent"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ee9362deb4a96cef4d437d1ad49cffc9b9e92d202b6995674e928ce684f112"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "windows-sys"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"

View File

@@ -0,0 +1 @@
Exponentially backoff from backfilling the same event over and over.

View File

@@ -0,0 +1 @@
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).

1
changelog.d/13796.misc Normal file
View File

@@ -0,0 +1 @@
Use shared methods for cache invalidation when persisting events, remove duplicate codepaths. Contributed by Nick @ Beeper (@fizzadar).

1
changelog.d/13818.doc Normal file
View File

@@ -0,0 +1 @@
Update URL for the NixOS module for Synapse.

1
changelog.d/13823.misc Normal file
View File

@@ -0,0 +1 @@
Faster Remote Room Joins: tell remote homeservers that we are unable to authorise them if they query a room which has partial state on our server.

1
changelog.d/13830.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where typing events would be accepted from remote servers not present in a room. Also fix a bug where incoming typing events would cause other incoming events to get stuck during a fast join.

1
changelog.d/13855.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix access token leak to logs from proxy agent.

1
changelog.d/13873.misc Normal file
View File

@@ -0,0 +1 @@
Create a new snapshot of the database schema.

1
changelog.d/13874.misc Normal file
View File

@@ -0,0 +1 @@
Faster room joins: Send device list updates to most servers in rooms with partial state.

1
changelog.d/13876.misc Normal file
View File

@@ -0,0 +1 @@
Add comments to the Prometheus recording rules to make it clear which set of rules you need for Grafana or Prometheus Console.

1
changelog.d/13888.misc Normal file
View File

@@ -0,0 +1 @@
Faster room joins: Avoid waiting for full state when processing `/keys/changes` requests.

1
changelog.d/13889.misc Normal file
View File

@@ -0,0 +1 @@
Port push rules to using Rust.

View File

@@ -0,0 +1 @@
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).

1
changelog.d/13905.misc Normal file
View File

@@ -0,0 +1 @@
Fix mypy errors with canonicaljson 1.6.3.

1
changelog.d/13909.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix packaging to include `Cargo.lock` in `sdist`.

1
changelog.d/13911.doc Normal file
View File

@@ -0,0 +1 @@
Update the man page for the `hash_password` script to correct the default number of bcrypt rounds performed.

View File

@@ -1,7 +1,12 @@
groups:
- name: synapse
rules:
# These 3 rules are used in the included Prometheus console
###
### Prometheus Console Only
### The following rules are only needed if you use the Prometheus Console
### in contrib/prometheus/consoles/synapse.html
###
- record: 'synapse_federation_client_sent'
labels:
type: "EDU"
@@ -15,7 +20,6 @@ groups:
type: "Query"
expr: 'sum(synapse_federation_client_sent_queries) by (job)'
# These 3 rules are used in the included Prometheus console
- record: 'synapse_federation_server_received'
labels:
type: "EDU"
@@ -29,7 +33,6 @@ groups:
type: "Query"
expr: 'sum(synapse_federation_server_received_queries) by (job)'
# These 2 rules are used in the included Prometheus console
- record: 'synapse_federation_transaction_queue_pending'
labels:
type: "EDU"
@@ -38,8 +41,16 @@ groups:
labels:
type: "PDU"
expr: 'synapse_federation_transaction_queue_pending_pdus + 0'
###
### End of 'Prometheus Console Only' rules block
###
# These 3 rules are used in the included Grafana dashboard
###
### Grafana Only
### The following rules are only needed if you use the Grafana dashboard
### in contrib/grafana/synapse.json
###
- record: synapse_storage_events_persisted_by_source_type
expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_type="remote"})
labels:
@@ -53,11 +64,11 @@ groups:
labels:
type: bridges
# This rule is used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_event_type
expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep_total)
# This rule is used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_origin
expr: sum without(type) (synapse_storage_events_persisted_events_sep_total)
###
### End of 'Grafana Only' rules block
###

13
debian/changelog vendored
View File

@@ -1,3 +1,16 @@
matrix-synapse-py3 (1.69.0~rc1+nmu1) UNRELEASED; urgency=medium
* The man page for the hash_password script has been updated to reflect
the correct default value of 'bcrypt_rounds'.
-- Synapse Packaging team <packages@matrix.org> Mon, 26 Sep 2022 18:05:09 +0100
matrix-synapse-py3 (1.68.0~rc2) stable; urgency=medium
* New Synapse release 1.68.0rc2.
-- Synapse Packaging team <packages@matrix.org> Fri, 23 Sep 2022 09:40:10 +0100
matrix-synapse-py3 (1.68.0~rc1) stable; urgency=medium
* New Synapse release 1.68.0rc1.

View File

@@ -14,7 +14,7 @@ or the `STDIN` if not supplied.
It accepts an YAML file which can be used to specify parameters like the
number of rounds for bcrypt and password_config section having the pepper
value used for the hashing. By default `bcrypt_rounds` is set to **10**.
value used for the hashing. By default `bcrypt_rounds` is set to **12**.
The hashed password is written on the `STDOUT`.

View File

@@ -181,7 +181,7 @@ doas pkg_add synapse
#### NixOS
Robin Lambertz has packaged Synapse for NixOS at:
<https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/matrix-synapse.nix>
<https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/matrix/synapse.nix>
### Installing as a Python module from PyPI

9
poetry.lock generated
View File

@@ -95,14 +95,15 @@ webencodings = "*"
[[package]]
name = "canonicaljson"
version = "1.6.0"
version = "1.6.3"
description = "Canonical JSON"
category = "main"
optional = false
python-versions = "~=3.7"
python-versions = ">=3.7"
[package.dependencies]
simplejson = ">=3.14.0"
typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.8\""}
[package.extras]
frozendict = ["frozendict (>=1.0)"]
@@ -1737,8 +1738,8 @@ bleach = [
{file = "bleach-4.1.0.tar.gz", hash = "sha256:0900d8b37eba61a802ee40ac0061f8c2b5dee29c1927dd1d233e075ebf5a71da"},
]
canonicaljson = [
{file = "canonicaljson-1.6.0-py3-none-any.whl", hash = "sha256:7230c2a2a3db07874f622af84effe41a655e07bf23734830e18a454e65d5b998"},
{file = "canonicaljson-1.6.0.tar.gz", hash = "sha256:8739d5fd91aca7281d425660ae65af7663808c8177778965f67e90b16a2b2427"},
{file = "canonicaljson-1.6.3-py3-none-any.whl", hash = "sha256:6ba3cf1702fa3d209b3e915a4e9a3e4ef194f1e8fca189c1f0b7a2a7686a27e6"},
{file = "canonicaljson-1.6.3.tar.gz", hash = "sha256:ca59760bc274a899a0da75809d6909ae43e5123381fd6ef040a44d1952c0b448"},
]
certifi = [
{file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"},

View File

@@ -57,7 +57,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry]
name = "matrix-synapse"
version = "1.68.0rc1"
version = "1.68.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
@@ -86,8 +86,9 @@ include = [
{ path = "tests", format = "sdist" },
{ path = "UPGRADE.rst", format = "sdist" },
{ path = "Cargo.toml", format = "sdist" },
{ path = "Cargo.lock", format = "sdist" },
{ path = "rust/Cargo.toml", format = "sdist" },
{ path = "rust/Cargo.lock", format = "sdist" },
{ path = "rust/build.rs", format = "sdist" },
{ path = "rust/src/**", format = "sdist" },
]
exclude = [

View File

@@ -7,7 +7,7 @@ name = "synapse"
version = "0.1.0"
edition = "2021"
rust-version = "1.61.0"
rust-version = "1.58.1"
[lib]
name = "synapse"

View File

@@ -26,6 +26,9 @@ usage() {
echo " Defaults to 9999."
echo "-h"
echo " Display this help text."
echo ""
echo " NB: make sure to run this against the *oldest* supported version of postgres,"
echo " or else pg_dump might output non-backwards-compatible syntax."
}
SCHEMA_NUMBER="9999"
@@ -240,25 +243,54 @@ DROP TABLE user_directory_search_stat;
echo "Dumping SQLite3 schema..."
mkdir -p "$OUTPUT_DIR/"{common,main,state}"/full_schema/$SCHEMA_NUMBER"
sqlite3 "$SQLITE_COMMON_DB" ".schema --indent" > "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_COMMON_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_MAIN_DB" ".schema --indent" > "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_MAIN_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_STATE_DB" ".schema --indent" > "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_STATE_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
mkdir -p "$OUTPUT_DIR/"{common,main,state}"/full_schemas/$SCHEMA_NUMBER"
sqlite3 "$SQLITE_COMMON_DB" ".schema" > "$OUTPUT_DIR/common/full_schemas/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_COMMON_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/common/full_schemas/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_MAIN_DB" ".schema" > "$OUTPUT_DIR/main/full_schemas/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_MAIN_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/main/full_schemas/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_STATE_DB" ".schema" > "$OUTPUT_DIR/state/full_schemas/$SCHEMA_NUMBER/full.sql.sqlite"
sqlite3 "$SQLITE_STATE_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/state/full_schemas/$SCHEMA_NUMBER/full.sql.sqlite"
cleanup_pg_schema() {
sed -e '/^$/d' -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d'
# Cleanup as follows:
# - Remove empty lines. pg_dump likes to output a lot of these.
# - Remove comment-only lines. pg_dump also likes to output a lot of these to visually
# separate tables etc.
# - Remove "public." prefix --- the schema name.
# - Remove "SET" commands. Last time I ran this, the output commands were
# SET statement_timeout = 0;
# SET lock_timeout = 0;
# SET idle_in_transaction_session_timeout = 0;
# SET client_encoding = 'UTF8';
# SET standard_conforming_strings = on;
# SET check_function_bodies = false;
# SET xmloption = content;
# SET client_min_messages = warning;
# SET row_security = off;
# SET default_table_access_method = heap;
# - Very carefully remove specific SELECT statements. We CANNOT blanket remove all
# SELECT statements because some of those have side-effects which we do want in the
# schema. Last time I ran this, the only SELECTS were
# SELECT pg_catalog.set_config('search_path', '', false);
# and
# SELECT pg_catalog.setval(text, bigint, bool);
# We do want to remove the former, but the latter is important. If the last argument
# is `true` or omitted, this marks the given integer as having been consumed and
# will NOT appear as the nextval.
sed -e '/^$/d' \
-e '/^--/d' \
-e 's/public\.//g' \
-e '/^SET /d' \
-e '/^SELECT pg_catalog.set_config/d'
}
echo "Dumping Postgres schema..."
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner "$POSTGRES_COMMON_DB_NAME" | cleanup_pg_schema > "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_COMMON_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner "$POSTGRES_MAIN_DB_NAME" | cleanup_pg_schema > "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_MAIN_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner "$POSTGRES_STATE_DB_NAME" | cleanup_pg_schema > "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_STATE_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner "$POSTGRES_COMMON_DB_NAME" | cleanup_pg_schema > "$OUTPUT_DIR/common/full_schemas/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_COMMON_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/common/full_schemas/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner "$POSTGRES_MAIN_DB_NAME" | cleanup_pg_schema > "$OUTPUT_DIR/main/full_schemas/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_MAIN_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/main/full_schemas/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner "$POSTGRES_STATE_DB_NAME" | cleanup_pg_schema > "$OUTPUT_DIR/state/full_schemas/$SCHEMA_NUMBER/full.sql.postgres"
pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_STATE_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/state/full_schemas/$SCHEMA_NUMBER/full.sql.postgres"
echo "Done! Files dumped to: $OUTPUT_DIR"

View File

@@ -100,6 +100,12 @@ class Codes(str, Enum):
UNREDACTED_CONTENT_DELETED = "FI.MAU.MSC2815_UNREDACTED_CONTENT_DELETED"
# Returned for federation requests where we can't process a request as we
# can't ensure the sending server is in a room which is partial-stated on
# our side.
# Part of MSC3895.
UNABLE_DUE_TO_PARTIAL_STATE = "ORG.MATRIX.MSC3895_UNABLE_DUE_TO_PARTIAL_STATE"
class CodeMessageException(RuntimeError):
"""An exception with integer code and message string attributes.

View File

@@ -63,7 +63,8 @@ class ExperimentalConfig(Config):
# MSC3706 (server-side support for partial state in /send_join responses)
self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False)
# experimental support for faster joins over federation (msc2775, msc3706)
# experimental support for faster joins over federation
# (MSC2775, MSC3706, MSC3895)
# requires a target server with msc3706_enabled enabled.
self.faster_joins_enabled: bool = experimental.get("faster_joins", False)
@@ -82,6 +83,8 @@ class ExperimentalConfig(Config):
# MSC3786 (Add a default push rule to ignore m.room.server_acl events)
self.msc3786_enabled: bool = experimental.get("msc3786_enabled", False)
# MSC3771: Thread read receipts
self.msc3771_enabled: bool = experimental.get("msc3771_enabled", False)
# MSC3772: A push rule for mutual relations.
self.msc3772_enabled: bool = experimental.get("msc3772_enabled", False)

View File

@@ -525,13 +525,10 @@ class FederationServer(FederationBase):
async def on_room_state_request(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
@@ -555,13 +552,10 @@ class FederationServer(FederationBase):
if not event_id:
raise NotImplementedError("Specify an event")
await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
resp = await self._state_ids_resp_cache.wrap(
(room_id, event_id),
self._on_state_ids_request_compute,
@@ -950,6 +944,7 @@ class FederationServer(FederationBase):
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:
async with self._server_linearizer.queue((origin, room_id)):
await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

View File

@@ -195,7 +195,9 @@ class DeviceWorkerHandler:
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
current_state_ids = await self._state_storage.get_current_state_ids(room_id)
current_state_ids = await self._state_storage.get_current_state_ids(
room_id, await_full_state=False
)
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
@@ -234,7 +236,8 @@ class DeviceWorkerHandler:
# mapping from event_id -> state_dict
prev_state_ids = await self._state_storage.get_state_ids_for_events(
event_ids
event_ids,
await_full_state=False,
)
# Check if we've joined the room? If so we just blindly add all the users to
@@ -688,11 +691,15 @@ class DeviceHandler(DeviceWorkerHandler):
# Ignore any users that aren't ours
if self.hs.is_mine_id(user_id):
hosts = set(
await self._storage_controllers.state.get_current_hosts_in_room(
await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
)
hosts.discard(self.server_name)
# For rooms with partial state, `hosts` is merely an
# approximation. When we transition to a full state room, we
# will have to send out device list updates to any servers we
# missed.
# Check if we've already sent this update to some hosts
if current_stream_id == stream_id:

View File

@@ -31,7 +31,6 @@ from synapse.events import EventBase
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.types import StateMap, get_domain_from_id
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -156,9 +155,33 @@ class EventAuthHandler:
Codes.UNABLE_TO_GRANT_JOIN,
)
async def check_host_in_room(self, room_id: str, host: str) -> bool:
with Measure(self._clock, "check_host_in_room"):
return await self._store.is_host_joined(room_id, host)
async def is_host_in_room(self, room_id: str, host: str) -> bool:
return await self._store.is_host_joined(room_id, host)
async def assert_host_in_room(
self, room_id: str, host: str, allow_partial_state_rooms: bool = False
) -> None:
"""
Asserts that the host is in the room, or raises an AuthError.
If the room is partial-stated, we raise an AuthError with the
UNABLE_DUE_TO_PARTIAL_STATE error code, unless `allow_partial_state_rooms` is true.
If allow_partial_state_rooms is True and the room is partial-stated,
this function may return an incorrect result as we are not able to fully
track server membership in a room without full state.
"""
if not allow_partial_state_rooms and await self._store.is_partial_state_room(
room_id
):
raise AuthError(
403,
"Unable to authorise you right now; room is partial-stated here.",
errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE,
)
if not await self.is_host_in_room(room_id, host):
raise AuthError(403, "Host not in room.")
async def check_restricted_join_rules(
self,

View File

@@ -803,7 +803,7 @@ class FederationHandler:
)
# now check that we are *still* in the room
is_in_room = await self._event_auth_handler.check_host_in_room(
is_in_room = await self._event_auth_handler.is_host_in_room(
room_id, self.server_name
)
if not is_in_room:
@@ -1149,9 +1149,7 @@ class FederationHandler:
async def on_backfill_request(
self, origin: str, room_id: str, pdu_list: List[str], limit: int
) -> List[EventBase]:
in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
await self._event_auth_handler.assert_host_in_room(room_id, origin)
# Synapse asks for 100 events per backfill request. Do not allow more.
limit = min(limit, 100)
@@ -1197,21 +1195,17 @@ class FederationHandler:
event_id, allow_none=True, allow_rejected=True
)
if event:
in_room = await self._event_auth_handler.check_host_in_room(
event.room_id, origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
events = await filter_events_for_server(
self._storage_controllers, origin, [event]
)
event = events[0]
return event
else:
if not event:
return None
await self._event_auth_handler.assert_host_in_room(event.room_id, origin)
events = await filter_events_for_server(
self._storage_controllers, origin, [event]
)
event = events[0]
return event
async def on_get_missing_events(
self,
origin: str,
@@ -1220,9 +1214,7 @@ class FederationHandler:
latest_events: List[str],
limit: int,
) -> List[EventBase]:
in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
await self._event_auth_handler.assert_host_in_room(room_id, origin)
# Only allow up to 20 events to be retrieved per request.
limit = min(limit, 20)
@@ -1256,7 +1248,7 @@ class FederationHandler:
"state_key": target_user_id,
}
if await self._event_auth_handler.check_host_in_room(room_id, self.hs.hostname):
if await self._event_auth_handler.is_host_in_room(room_id, self.hs.hostname):
room_version_obj = await self.store.get_room_version(room_id)
builder = self.event_builder_factory.for_room_version(
room_version_obj, event_dict

View File

@@ -240,7 +240,7 @@ class FederationEventHandler:
#
# Note that if we were never in the room then we would have already
# dropped the event, since we wouldn't know the room version.
is_in_room = await self._event_auth_handler.check_host_in_room(
is_in_room = await self._event_auth_handler.is_host_in_room(
room_id, self._server_name
)
if not is_in_room:

View File

@@ -63,6 +63,8 @@ class ReceiptsHandler:
self.clock = self.hs.get_clock()
self.state = hs.get_state_handler()
self._msc3771_enabled = hs.config.experimental.msc3771_enabled
async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None:
"""Called when we receive an EDU of type m.receipt from a remote HS."""
receipts = []
@@ -70,7 +72,7 @@ class ReceiptsHandler:
# If we're not in the room just ditch the event entirely. This is
# probably an old server that has come back and thinks we're still in
# the room (or we've been rejoined to the room by a state reset).
is_in_room = await self.event_auth_handler.check_host_in_room(
is_in_room = await self.event_auth_handler.is_host_in_room(
room_id, self.server_name
)
if not is_in_room:
@@ -91,13 +93,23 @@ class ReceiptsHandler:
)
continue
# Check if these receipts apply to a thread.
thread_id = None
data = user_values.get("data", {})
if self._msc3771_enabled and isinstance(data, dict):
thread_id = data.get("thread_id")
# If the thread ID is invalid, consider it missing.
if not isinstance(thread_id, str):
thread_id = None
receipts.append(
ReadReceipt(
room_id=room_id,
receipt_type=receipt_type,
user_id=user_id,
event_ids=user_values["event_ids"],
data=user_values.get("data", {}),
thread_id=thread_id,
data=data,
)
)
@@ -114,6 +126,7 @@ class ReceiptsHandler:
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.thread_id,
receipt.data,
)
@@ -146,7 +159,12 @@ class ReceiptsHandler:
return True
async def received_client_receipt(
self, room_id: str, receipt_type: str, user_id: str, event_id: str
self,
room_id: str,
receipt_type: str,
user_id: str,
event_id: str,
thread_id: Optional[str],
) -> None:
"""Called when a client tells us a local user has read up to the given
event_id in the room.
@@ -156,6 +174,7 @@ class ReceiptsHandler:
receipt_type=receipt_type,
user_id=user_id,
event_ids=[event_id],
thread_id=thread_id,
data={"ts": int(self.clock.time_msec())},
)

View File

@@ -609,7 +609,7 @@ class RoomSummaryHandler:
# If this is a request over federation, check if the host is in the room or
# has a user who could join the room.
elif origin:
if await self._event_auth_handler.check_host_in_room(
if await self._event_auth_handler.is_host_in_room(
room_id, origin
) or await self._store.is_host_invited(room_id, origin):
return True
@@ -624,9 +624,7 @@ class RoomSummaryHandler:
await self._event_auth_handler.get_rooms_that_allow_join(state_ids)
)
for space_id in allowed_rooms:
if await self._event_auth_handler.check_host_in_room(
space_id, origin
):
if await self._event_auth_handler.is_host_in_room(space_id, origin):
return True
logger.info(

View File

@@ -340,7 +340,7 @@ class TypingWriterHandler(FollowerTypingHandler):
# If we're not in the room just ditch the event entirely. This is
# probably an old server that has come back and thinks we're still in
# the room (or we've been rejoined to the room by a state reset).
is_in_room = await self.event_auth_handler.check_host_in_room(
is_in_room = await self.event_auth_handler.is_host_in_room(
room_id, self.server_name
)
if not is_in_room:
@@ -362,11 +362,14 @@ class TypingWriterHandler(FollowerTypingHandler):
)
return
domains = await self._storage_controllers.state.get_current_hosts_in_room(
# Let's check that the origin server is in the room before accepting the typing
# event. We don't want to block waiting on a partial state so take an
# approximation if needed.
domains = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
if self.server_name in domains:
if user.domain in domains:
logger.info("Got typing update from %s: %r", user_id, content)
now = self.clock.time_msec()
self._member_typing_until[member] = now + FEDERATION_TIMEOUT

View File

@@ -36,6 +36,7 @@ from twisted.web.error import SchemeNotSupported
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS
from synapse.http import redact_uri
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
from synapse.types import ISynapseReactor
@@ -220,7 +221,11 @@ class ProxyAgent(_AgentBase):
self._reactor, parsed_uri.host, parsed_uri.port, **self._endpoint_kwargs
)
logger.debug("Requesting %s via %s", uri, endpoint)
logger.debug(
"Requesting %s via %s",
redact_uri(uri.decode("ascii", errors="replace")),
endpoint,
)
if parsed_uri.scheme == b"https":
tls_connection_creator = self._policy_for_https.creatorForNetloc(

View File

@@ -705,7 +705,7 @@ class _ByteProducer:
self._request = None
def _encode_json_bytes(json_object: Any) -> bytes:
def _encode_json_bytes(json_object: object) -> bytes:
"""
Encode an object into JSON. Returns an iterator of bytes.
"""
@@ -746,7 +746,7 @@ def respond_with_json(
return None
if canonical_json:
encoder = encode_canonical_json
encoder: Callable[[object], bytes] = encode_canonical_json
else:
encoder = _encode_json_bytes

View File

@@ -427,7 +427,8 @@ class FederationSenderHandler:
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
receipt.data,
thread_id=receipt.thread_id,
data=receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)

View File

@@ -361,6 +361,7 @@ class ReceiptsStream(Stream):
receipt_type: str
user_id: str
event_id: str
thread_id: Optional[str]
data: dict
NAME = "receipts"

View File

@@ -83,6 +83,8 @@ class ReadMarkerRestServlet(RestServlet):
receipt_type,
user_id=requester.user.to_string(),
event_id=event_id,
# Setting the thread ID is not possible with the /read_markers endpoint.
thread_id=None,
)
return 200, {}

View File

@@ -49,6 +49,7 @@ class ReceiptRestServlet(RestServlet):
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.FULLY_READ,
}
self._msc3771_enabled = hs.config.experimental.msc3771_enabled
async def on_POST(
self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str
@@ -61,7 +62,17 @@ class ReceiptRestServlet(RestServlet):
f"Receipt type must be {', '.join(self._known_receipt_types)}",
)
parse_json_object_from_request(request, allow_empty_body=False)
body = parse_json_object_from_request(request)
# Pull the thread ID, if one exists.
thread_id = None
if self._msc3771_enabled:
if "thread_id" in body:
thread_id = body.get("thread_id")
if not thread_id or not isinstance(thread_id, str):
raise SynapseError(
400, "thread_id field must be a non-empty string"
)
await self.presence_handler.bump_presence_active_time(requester.user)
@@ -77,6 +88,7 @@ class ReceiptRestServlet(RestServlet):
receipt_type,
user_id=requester.user.to_string(),
event_id=event_id,
thread_id=thread_id,
)
return 200, {}

View File

@@ -103,6 +103,8 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc3030": self.config.experimental.msc3030_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
# Support for thread read receipts.
"org.matrix.msc3771": self.config.experimental.msc3771_enabled,
# Allows moderators to fetch redacted event content as described in MSC2815
"fi.mau.msc2815": self.config.experimental.msc2815_enabled,
# Adds support for login token requests as per MSC3882

View File

@@ -42,7 +42,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersio
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.tracing import SynapseTags, log_kv, trace, tag_args, set_attribute
from synapse.logging.tracing import SynapseTags, log_kv, set_attribute, tag_args, trace
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour

View File

@@ -91,6 +91,9 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (user_id,)
)
# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

View File

@@ -23,6 +23,7 @@ from typing import (
List,
Mapping,
Optional,
Sequence,
Tuple,
)
@@ -406,6 +407,7 @@ class StateStorageController:
self,
room_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
on_invalidate: Optional[Callable[[], None]] = None,
) -> StateMap[str]:
"""Get the current state event ids for a room based on the
@@ -418,13 +420,17 @@ class StateStorageController:
room_id: The room to get the state IDs of. state_filter: The state
filter used to fetch state from the
database.
await_full_state: if true, will block if we do not yet have complete
state for the room.
on_invalidate: Callback for when the `get_current_state_ids` cache
for the room gets invalidated.
Returns:
The current state of the room.
"""
if not state_filter or state_filter.must_await_full_state(self._is_mine_id):
if await_full_state and (
not state_filter or state_filter.must_await_full_state(self._is_mine_id)
):
await self._partial_state_room_tracker.await_full_state(room_id)
if state_filter and not state_filter.is_full():
@@ -524,12 +530,53 @@ class StateStorageController:
return state_map.get(key)
async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
"""Get current hosts in room based on current state."""
"""Get current hosts in room based on current state.
Blocks until we have full state for the given room. This only happens for rooms
with partial state.
Returns:
A list of hosts in the room, sorted by longest in the room first. (aka.
sorted by join with the lowest depth first).
"""
await self._partial_state_room_tracker.await_full_state(room_id)
return await self.stores.main.get_current_hosts_in_room(room_id)
async def get_current_hosts_in_room_or_partial_state_approximation(
self, room_id: str
) -> Sequence[str]:
"""Get approximation of current hosts in room based on current state.
For rooms with full state, this is equivalent to `get_current_hosts_in_room`,
with the same order of results.
For rooms with partial state, no blocking occurs. Instead, the list of hosts
in the room at the time of joining is combined with the list of hosts which
joined the room afterwards. The returned list may include hosts that are not
actually in the room and exclude hosts that are in the room, since we may
calculate state incorrectly during the partial state phase. The order of results
is arbitrary for rooms with partial state.
"""
# We have to read this list first to mitigate races with un-partial stating.
# This will be empty for rooms with full state.
hosts_at_join = await self.stores.main.get_partial_state_servers_at_join(
room_id
)
hosts_from_state = await self.stores.main.get_current_hosts_in_room(room_id)
hosts_from_state_set = set(hosts_from_state)
# First take the list of hosts based on the current state.
# For rooms with partial state, this will be missing most hosts.
hosts = list(hosts_from_state)
# Then add in the list of hosts in the room at the time we joined.
# This will be an empty list for rooms with full state.
hosts.extend(host for host in hosts_at_join if host not in hosts_from_state_set)
return hosts
async def get_users_in_room_with_profiles(
self, room_id: str
) -> Dict[str, ProfileInfo]:

View File

@@ -95,6 +95,8 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
"local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx",
"remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx",
"event_push_summary": "event_push_summary_unique_index",
"receipts_linearized": "receipts_linearized_unique_index",
"receipts_graph": "receipts_graph_unique_index",
}
@@ -391,6 +393,14 @@ class LoggingTransaction:
def executemany(self, sql: str, *args: Any) -> None:
self._do_execute(self.txn.executemany, sql, *args)
def executescript(self, sql: str) -> None:
if isinstance(self.database_engine, Sqlite3Engine):
self._do_execute(self.txn.executescript, sql) # type: ignore[attr-defined]
else:
raise NotImplementedError(
f"executescript only exists for sqlite driver, not {type(self.database_engine)}"
)
def _make_sql_one_line(self, sql: str) -> str:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
return " ".join(line.strip() for line in sql.splitlines() if line.strip())

View File

@@ -223,15 +223,17 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate(((room_id, event_id),))
self.have_seen_event.invalidate((room_id, event_id))
self.get_latest_event_ids_in_room.invalidate((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
self._get_membership_from_event_id.invalidate((event_id,))
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@@ -240,19 +242,26 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._invalidate_local_get_event_cache(redacts)
# Caches which might leak edits must be invalidated for the event being
# redacted.
self.get_relations_for_event.invalidate((redacts,))
self.get_applicable_edit.invalidate((redacts,))
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_local_user.invalidate((state_key,))
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
if relates_to:
self.get_relations_for_event.invalidate((relates_to,))
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
self.get_thread_summary.invalidate((relates_to,))
self.get_thread_participated.invalidate((relates_to,))
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
self._attempt_to_invalidate_cache(
"get_aggregation_groups_for_event", (relates_to,)
)
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
self._attempt_to_invalidate_cache(
"get_mutual_event_relations_for_rel_type", (relates_to,)
)
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]

View File

@@ -73,6 +73,14 @@ pdus_pruned_from_federation_queue = Counter(
logger = logging.getLogger(__name__)
BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
datetime.timedelta(days=7).total_seconds()
)
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
datetime.timedelta(hours=1).total_seconds()
)
# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
class BackfillQueueNavigationItem:
@@ -778,9 +786,24 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
*/
INNER JOIN event_backward_extremities AS backward_extrem
ON edge.prev_event_id = backward_extrem.event_id
/**
* We use this info to make sure we don't retry to use a backfill point
* if we've already attempted to backfill from it recently.
*/
LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
ON
failed_backfill_attempt_info.room_id = backward_extrem.room_id
AND failed_backfill_attempt_info.event_id = backward_extrem.event_id
WHERE
backward_extrem.room_id = ?
/* We only care about non-state events because TODO: why */
/* We only care about non-state edges because we used to use
* `event_edges` for two different sorts of "edges" (the current
* event DAG, but also a link to the previous state, for state
* events). These legacy state event edges can be distinguished by
* `is_state` and are removed from the codebase and schema but
* because the schema change is in a background update, it's not
* necessarily safe to assume that it will have been completed.
*/
AND edge.is_state is ? /* False */
/**
* We only want backwards extremities that are older than or at
@@ -793,21 +816,43 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
* <oldest-in-time> [0]<--[1]▼<--[2]<--[3]<--[4] <newest-in-time>
*/
AND event.depth <= ? /* current_depth */
/**
* Exponential back-off (up to the upper bound) so we don't retry the
* same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
*
* We use `1 << n` as a power of 2 equivalent for compatibility
* with older SQLites. The left shift equivalent only works with
* powers of 2 because left shift is a binary operation (base-2).
* Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
* because the closest are most relevant to backfill from first.
* Then tie-break on alphabetical order of the event_ids so we get a
* consistent ordering which is nice when asserting things in tests.
* Sort from highest to the lowest depth. Then tie-break on
* alphabetical order of the event_ids so we get a consistent
* ordering which is nice when asserting things in tests.
*/
ORDER BY event.depth DESC, event.stream_ordering DESC, backward_extrem.event_id DESC
"""
if isinstance(self.database_engine, PostgresEngine):
least_function = "least"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "min"
else:
raise RuntimeError("Unknown database engine")
txn.execute(
sql,
sql % (least_function,),
(
room_id,
False,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
),
)
@@ -868,26 +913,66 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
FROM insertion_event_extremities AS insertion_event_extremity
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS event USING (event_id)
/**
* We use this info to make sure we don't retry to use a backfill point
* if we've already attempted to backfill from it recently.
*/
LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
ON
failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
WHERE
insertion_event_extremity.room_id = ?
/**
* We only want extremities that are older than or at
* the same position of the given `current_depth` (where older
* means less than the given depth) because we're looking backwards
* from the `current_depth` when backfilling.
*
* current_depth (ignore events that come after this, ignore 2-4)
* |
* <oldest-in-time> [0]<--[1]▼<--[2]<--[3]<--[4] <newest-in-time>
*/
AND event.depth <= ? /* current_depth */
/**
* Exponential back-off (up to the upper bound) so we don't retry the
* same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
*
* We use `1 << n` as a power of 2 equivalent for compatibility
* with older SQLites. The left shift equivalent only works with
* powers of 2 because left shift is a binary operation (base-2).
* Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
* because the closest are most relevant to backfill from first.
* Then tie-break on alphabetical order of the event_ids so we get a
* consistent ordering which is nice when asserting things in tests.
* Sort from highest to the lowest depth. Then tie-break on
* alphabetical order of the event_ids so we get a consistent
* ordering which is nice when asserting things in tests.
*/
ORDER BY event.depth DESC, event.stream_ordering DESC, insertion_event_extremity.event_id DESC
"""
if isinstance(self.database_engine, PostgresEngine):
least_function = "least"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "min"
else:
raise RuntimeError("Unknown database engine")
txn.execute(
sql,
sql % (least_function,),
(
room_id,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
),
)
return cast(List[Tuple[str, int, int]], txn.fetchall())
return cast(List[Tuple[str, int]], txn.fetchall())
return await self.db_pool.runInteraction(
"get_insertion_event_backward_extremities_in_room",

View File

@@ -559,7 +559,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def _get_receipts_by_room_txn(
self, txn: LoggingTransaction, user_id: str
) -> List[Tuple[str, int]]:
) -> Dict[str, int]:
"""
Generate a map of room ID to the latest stream ordering that has been
read by the given user.
Args:
txn:
user_id: The user to fetch receipts for.
Returns:
A map of room ID to stream ordering for all rooms the user has a receipt in.
"""
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
@@ -580,7 +591,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
args.extend((user_id,))
txn.execute(sql, args)
return cast(List[Tuple[str, int]], txn.fetchall())
return {
room_id: latest_stream_ordering
for room_id, latest_stream_ordering in txn.fetchall()
}
async def get_unread_push_actions_for_user_in_range_for_http(
self,
@@ -605,12 +619,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will have between 0~limit entries.
"""
receipts_by_room = dict(
await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_http_receipts",
self._get_receipts_by_room_txn,
user_id=user_id,
),
receipts_by_room = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_http_receipts",
self._get_receipts_by_room_txn,
user_id=user_id,
)
def get_push_actions_txn(
@@ -679,12 +691,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will have between 0~limit entries.
"""
receipts_by_room = dict(
await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_email_receipts",
self._get_receipts_by_room_txn,
user_id=user_id,
),
receipts_by_room = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_email_receipts",
self._get_receipts_by_room_txn,
user_id=user_id,
)
def get_push_actions_txn(

View File

@@ -35,7 +35,7 @@ import attr
from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
@@ -486,6 +486,7 @@ class PersistEventsStore:
# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
def _persist_event_auth_chain_txn(
@@ -1199,13 +1200,6 @@ class PersistEventsStore:
)
# Invalidate the various caches
for member in members_changed:
txn.call_after(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)
self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
)
@@ -1249,9 +1243,6 @@ class PersistEventsStore:
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
txn.call_after(
self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
)
self.db_pool.simple_insert_many_txn(
txn,
@@ -1321,8 +1312,6 @@ class PersistEventsStore:
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
@@ -1724,16 +1713,7 @@ class PersistEventsStore:
txn.async_call_after(prefill)
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Invalidate the caches for the redacted event.
Note that these caches are also cleared as part of event replication in
_invalidate_caches_for_event.
"""
assert event.redacts is not None
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
self.db_pool.simple_upsert_txn(
txn,
table="redactions",
@@ -1834,34 +1814,6 @@ class PersistEventsStore:
for event in events:
assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
event.state_key,
event.internal_metadata.stream_ordering,
)
txn.call_after(
self.store.get_invited_rooms_for_local_user.invalidate,
(event.state_key,),
)
txn.call_after(
self.store.get_local_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_number_joined_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_user_in_room_with_profile.invalidate,
(event.room_id, event.state_key),
)
# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
txn.call_after(
self.store._get_membership_from_event_id.invalidate,
(event.event_id,),
)
# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
@@ -1910,35 +1862,6 @@ class PersistEventsStore:
},
)
txn.call_after(
self.store.get_relations_for_event.invalidate, (relation.parent_id,)
)
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate,
(relation.parent_id,),
)
txn.call_after(
self.store.get_mutual_event_relations_for_rel_type.invalidate,
(relation.parent_id,),
)
if relation.rel_type == RelationTypes.REPLACE:
txn.call_after(
self.store.get_applicable_edit.invalidate, (relation.parent_id,)
)
if relation.rel_type == RelationTypes.THREAD:
txn.call_after(
self.store.get_thread_summary.invalidate, (relation.parent_id,)
)
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
txn.call_after(
self.store.get_thread_participated.invalidate,
(relation.parent_id, event.sender),
)
def _handle_insertion_event(
self, txn: LoggingTransaction, event: EventBase
) -> None:
@@ -2240,28 +2163,6 @@ class PersistEventsStore:
),
)
room_to_event_ids: Dict[str, List[str]] = {}
for e in non_outlier_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
for room_id, event_ids in room_to_event_ids.items():
rows = self.db_pool.simple_select_many_txn(
txn,
table="event_push_actions_staging",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("user_id",),
)
user_ids = {row["user_id"] for row in rows}
for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)
# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
@@ -2276,11 +2177,6 @@ class PersistEventsStore:
def _remove_push_actions_for_event_id_txn(
self, txn: LoggingTransaction, room_id: str, event_id: str
) -> None:
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id),

View File

@@ -540,7 +540,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
async def get_all_updated_receipts(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
) -> Tuple[
List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], int, bool
]:
"""Get updates for receipts replication stream.
Args:
@@ -567,9 +569,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
def get_all_updated_receipts_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, list]], int, bool]:
) -> Tuple[
List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
int,
bool,
]:
sql = """
SELECT stream_id, room_id, receipt_type, user_id, event_id, data
SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
@@ -578,8 +584,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, limit))
updates = cast(
List[Tuple[int, list]],
[(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn],
List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
[(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
)
limited = False
@@ -631,6 +637,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: str,
user_id: str,
event_id: str,
thread_id: Optional[str],
data: JsonDict,
stream_id: int,
) -> Optional[int]:
@@ -657,12 +664,27 @@ class ReceiptsWorkerStore(SQLBaseStore):
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
if thread_id is None:
thread_clause = "r.thread_id IS NULL"
thread_args: Tuple[str, ...] = ()
else:
thread_clause = "r.thread_id = ?"
thread_args = (thread_id,)
sql = f"""
SELECT stream_ordering, event_id FROM events
INNER JOIN receipts_linearized AS r USING (event_id, room_id)
WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
"""
txn.execute(
sql,
(
room_id,
receipt_type,
user_id,
)
+ thread_args,
)
txn.execute(sql, (room_id, receipt_type, user_id))
for so, eid in txn:
if int(so) >= stream_ordering:
@@ -682,21 +704,28 @@ class ReceiptsWorkerStore(SQLBaseStore):
self._receipts_stream_cache.entity_has_changed, room_id, stream_id
)
keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
}
where_clause = ""
if thread_id is None:
where_clause = "thread_id IS NULL"
else:
keyvalues["thread_id"] = thread_id
self.db_pool.simple_upsert_txn(
txn,
table="receipts_linearized",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
},
keyvalues=keyvalues,
values={
"stream_id": stream_id,
"event_id": event_id,
"event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
"thread_id": None,
},
where_clause=where_clause,
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
@@ -748,6 +777,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: str,
user_id: str,
event_ids: List[str],
thread_id: Optional[str],
data: dict,
) -> Optional[Tuple[int, int]]:
"""Insert a receipt, either from local client or remote server.
@@ -780,6 +810,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type,
user_id,
linearized_event_id,
thread_id,
data,
stream_id=stream_id,
# Read committed is actually beneficial here because we check for a receipt with
@@ -794,7 +825,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
now = self._clock.time_msec()
logger.debug(
"RR for event %s in %s (%i ms old)",
"Receipt %s for event %s in %s (%i ms old)",
receipt_type,
linearized_event_id,
room_id,
now - event_ts,
@@ -807,6 +839,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type,
user_id,
event_ids,
thread_id,
data,
)
@@ -821,6 +854,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: str,
user_id: str,
event_ids: List[str],
thread_id: Optional[str],
data: JsonDict,
) -> None:
assert self._can_write_to_receipts
@@ -832,19 +866,26 @@ class ReceiptsWorkerStore(SQLBaseStore):
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
}
where_clause = ""
if thread_id is None:
where_clause = "thread_id IS NULL"
else:
keyvalues["thread_id"] = thread_id
self.db_pool.simple_upsert_txn(
txn,
table="receipts_graph",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
},
keyvalues=keyvalues,
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
where_clause=where_clause,
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,

View File

@@ -30,7 +30,6 @@ import attr
from synapse.api.constants import RelationTypes
from synapse.events import EventBase
from synapse.logging.tracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.databases.main.stream import generate_pagination_where_clause

View File

@@ -25,6 +25,7 @@ from typing import (
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
cast,
@@ -1133,6 +1134,22 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
get_rooms_for_retention_period_in_range_txn,
)
async def get_partial_state_servers_at_join(self, room_id: str) -> Sequence[str]:
"""Gets the list of servers in a partial state room at the time we joined it.
Returns:
The `servers_in_room` list from the `/send_join` response for partial state
rooms. May not be accurate or complete, as it comes from a remote
homeserver.
An empty list for full state rooms.
"""
return await self.db_pool.simple_select_onecol(
"partial_state_rooms_servers",
keyvalues={"room_id": room_id},
retcol="server_name",
desc="get_partial_state_servers_at_join",
)
async def get_partial_state_rooms_and_servers(
self,
) -> Mapping[str, Collection[str]]:

View File

@@ -32,9 +32,10 @@ class IncorrectDatabaseSetup(RuntimeError):
ConnectionType = TypeVar("ConnectionType", bound=Connection)
CursorType = TypeVar("CursorType", bound=Cursor)
class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
self.module = module
@@ -64,7 +65,7 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
...
@abc.abstractmethod
def check_new_database(self, txn: Cursor) -> None:
def check_new_database(self, txn: CursorType) -> None:
"""Gets called when setting up a brand new database. This allows us to
apply stricter checks on new databases versus existing database.
"""
@@ -124,3 +125,21 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default.
"""
...
@staticmethod
@abc.abstractmethod
def executescript(cursor: CursorType, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
This is not provided by DBAPI2, and so needs engine-specific support.
"""
...
@classmethod
def execute_script_file(cls, cursor: CursorType, filepath: str) -> None:
"""Execute a file containing multiple semicolon-delimited SQL statements.
This is not provided by DBAPI2, and so needs engine-specific support.
"""
with open(filepath, "rt") as f:
cls.executescript(cursor, f.read())

View File

@@ -31,7 +31,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
class PostgresEngine(
BaseDatabaseEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
):
def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
@@ -212,3 +214,11 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
else:
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level)
@staticmethod
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
"""
cursor.execute(script)

View File

@@ -24,7 +24,7 @@ if TYPE_CHECKING:
from synapse.storage.database import LoggingDatabaseConnection
class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
def __init__(self, database_config: Mapping[str, Any]):
super().__init__(sqlite3, database_config)
@@ -120,6 +120,25 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
# All transactions are SERIALIZABLE by default in sqlite
pass
@staticmethod
def executescript(cursor: sqlite3.Cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
Python's built-in SQLite driver does not allow you to do this with DBAPI2's
`execute`:
> execute() will only execute a single SQL statement. If you try to execute more
> than one statement with it, it will raise a Warning. Use executescript() if
> you want to execute multiple SQL statements with one call.
Though the docs for `executescript` warn:
> If there is a pending transaction, an implicit COMMIT statement is executed
> first. No other implicit transaction control is performed; any transaction
> control must be added to sql_script.
"""
cursor.executescript(script)
# Following functions taken from: https://github.com/coleifer/peewee

View File

@@ -266,7 +266,7 @@ def _setup_new_database(
".sql." + specific
):
logger.debug("Applying schema %s", entry.absolute_path)
executescript(cur, entry.absolute_path)
database_engine.execute_script_file(cur, entry.absolute_path)
cur.execute(
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)",
@@ -517,7 +517,7 @@ def _upgrade_existing_database(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
database_engine.execute_script_file(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it
if is_worker:
@@ -525,7 +525,7 @@ def _upgrade_existing_database(
UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
)
logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path)
database_engine.execute_script_file(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
# A .sql file for a different engine; skip it.
continue
@@ -666,7 +666,7 @@ def _get_or_create_schema_state(
) -> Optional[_SchemaState]:
# Bluntly try creating the schema_version tables.
sql_path = os.path.join(schema_path, "common", "schema_version.sql")
executescript(txn, sql_path)
database_engine.execute_script_file(txn, sql_path)
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()

View File

@@ -0,0 +1,8 @@
CREATE TABLE background_updates (
update_name text NOT NULL,
progress_json text NOT NULL,
depends_on text,
ordering integer DEFAULT 0 NOT NULL
);
ALTER TABLE ONLY background_updates
ADD CONSTRAINT background_updates_uniqueness UNIQUE (update_name);

View File

@@ -0,0 +1,6 @@
CREATE TABLE background_updates (
update_name text NOT NULL,
progress_json text NOT NULL,
depends_on text, ordering INT NOT NULL DEFAULT 0,
CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,646 @@
CREATE TABLE application_services_txns( as_id TEXT NOT NULL, txn_id INTEGER NOT NULL, event_ids TEXT NOT NULL, UNIQUE(as_id, txn_id) );
CREATE INDEX application_services_txns_id ON application_services_txns ( as_id );
CREATE TABLE presence( user_id TEXT NOT NULL, state VARCHAR(20), status_msg TEXT, mtime BIGINT, UNIQUE (user_id) );
CREATE TABLE users( name TEXT, password_hash TEXT, creation_ts BIGINT, admin SMALLINT DEFAULT 0 NOT NULL, upgrade_ts BIGINT, is_guest SMALLINT DEFAULT 0 NOT NULL, appservice_id TEXT, consent_version TEXT, consent_server_notice_sent TEXT, user_type TEXT DEFAULT NULL, deactivated SMALLINT DEFAULT 0 NOT NULL, shadow_banned BOOLEAN, consent_ts bigint, UNIQUE(name) );
CREATE TABLE user_ips ( user_id TEXT NOT NULL, access_token TEXT NOT NULL, device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, last_seen BIGINT NOT NULL );
CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, UNIQUE(user_id) );
CREATE TABLE received_transactions( transaction_id TEXT, origin TEXT, ts BIGINT, response_code INTEGER, response_json bytea, has_been_referenced smallint default 0, UNIQUE (transaction_id, origin) );
CREATE TABLE destinations( destination TEXT PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER , failure_ts BIGINT, last_successful_stream_ordering BIGINT);
CREATE TABLE events( stream_ordering INTEGER PRIMARY KEY, topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, content TEXT, unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, origin_server_ts BIGINT, received_ts BIGINT, sender TEXT, contains_url BOOLEAN, instance_name TEXT, state_key TEXT DEFAULT NULL, rejection_reason TEXT DEFAULT NULL, UNIQUE (event_id) );
CREATE INDEX events_order_room ON events ( room_id, topological_ordering, stream_ordering );
CREATE TABLE event_json( event_id TEXT NOT NULL, room_id TEXT NOT NULL, internal_metadata TEXT NOT NULL, json TEXT NOT NULL, format_version INTEGER, UNIQUE (event_id) );
CREATE TABLE state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, prev_state TEXT, UNIQUE (event_id) );
CREATE TABLE current_state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, membership TEXT, UNIQUE (event_id), UNIQUE (room_id, type, state_key) );
CREATE TABLE room_memberships( event_id TEXT NOT NULL, user_id TEXT NOT NULL, sender TEXT NOT NULL, room_id TEXT NOT NULL, membership TEXT NOT NULL, forgotten INTEGER DEFAULT 0, display_name TEXT, avatar_url TEXT, UNIQUE (event_id) );
CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
CREATE TABLE rooms( room_id TEXT PRIMARY KEY NOT NULL, is_public BOOL, creator TEXT , room_version TEXT, has_auth_chain_index BOOLEAN);
CREATE TABLE server_signature_keys( server_name TEXT, key_id TEXT, from_server TEXT, ts_added_ms BIGINT, verify_key bytea, ts_valid_until_ms BIGINT, UNIQUE (server_name, key_id) );
CREATE TABLE rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, UNIQUE (event_id) );
CREATE TABLE push_rules ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, rule_id TEXT NOT NULL, priority_class SMALLINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, conditions TEXT NOT NULL, actions TEXT NOT NULL, UNIQUE(user_name, rule_id) );
CREATE INDEX push_rules_user_name on push_rules (user_name);
CREATE TABLE push_rules_enable ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, rule_id TEXT NOT NULL, enabled SMALLINT, UNIQUE(user_name, rule_id) );
CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name);
CREATE TABLE event_forward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, UNIQUE (event_id, room_id) );
CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
CREATE TABLE event_backward_extremities( event_id TEXT NOT NULL, room_id TEXT NOT NULL, UNIQUE (event_id, room_id) );
CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
CREATE TABLE room_depth( room_id TEXT NOT NULL, min_depth INTEGER NOT NULL, UNIQUE (room_id) );
CREATE TABLE event_to_state_groups( event_id TEXT NOT NULL, state_group BIGINT NOT NULL, UNIQUE (event_id) );
CREATE TABLE local_media_repository ( media_id TEXT, media_type TEXT, media_length INTEGER, created_ts BIGINT, upload_name TEXT, user_id TEXT, quarantined_by TEXT, url_cache TEXT, last_access_ts BIGINT, safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0, UNIQUE (media_id) );
CREATE TABLE remote_media_cache ( media_origin TEXT, media_id TEXT, media_type TEXT, created_ts BIGINT, upload_name TEXT, media_length INTEGER, filesystem_id TEXT, last_access_ts BIGINT, quarantined_by TEXT, UNIQUE (media_origin, media_id) );
CREATE TABLE redactions ( event_id TEXT NOT NULL, redacts TEXT NOT NULL, have_censored BOOL NOT NULL DEFAULT false, received_ts BIGINT, UNIQUE (event_id) );
CREATE INDEX redactions_redacts ON redactions (redacts);
CREATE TABLE room_aliases( room_alias TEXT NOT NULL, room_id TEXT NOT NULL, creator TEXT, UNIQUE (room_alias) );
CREATE INDEX room_aliases_id ON room_aliases(room_id);
CREATE TABLE room_alias_servers( room_alias TEXT NOT NULL, server TEXT NOT NULL );
CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias);
CREATE TABLE IF NOT EXISTS "server_keys_json" ( server_name TEXT NOT NULL, key_id TEXT NOT NULL, from_server TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, ts_valid_until_ms BIGINT NOT NULL, key_json bytea NOT NULL, CONSTRAINT server_keys_json_uniqueness UNIQUE (server_name, key_id, from_server) );
CREATE TABLE e2e_device_keys_json ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, key_json TEXT NOT NULL, CONSTRAINT e2e_device_keys_json_uniqueness UNIQUE (user_id, device_id) );
CREATE TABLE e2e_one_time_keys_json ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, algorithm TEXT NOT NULL, key_id TEXT NOT NULL, ts_added_ms BIGINT NOT NULL, key_json TEXT NOT NULL, CONSTRAINT e2e_one_time_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm, key_id) );
CREATE TABLE IF NOT EXISTS "user_threepids" ( user_id TEXT NOT NULL, medium TEXT NOT NULL, address TEXT NOT NULL, validated_at BIGINT NOT NULL, added_at BIGINT NOT NULL, CONSTRAINT medium_address UNIQUE (medium, address) );
CREATE INDEX user_threepids_user_id ON user_threepids(user_id);
CREATE VIRTUAL TABLE event_search USING fts4 ( event_id, room_id, sender, key, value )
/* event_search(event_id,room_id,sender,"key",value) */;
CREATE TABLE room_tags( user_id TEXT NOT NULL, room_id TEXT NOT NULL, tag TEXT NOT NULL, content TEXT NOT NULL, CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag) );
CREATE TABLE room_tags_revisions ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, stream_id BIGINT NOT NULL, instance_name TEXT, CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id) );
CREATE TABLE account_data( user_id TEXT NOT NULL, account_data_type TEXT NOT NULL, stream_id BIGINT NOT NULL, content TEXT NOT NULL, instance_name TEXT, CONSTRAINT account_data_uniqueness UNIQUE (user_id, account_data_type) );
CREATE TABLE room_account_data( user_id TEXT NOT NULL, room_id TEXT NOT NULL, account_data_type TEXT NOT NULL, stream_id BIGINT NOT NULL, content TEXT NOT NULL, instance_name TEXT, CONSTRAINT room_account_data_uniqueness UNIQUE (user_id, room_id, account_data_type) );
CREATE INDEX account_data_stream_id on account_data(user_id, stream_id);
CREATE INDEX room_account_data_stream_id on room_account_data(user_id, stream_id);
CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
CREATE TABLE event_push_actions( room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, topological_ordering BIGINT, stream_ordering BIGINT, notif SMALLINT, highlight SMALLINT, unread SMALLINT, thread_id TEXT, CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) );
CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id);
CREATE INDEX events_room_stream on events(room_id, stream_ordering);
CREATE INDEX public_room_index on rooms(is_public);
CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering );
CREATE TABLE presence_stream( stream_id BIGINT, user_id TEXT, state TEXT, last_active_ts BIGINT, last_federation_update_ts BIGINT, last_user_sync_ts BIGINT, status_msg TEXT, currently_active BOOLEAN , instance_name TEXT);
CREATE INDEX presence_stream_id ON presence_stream(stream_id, user_id);
CREATE INDEX presence_stream_user_id ON presence_stream(user_id);
CREATE TABLE push_rules_stream( stream_id BIGINT NOT NULL, event_stream_ordering BIGINT NOT NULL, user_id TEXT NOT NULL, rule_id TEXT NOT NULL, op TEXT NOT NULL, priority_class SMALLINT, priority INTEGER, conditions TEXT, actions TEXT );
CREATE INDEX push_rules_stream_id ON push_rules_stream(stream_id);
CREATE INDEX push_rules_stream_user_stream_id on push_rules_stream(user_id, stream_id);
CREATE TABLE ex_outlier_stream( event_stream_ordering BIGINT PRIMARY KEY NOT NULL, event_id TEXT NOT NULL, state_group BIGINT NOT NULL , instance_name TEXT);
CREATE TABLE threepid_guest_access_tokens( medium TEXT, address TEXT, guest_access_token TEXT, first_inviter TEXT );
CREATE UNIQUE INDEX threepid_guest_access_tokens_index ON threepid_guest_access_tokens(medium, address);
CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );
CREATE TABLE open_id_tokens ( token TEXT NOT NULL PRIMARY KEY, ts_valid_until_ms bigint NOT NULL, user_id TEXT NOT NULL, UNIQUE (token) );
CREATE INDEX open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
CREATE TABLE pusher_throttle( pusher BIGINT NOT NULL, room_id TEXT NOT NULL, last_sent_ts BIGINT, throttle_ms BIGINT, PRIMARY KEY (pusher, room_id) );
CREATE TABLE event_reports( id BIGINT NOT NULL PRIMARY KEY, received_ts BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, reason TEXT, content TEXT );
CREATE TABLE appservice_stream_position( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_ordering BIGINT, CHECK (Lock='X') );
CREATE TABLE device_inbox ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, stream_id BIGINT NOT NULL, message_json TEXT NOT NULL , instance_name TEXT);
CREATE INDEX device_inbox_user_stream_id ON device_inbox(user_id, device_id, stream_id);
CREATE INDEX received_transactions_ts ON received_transactions(ts);
CREATE TABLE device_federation_outbox ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, queued_ts BIGINT NOT NULL, messages_json TEXT NOT NULL , instance_name TEXT);
CREATE INDEX device_federation_outbox_destination_id ON device_federation_outbox(destination, stream_id);
CREATE TABLE device_federation_inbox ( origin TEXT NOT NULL, message_id TEXT NOT NULL, received_ts BIGINT NOT NULL , instance_name TEXT);
CREATE INDEX device_federation_inbox_sender_id ON device_federation_inbox(origin, message_id);
CREATE TABLE stream_ordering_to_exterm ( stream_ordering BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL );
CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( stream_ordering );
CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( room_id, stream_ordering );
CREATE TABLE IF NOT EXISTS "event_auth"( event_id TEXT NOT NULL, auth_id TEXT NOT NULL, room_id TEXT NOT NULL );
CREATE INDEX evauth_edges_id ON event_auth(event_id);
CREATE INDEX user_threepids_medium_address on user_threepids (medium, address);
CREATE TABLE appservice_room_list( appservice_id TEXT NOT NULL, network_id TEXT NOT NULL, room_id TEXT NOT NULL );
CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list( appservice_id, network_id, room_id );
CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
CREATE TABLE federation_stream_position( type TEXT NOT NULL, stream_id INTEGER NOT NULL , instance_name TEXT NOT NULL DEFAULT 'master');
CREATE TABLE device_lists_remote_cache ( user_id TEXT NOT NULL, device_id TEXT NOT NULL, content TEXT NOT NULL );
CREATE TABLE device_lists_remote_extremeties ( user_id TEXT NOT NULL, stream_id TEXT NOT NULL );
CREATE TABLE device_lists_stream ( stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL );
CREATE INDEX device_lists_stream_id ON device_lists_stream(stream_id, user_id);
CREATE TABLE device_lists_outbound_pokes ( destination TEXT NOT NULL, stream_id BIGINT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, sent BOOLEAN NOT NULL, ts BIGINT NOT NULL , opentracing_context TEXT);
CREATE INDEX device_lists_outbound_pokes_id ON device_lists_outbound_pokes(destination, stream_id);
CREATE INDEX device_lists_outbound_pokes_user ON device_lists_outbound_pokes(destination, user_id);
CREATE TABLE event_push_summary ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, notif_count BIGINT NOT NULL, stream_ordering BIGINT NOT NULL , unread_count BIGINT, last_receipt_stream_ordering BIGINT, thread_id TEXT);
CREATE TABLE event_push_summary_stream_ordering ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_ordering BIGINT NOT NULL, CHECK (Lock='X') );
CREATE TABLE IF NOT EXISTS "pushers" ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, access_token BIGINT DEFAULT NULL, profile_tag TEXT NOT NULL, kind TEXT NOT NULL, app_id TEXT NOT NULL, app_display_name TEXT NOT NULL, device_display_name TEXT NOT NULL, pushkey TEXT NOT NULL, ts BIGINT NOT NULL, lang TEXT, data TEXT, last_stream_ordering INTEGER, last_success BIGINT, failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) );
CREATE INDEX device_lists_outbound_pokes_stream ON device_lists_outbound_pokes(stream_id);
CREATE TABLE ratelimit_override ( user_id TEXT NOT NULL, messages_per_second BIGINT, burst_count BIGINT );
CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id);
CREATE TABLE current_state_delta_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, type TEXT NOT NULL, state_key TEXT NOT NULL, event_id TEXT, prev_event_id TEXT , instance_name TEXT);
CREATE INDEX current_state_delta_stream_idx ON current_state_delta_stream(stream_id);
CREATE TABLE user_directory_stream_pos ( Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, stream_id BIGINT, CHECK (Lock='X') );
CREATE VIRTUAL TABLE user_directory_search USING fts4 ( user_id, value )
/* user_directory_search(user_id,value) */;
CREATE TABLE blocked_rooms ( room_id TEXT NOT NULL, user_id TEXT NOT NULL );
CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id);
CREATE TABLE IF NOT EXISTS "local_media_repository_url_cache"( url TEXT, response_code INTEGER, etag TEXT, expires_ts BIGINT, og TEXT, media_id TEXT, download_ts BIGINT );
CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts);
CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts);
CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id);
CREATE TABLE IF NOT EXISTS "deleted_pushers" ( stream_id BIGINT NOT NULL, app_id TEXT NOT NULL, pushkey TEXT NOT NULL, user_id TEXT NOT NULL );
CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id);
CREATE TABLE IF NOT EXISTS "user_directory" ( user_id TEXT NOT NULL, room_id TEXT, display_name TEXT, avatar_url TEXT );
CREATE INDEX user_directory_room_idx ON user_directory(room_id);
CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
CREATE TABLE event_push_actions_staging ( event_id TEXT NOT NULL, user_id TEXT NOT NULL, actions TEXT NOT NULL, notif SMALLINT NOT NULL, highlight SMALLINT NOT NULL , unread SMALLINT, thread_id TEXT);
CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
CREATE TABLE users_pending_deactivation ( user_id TEXT NOT NULL );
CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, device_id TEXT, timestamp BIGINT NOT NULL , user_agent TEXT);
CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
CREATE TABLE erased_users ( user_id TEXT NOT NULL );
CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);
CREATE TABLE monthly_active_users ( user_id TEXT NOT NULL, timestamp BIGINT NOT NULL );
CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users(user_id);
CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users(timestamp);
CREATE TABLE IF NOT EXISTS "e2e_room_keys_versions" ( user_id TEXT NOT NULL, version BIGINT NOT NULL, algorithm TEXT NOT NULL, auth_data TEXT NOT NULL, deleted SMALLINT DEFAULT 0 NOT NULL , etag BIGINT);
CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version);
CREATE TABLE IF NOT EXISTS "e2e_room_keys" ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, session_id TEXT NOT NULL, version BIGINT NOT NULL, first_message_index INT, forwarded_count INT, is_verified BOOLEAN, session_data TEXT NOT NULL );
CREATE TABLE users_who_share_private_rooms ( user_id TEXT NOT NULL, other_user_id TEXT NOT NULL, room_id TEXT NOT NULL );
CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id);
CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id);
CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id);
CREATE TABLE user_threepid_id_server ( user_id TEXT NOT NULL, medium TEXT NOT NULL, address TEXT NOT NULL, id_server TEXT NOT NULL );
CREATE UNIQUE INDEX user_threepid_id_server_idx ON user_threepid_id_server( user_id, medium, address, id_server );
CREATE TABLE users_in_public_rooms ( user_id TEXT NOT NULL, room_id TEXT NOT NULL );
CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id, room_id);
CREATE TABLE account_validity ( user_id TEXT PRIMARY KEY, expiration_ts_ms BIGINT NOT NULL, email_sent BOOLEAN NOT NULL, renewal_token TEXT , token_used_ts_ms BIGINT);
CREATE TABLE event_relations ( event_id TEXT NOT NULL, relates_to_id TEXT NOT NULL, relation_type TEXT NOT NULL, aggregation_key TEXT );
CREATE UNIQUE INDEX event_relations_id ON event_relations(event_id);
CREATE INDEX event_relations_relates ON event_relations(relates_to_id, relation_type, aggregation_key);
CREATE TABLE room_stats_earliest_token ( room_id TEXT NOT NULL, token BIGINT NOT NULL );
CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
CREATE INDEX user_ips_device_id ON user_ips (user_id, device_id, last_seen);
CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
CREATE INDEX device_inbox_stream_id_user_id ON device_inbox (stream_id, user_id);
CREATE INDEX device_lists_stream_user_id ON device_lists_stream (user_id, device_id);
CREATE INDEX user_ips_last_seen ON user_ips (user_id, last_seen);
CREATE INDEX user_ips_last_seen_only ON user_ips (last_seen);
CREATE INDEX users_creation_ts ON users (creation_ts);
CREATE INDEX event_to_state_groups_sg_index ON event_to_state_groups (state_group);
CREATE UNIQUE INDEX device_lists_remote_cache_unique_id ON device_lists_remote_cache (user_id, device_id);
CREATE UNIQUE INDEX device_lists_remote_extremeties_unique_idx ON device_lists_remote_extremeties (user_id);
CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index ON user_ips (user_id, access_token, ip);
CREATE TABLE threepid_validation_session (
session_id TEXT PRIMARY KEY,
medium TEXT NOT NULL,
address TEXT NOT NULL,
client_secret TEXT NOT NULL,
last_send_attempt BIGINT NOT NULL,
validated_at BIGINT
);
CREATE TABLE threepid_validation_token (
token TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
next_link TEXT,
expires BIGINT NOT NULL
);
CREATE INDEX threepid_validation_token_session_id ON threepid_validation_token(session_id);
CREATE TABLE event_expiry (
event_id TEXT PRIMARY KEY,
expiry_ts BIGINT NOT NULL
);
CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts);
CREATE TABLE event_labels (
event_id TEXT,
label TEXT,
room_id TEXT NOT NULL,
topological_ordering BIGINT NOT NULL,
PRIMARY KEY(event_id, label)
);
CREATE INDEX event_labels_room_id_label_idx ON event_labels(room_id, label, topological_ordering);
CREATE UNIQUE INDEX e2e_room_keys_with_version_idx ON e2e_room_keys(user_id, version, room_id, session_id);
CREATE TABLE IF NOT EXISTS "devices" (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
display_name TEXT,
last_seen BIGINT,
ip TEXT,
user_agent TEXT,
hidden BOOLEAN DEFAULT 0,
CONSTRAINT device_uniqueness UNIQUE (user_id, device_id)
);
CREATE TABLE room_retention(
room_id TEXT,
event_id TEXT,
min_lifetime BIGINT,
max_lifetime BIGINT,
PRIMARY KEY(room_id, event_id)
);
CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
CREATE TABLE e2e_cross_signing_keys (
user_id TEXT NOT NULL,
-- the type of cross-signing key (master, user_signing, or self_signing)
keytype TEXT NOT NULL,
-- the full key information, as a json-encoded dict
keydata TEXT NOT NULL,
-- for keeping the keys in order, so that we can fetch the latest one
stream_id BIGINT NOT NULL
);
CREATE UNIQUE INDEX e2e_cross_signing_keys_idx ON e2e_cross_signing_keys(user_id, keytype, stream_id);
CREATE TABLE e2e_cross_signing_signatures (
-- user who did the signing
user_id TEXT NOT NULL,
-- key used to sign
key_id TEXT NOT NULL,
-- user who was signed
target_user_id TEXT NOT NULL,
-- device/key that was signed
target_device_id TEXT NOT NULL,
-- the actual signature
signature TEXT NOT NULL
);
CREATE TABLE user_signature_stream (
-- uses the same stream ID as device list stream
stream_id BIGINT NOT NULL,
-- user who did the signing
from_user_id TEXT NOT NULL,
-- list of users who were signed, as a JSON array
user_ids TEXT NOT NULL
);
CREATE UNIQUE INDEX user_signature_stream_idx ON user_signature_stream(stream_id);
CREATE INDEX e2e_cross_signing_signatures2_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
CREATE TABLE stats_incremental_position (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);
CREATE TABLE room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
-- These are absolute counts
current_state_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
local_users_in_room INT NOT NULL,
-- The maximum delta stream position that this row takes into account.
completed_delta_stream_id BIGINT NOT NULL
, knocked_members INT);
CREATE TABLE user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
joined_rooms BIGINT NOT NULL,
-- The maximum delta stream position that this row takes into account.
completed_delta_stream_id BIGINT NOT NULL
);
CREATE TABLE room_stats_state (
room_id TEXT NOT NULL,
name TEXT,
canonical_alias TEXT,
join_rules TEXT,
history_visibility TEXT,
encryption TEXT,
avatar TEXT,
guest_access TEXT,
is_federatable BOOLEAN,
topic TEXT
, room_type TEXT);
CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id);
CREATE TABLE IF NOT EXISTS "user_filters" ( user_id TEXT NOT NULL, filter_id BIGINT NOT NULL, filter_json BYTEA NOT NULL );
CREATE UNIQUE INDEX user_filters_unique ON "user_filters" (user_id, filter_id);
CREATE TABLE user_external_ids (
auth_provider TEXT NOT NULL,
external_id TEXT NOT NULL,
user_id TEXT NOT NULL,
UNIQUE (auth_provider, external_id)
);
CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);
CREATE TABLE device_lists_remote_resync (
user_id TEXT NOT NULL,
added_ts BIGINT NOT NULL
);
CREATE UNIQUE INDEX device_lists_remote_resync_idx ON device_lists_remote_resync (user_id);
CREATE INDEX device_lists_remote_resync_ts_idx ON device_lists_remote_resync (added_ts);
CREATE TABLE local_current_membership (
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
membership TEXT NOT NULL
);
CREATE UNIQUE INDEX local_current_membership_idx ON local_current_membership(user_id, room_id);
CREATE INDEX local_current_membership_room_idx ON local_current_membership(room_id);
CREATE TABLE ui_auth_sessions(
session_id TEXT NOT NULL, -- The session ID passed to the client.
creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds).
serverdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data added by Synapse.
clientdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data from the client.
uri TEXT NOT NULL, -- The URI the UI authentication session is using.
method TEXT NOT NULL, -- The HTTP method the UI authentication session is using.
-- The clientdict, uri, and method make up an tuple that must be immutable
-- throughout the lifetime of the UI Auth session.
description TEXT NOT NULL, -- A human readable description of the operation which caused the UI Auth flow to occur.
UNIQUE (session_id)
);
CREATE TABLE ui_auth_sessions_credentials(
session_id TEXT NOT NULL, -- The corresponding UI Auth session.
stage_type TEXT NOT NULL, -- The stage type.
result TEXT NOT NULL, -- The result of the stage verification, stored as JSON.
UNIQUE (session_id, stage_type),
FOREIGN KEY (session_id)
REFERENCES ui_auth_sessions (session_id)
);
CREATE TABLE IF NOT EXISTS "device_lists_outbound_last_success" ( destination TEXT NOT NULL, user_id TEXT NOT NULL, stream_id BIGINT NOT NULL );
CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx ON "device_lists_outbound_last_success" (destination, user_id);
CREATE TABLE IF NOT EXISTS "local_media_repository_thumbnails" ( media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_type TEXT, thumbnail_method TEXT, thumbnail_length INTEGER, UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type, thumbnail_method ) );
CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id);
CREATE TABLE IF NOT EXISTS "remote_media_cache_thumbnails" ( media_origin TEXT, media_id TEXT, thumbnail_width INTEGER, thumbnail_height INTEGER, thumbnail_method TEXT, thumbnail_type TEXT, thumbnail_length INTEGER, filesystem_id TEXT, UNIQUE ( media_origin, media_id, thumbnail_width, thumbnail_height, thumbnail_type, thumbnail_method ) );
CREATE TABLE ui_auth_sessions_ips(
session_id TEXT NOT NULL,
ip TEXT NOT NULL,
user_agent TEXT NOT NULL,
UNIQUE (session_id, ip, user_agent),
FOREIGN KEY (session_id)
REFERENCES ui_auth_sessions (session_id)
);
CREATE UNIQUE INDEX federation_stream_position_instance ON federation_stream_position(type, instance_name);
CREATE TABLE dehydrated_devices(
user_id TEXT NOT NULL PRIMARY KEY,
device_id TEXT NOT NULL,
device_data TEXT NOT NULL -- JSON-encoded client-defined data
);
CREATE TABLE e2e_fallback_keys_json (
user_id TEXT NOT NULL, -- The user this fallback key is for.
device_id TEXT NOT NULL, -- The device this fallback key is for.
algorithm TEXT NOT NULL, -- Which algorithm this fallback key is for.
key_id TEXT NOT NULL, -- An id for suppressing duplicate uploads.
key_json TEXT NOT NULL, -- The key as a JSON blob.
used BOOLEAN NOT NULL DEFAULT FALSE, -- Whether the key has been used or not.
CONSTRAINT e2e_fallback_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm)
);
CREATE TABLE destination_rooms (
-- the destination in question.
destination TEXT NOT NULL REFERENCES destinations (destination),
-- the ID of the room in question
room_id TEXT NOT NULL REFERENCES rooms (room_id),
-- the stream_ordering of the event
stream_ordering BIGINT NOT NULL,
PRIMARY KEY (destination, room_id)
-- We don't declare a foreign key on stream_ordering here because that'd mean
-- we'd need to either maintain an index (expensive) or do a table scan of
-- destination_rooms whenever we delete an event (also potentially expensive).
-- In addition to that, a foreign key on stream_ordering would be redundant
-- as this row doesn't need to refer to a specific event; if the event gets
-- deleted then it doesn't affect the validity of the stream_ordering here.
);
CREATE INDEX destination_rooms_room_id
ON destination_rooms (room_id);
CREATE TABLE stream_positions (
stream_name TEXT NOT NULL,
instance_name TEXT NOT NULL,
stream_id BIGINT NOT NULL
);
CREATE UNIQUE INDEX stream_positions_idx ON stream_positions(stream_name, instance_name);
CREATE TABLE IF NOT EXISTS "access_tokens" (
id BIGINT PRIMARY KEY,
user_id TEXT NOT NULL,
device_id TEXT,
token TEXT NOT NULL,
valid_until_ms BIGINT,
puppets_user_id TEXT,
last_validated BIGINT, refresh_token_id BIGINT REFERENCES refresh_tokens (id) ON DELETE CASCADE, used BOOLEAN,
UNIQUE(token)
);
CREATE INDEX access_tokens_device_id ON access_tokens (user_id, device_id);
CREATE TABLE IF NOT EXISTS "event_txn_id" (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
token_id BIGINT NOT NULL,
txn_id TEXT NOT NULL,
inserted_ts BIGINT NOT NULL,
FOREIGN KEY (event_id)
REFERENCES events (event_id) ON DELETE CASCADE,
FOREIGN KEY (token_id)
REFERENCES access_tokens (id) ON DELETE CASCADE
);
CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
CREATE INDEX event_txn_id_ts ON event_txn_id(inserted_ts);
CREATE TABLE ignored_users( ignorer_user_id TEXT NOT NULL, ignored_user_id TEXT NOT NULL );
CREATE UNIQUE INDEX ignored_users_uniqueness ON ignored_users (ignorer_user_id, ignored_user_id);
CREATE INDEX ignored_users_ignored_user_id ON ignored_users (ignored_user_id);
CREATE TABLE event_auth_chains (
event_id TEXT PRIMARY KEY,
chain_id BIGINT NOT NULL,
sequence_number BIGINT NOT NULL
);
CREATE UNIQUE INDEX event_auth_chains_c_seq_index ON event_auth_chains (chain_id, sequence_number);
CREATE TABLE event_auth_chain_links (
origin_chain_id BIGINT NOT NULL,
origin_sequence_number BIGINT NOT NULL,
target_chain_id BIGINT NOT NULL,
target_sequence_number BIGINT NOT NULL
);
CREATE INDEX event_auth_chain_links_idx ON event_auth_chain_links (origin_chain_id, target_chain_id);
CREATE TABLE event_auth_chain_to_calculate (
event_id TEXT PRIMARY KEY,
room_id TEXT NOT NULL,
type TEXT NOT NULL,
state_key TEXT NOT NULL
);
CREATE INDEX event_auth_chain_to_calculate_rm_id ON event_auth_chain_to_calculate(room_id);
CREATE TABLE users_to_send_full_presence_to(
-- The user ID to send full presence to.
user_id TEXT PRIMARY KEY,
-- A presence stream ID token - the current presence stream token when the row was last upserted.
-- If a user calls /sync and this token is part of the update they're to receive, we also include
-- full user presence in the response.
-- This allows multiple devices for a user to receive full presence whenever they next call /sync.
presence_stream_id BIGINT,
FOREIGN KEY (user_id)
REFERENCES users (name)
);
CREATE TABLE refresh_tokens (
id BIGINT PRIMARY KEY,
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
token TEXT NOT NULL,
-- When consumed, a new refresh token is generated, which is tracked by
-- this foreign key
next_token_id BIGINT REFERENCES refresh_tokens (id) ON DELETE CASCADE, expiry_ts BIGINT DEFAULT NULL, ultimate_session_expiry_ts BIGINT DEFAULT NULL,
UNIQUE(token)
);
CREATE TABLE worker_locks (
lock_name TEXT NOT NULL,
lock_key TEXT NOT NULL,
-- We write the instance name to ease manual debugging, we don't ever read
-- from it.
-- Note: instance names aren't guarenteed to be unique.
instance_name TEXT NOT NULL,
-- A random string generated each time an instance takes out a lock. Used by
-- the instance to tell whether the lock is still held by it (e.g. in the
-- case where the process stalls for a long time the lock may time out and
-- be taken out by another instance, at which point the original instance
-- can tell it no longer holds the lock as the tokens no longer match).
token TEXT NOT NULL,
last_renewed_ts BIGINT NOT NULL
);
CREATE UNIQUE INDEX worker_locks_key ON worker_locks (lock_name, lock_key);
CREATE TABLE federation_inbound_events_staging (
origin TEXT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
received_ts BIGINT NOT NULL,
event_json TEXT NOT NULL,
internal_metadata TEXT NOT NULL
);
CREATE INDEX federation_inbound_events_staging_room ON federation_inbound_events_staging(room_id, received_ts);
CREATE UNIQUE INDEX federation_inbound_events_staging_instance_event ON federation_inbound_events_staging(origin, event_id);
CREATE TABLE insertion_event_edges(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
insertion_prev_event_id TEXT NOT NULL
);
CREATE INDEX insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
CREATE INDEX insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
CREATE TABLE insertion_event_extremities(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL
);
CREATE UNIQUE INDEX insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
CREATE INDEX insertion_event_extremities_room_id ON insertion_event_extremities(room_id);
CREATE TABLE registration_tokens(
token TEXT NOT NULL, -- The token that can be used for authentication.
uses_allowed INT, -- The total number of times this token can be used. NULL if no limit.
pending INT NOT NULL, -- The number of in progress registrations using this token.
completed INT NOT NULL, -- The number of times this token has been used to complete a registration.
expiry_time BIGINT, -- The latest time this token will be valid (epoch time in milliseconds). NULL if token doesn't expire.
UNIQUE (token)
);
CREATE TABLE sessions(
session_type TEXT NOT NULL, -- The unique key for this type of session.
session_id TEXT NOT NULL, -- The session ID passed to the client.
value TEXT NOT NULL, -- A JSON dictionary to persist.
expiry_time_ms BIGINT NOT NULL, -- The time this session will expire (epoch time in milliseconds).
UNIQUE (session_type, session_id)
);
CREATE TABLE insertion_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
next_batch_id TEXT NOT NULL
);
CREATE UNIQUE INDEX insertion_events_event_id ON insertion_events(event_id);
CREATE INDEX insertion_events_next_batch_id ON insertion_events(next_batch_id);
CREATE TABLE batch_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
batch_id TEXT NOT NULL
);
CREATE UNIQUE INDEX batch_events_event_id ON batch_events(event_id);
CREATE INDEX batch_events_batch_id ON batch_events(batch_id);
CREATE INDEX insertion_event_edges_event_id ON insertion_event_edges(event_id);
CREATE TABLE device_auth_providers (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
auth_provider_id TEXT NOT NULL,
auth_provider_session_id TEXT NOT NULL
);
CREATE INDEX device_auth_providers_devices
ON device_auth_providers (user_id, device_id);
CREATE INDEX device_auth_providers_sessions
ON device_auth_providers (auth_provider_id, auth_provider_session_id);
CREATE INDEX refresh_tokens_next_token_id
ON refresh_tokens(next_token_id)
WHERE next_token_id IS NOT NULL;
CREATE TABLE partial_state_rooms (
room_id TEXT PRIMARY KEY,
FOREIGN KEY(room_id) REFERENCES rooms(room_id)
);
CREATE TABLE partial_state_rooms_servers (
room_id TEXT NOT NULL REFERENCES partial_state_rooms(room_id),
server_name TEXT NOT NULL,
UNIQUE(room_id, server_name)
);
CREATE TABLE partial_state_events (
-- the room_id is denormalised for efficient indexing (the canonical source is `events`)
room_id TEXT NOT NULL REFERENCES partial_state_rooms(room_id),
event_id TEXT NOT NULL REFERENCES events(event_id),
UNIQUE(event_id)
);
CREATE INDEX partial_state_events_room_id_idx
ON partial_state_events (room_id);
CREATE TRIGGER partial_state_events_bad_room_id
BEFORE INSERT ON partial_state_events
FOR EACH ROW
BEGIN
SELECT RAISE(ABORT, 'Incorrect room_id in partial_state_events')
WHERE EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.room_id != NEW.room_id
);
END;
CREATE TABLE device_lists_changes_in_room (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
room_id TEXT NOT NULL,
-- This initially matches `device_lists_stream.stream_id`. Note that we
-- delete older values from `device_lists_stream`, so we can't use a foreign
-- constraint here.
--
-- The table will contain rows with the same `stream_id` but different
-- `room_id`, as for each device update we store a row per room the user is
-- joined to. Therefore `(stream_id, room_id)` gives a unique index.
stream_id BIGINT NOT NULL,
-- We have a background process which goes through this table and converts
-- entries into rows in `device_lists_outbound_pokes`. Once we have processed
-- a row, we mark it as such by setting `converted_to_destinations=TRUE`.
converted_to_destinations BOOLEAN NOT NULL,
opentracing_context TEXT
);
CREATE UNIQUE INDEX device_lists_changes_in_stream_id ON device_lists_changes_in_room(stream_id, room_id);
CREATE INDEX device_lists_changes_in_stream_id_unconverted ON device_lists_changes_in_room(stream_id) WHERE NOT converted_to_destinations;
CREATE TABLE IF NOT EXISTS "event_edges" (
event_id TEXT NOT NULL,
prev_event_id TEXT NOT NULL,
room_id TEXT NULL,
is_state BOOL NOT NULL DEFAULT 0,
FOREIGN KEY(event_id) REFERENCES events(event_id)
);
CREATE UNIQUE INDEX event_edges_event_id_prev_event_id_idx
ON event_edges (event_id, prev_event_id);
CREATE INDEX ev_edges_prev_id ON event_edges (prev_event_id);
CREATE TABLE event_push_summary_last_receipt_stream_id (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);
CREATE TABLE IF NOT EXISTS "application_services_state" (
as_id TEXT PRIMARY KEY NOT NULL,
state VARCHAR(5),
read_receipt_stream_id BIGINT,
presence_stream_id BIGINT,
to_device_stream_id BIGINT,
device_list_stream_id BIGINT
);
CREATE TABLE IF NOT EXISTS "receipts_linearized" (
stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
thread_id TEXT,
event_stream_ordering BIGINT,
data TEXT NOT NULL,
CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id),
CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
);
CREATE TABLE IF NOT EXISTS "receipts_graph" (
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_ids TEXT NOT NULL,
thread_id TEXT,
data TEXT NOT NULL,
CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id),
CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
);
CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
CREATE INDEX redactions_have_censored_ts ON redactions (received_ts) WHERE NOT have_censored;
CREATE INDEX room_memberships_user_room_forgotten ON room_memberships (user_id, room_id) WHERE forgotten = 1;
CREATE INDEX users_have_local_media ON local_media_repository (user_id, created_ts) ;
CREATE UNIQUE INDEX e2e_cross_signing_keys_stream_idx ON e2e_cross_signing_keys (stream_id) ;
CREATE INDEX user_external_ids_user_id_idx ON user_external_ids (user_id) ;
CREATE INDEX presence_stream_state_not_offline_idx ON presence_stream (state) WHERE state != 'offline';
CREATE UNIQUE INDEX event_push_summary_unique_index ON event_push_summary (user_id, room_id) ;
CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ;
CREATE UNIQUE INDEX receipts_graph_unique_index ON receipts_graph (room_id, receipt_type, user_id) WHERE thread_id IS NULL;
CREATE UNIQUE INDEX receipts_linearized_unique_index ON receipts_linearized (room_id, receipt_type, user_id) WHERE thread_id IS NULL;
CREATE INDEX event_push_actions_stream_highlight_index ON event_push_actions (highlight, stream_ordering) WHERE highlight=0;
CREATE INDEX current_state_events_member_index ON current_state_events (state_key) WHERE type='m.room.member';
CREATE INDEX event_contains_url_index ON events (room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering) WHERE highlight=1;
CREATE INDEX local_media_repository_url_idx ON local_media_repository (created_ts) WHERE url_cache IS NOT NULL;
INSERT INTO appservice_stream_position VALUES('X',0);
INSERT INTO federation_stream_position VALUES('federation',-1,'master');
INSERT INTO federation_stream_position VALUES('events',-1,'master');
INSERT INTO event_push_summary_stream_ordering VALUES('X',0);
INSERT INTO user_directory_stream_pos VALUES('X',1);
INSERT INTO stats_incremental_position VALUES('X',1);
INSERT INTO event_push_summary_last_receipt_stream_id VALUES('X',0);

View File

@@ -0,0 +1,30 @@
CREATE TABLE state_group_edges (
state_group bigint NOT NULL,
prev_state_group bigint NOT NULL
);
CREATE SEQUENCE state_group_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
CREATE TABLE state_groups (
id bigint NOT NULL,
room_id text NOT NULL,
event_id text NOT NULL
);
CREATE TABLE state_groups_state (
state_group bigint NOT NULL,
room_id text NOT NULL,
type text NOT NULL,
state_key text NOT NULL,
event_id text NOT NULL
);
ALTER TABLE ONLY state_groups_state ALTER COLUMN state_group SET (n_distinct=-0.02);
ALTER TABLE ONLY state_groups
ADD CONSTRAINT state_groups_pkey PRIMARY KEY (id);
CREATE INDEX state_group_edges_prev_idx ON state_group_edges USING btree (prev_state_group);
CREATE UNIQUE INDEX state_group_edges_unique_idx ON state_group_edges USING btree (state_group, prev_state_group);
CREATE INDEX state_groups_room_id_idx ON state_groups USING btree (room_id);
CREATE INDEX state_groups_state_type_idx ON state_groups_state USING btree (state_group, type, state_key);
SELECT pg_catalog.setval('state_group_id_seq', 1, false);

View File

@@ -0,0 +1,20 @@
CREATE TABLE state_groups (
id BIGINT PRIMARY KEY,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL
);
CREATE TABLE state_groups_state (
state_group BIGINT NOT NULL,
room_id TEXT NOT NULL,
type TEXT NOT NULL,
state_key TEXT NOT NULL,
event_id TEXT NOT NULL
);
CREATE TABLE state_group_edges (
state_group BIGINT NOT NULL,
prev_state_group BIGINT NOT NULL
);
CREATE INDEX state_group_edges_prev_idx ON state_group_edges (prev_state_group);
CREATE INDEX state_groups_state_type_idx ON state_groups_state (state_group, type, state_key);
CREATE INDEX state_groups_room_id_idx ON state_groups (room_id) ;
CREATE UNIQUE INDEX state_group_edges_unique_idx ON state_group_edges (state_group, prev_state_group) ;

View File

@@ -835,6 +835,7 @@ class ReadReceipt:
receipt_type: str
user_id: str
event_ids: List[str]
thread_id: Optional[str]
data: JsonDict

View File

@@ -49,7 +49,12 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
sender = self.hs.get_federation_sender()
receipt = ReadReceipt(
"room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
"room_id",
"m.read",
"user_id",
["event_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
@@ -89,7 +94,12 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
sender = self.hs.get_federation_sender()
receipt = ReadReceipt(
"room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
"room_id",
"m.read",
"user_id",
["event_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
@@ -121,7 +131,12 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
# send the second RR
receipt = ReadReceipt(
"room_id", "m.read", "user_id", ["other_id"], {"ts": 1234}
"room_id",
"m.read",
"user_id",
["other_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
self.pump()

View File

@@ -447,6 +447,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
thread_id=None,
data={},
)
)

View File

@@ -129,7 +129,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
async def check_host_in_room(room_id: str, server_name: str) -> bool:
return room_id == ROOM_ID
hs.get_event_auth_handler().check_host_in_room = check_host_in_room
hs.get_event_auth_handler().is_host_in_room = check_host_in_room
async def get_current_hosts_in_room(room_id: str):
return {member.domain for member in self.room_members}
@@ -138,6 +138,10 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
get_current_hosts_in_room
)
hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = (
get_current_hosts_in_room
)
async def get_users_in_room(room_id: str):
return {str(u) for u in self.room_members}

View File

@@ -171,7 +171,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
if send_receipt:
self.get_success(
self.master_store.insert_receipt(
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {}
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], None, {}
)
)

View File

@@ -33,7 +33,12 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
# tell the master to send a new receipt
self.get_success(
self.hs.get_datastores().main.insert_receipt(
"!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1}
"!room:blue",
"m.read",
USER_ID,
["$event:blue"],
thread_id=None,
data={"a": 1},
)
)
self.replicate()
@@ -48,6 +53,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
self.assertEqual("$event:blue", row.event_id)
self.assertIsNone(row.thread_id)
self.assertEqual({"a": 1}, row.data)
# Now let's disconnect and insert some data.
@@ -57,7 +63,12 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
self.get_success(
self.hs.get_datastores().main.insert_receipt(
"!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2}
"!room2:blue",
"m.read",
USER_ID,
["$event2:foo"],
thread_id=None,
data={"a": 2},
)
)
self.replicate()

View File

@@ -12,25 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Tuple, Union
import datetime
from typing import Dict, List, Tuple, Union
import attr
from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
)
from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder
from synapse.server import HomeServer
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict
from synapse.util import Clock, json_encoder
import tests.unittest
import tests.utils
@attr.s(auto_attribs=True, frozen=True, slots=True)
class _BackfillSetupInfo:
room_id: str
depth_map: Dict[str, int]
class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
def test_get_prev_events_for_room(self):
@@ -571,11 +584,471 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
)
self.assertEqual(count, 1)
_, event_id = self.get_success(
next_staged_event_info = self.get_success(
self.store.get_next_staged_event_id_for_room(room_id)
)
assert next_staged_event_info
_, event_id = next_staged_event_info
self.assertEqual(event_id, "$fake_event_id_500")
def _setup_room_for_backfill_tests(self) -> _BackfillSetupInfo:
"""
Sets up a room with various events and backward extremities to test
backfill functions against.
Returns:
_BackfillSetupInfo including the `room_id` to test against and
`depth_map` of events in the room
"""
room_id = "!backfill-room-test:some-host"
# The silly graph we use to test grabbing backward extremities,
# where the top is the oldest events.
# 1 (oldest)
# |
# 2 ⹁
# | \
# | [b1, b2, b3]
# | |
# | A
# | /
# 3 {
# | \
# | [b4, b5, b6]
# | |
# | B
# | /
# 4 ´
# |
# 5 (newest)
event_graph: Dict[str, List[str]] = {
"1": [],
"2": ["1"],
"3": ["2", "A"],
"4": ["3", "B"],
"5": ["4"],
"A": ["b1", "b2", "b3"],
"b1": ["2"],
"b2": ["2"],
"b3": ["2"],
"B": ["b4", "b5", "b6"],
"b4": ["3"],
"b5": ["3"],
"b6": ["3"],
}
depth_map: Dict[str, int] = {
"1": 1,
"2": 2,
"b1": 3,
"b2": 3,
"b3": 3,
"A": 4,
"3": 5,
"b4": 6,
"b5": 6,
"b6": 6,
"B": 7,
"4": 8,
"5": 9,
}
# The events we have persisted on our server.
# The rest are events in the room but not backfilled tet.
our_server_events = {"5", "4", "B", "3", "A"}
complete_event_dict_map: Dict[str, JsonDict] = {}
stream_ordering = 0
for (event_id, prev_event_ids) in event_graph.items():
depth = depth_map[event_id]
complete_event_dict_map[event_id] = {
"event_id": event_id,
"type": "test_regular_type",
"room_id": room_id,
"sender": "@sender",
"prev_event_ids": prev_event_ids,
"auth_event_ids": [],
"origin_server_ts": stream_ordering,
"depth": depth,
"stream_ordering": stream_ordering,
"content": {"body": "event" + event_id},
}
stream_ordering += 1
def populate_db(txn: LoggingTransaction):
# Insert the room to satisfy the foreign key constraint of
# `event_failed_pull_attempts`
self.store.db_pool.simple_insert_txn(
txn,
"rooms",
{
"room_id": room_id,
"creator": "room_creator_user_id",
"is_public": True,
"room_version": "6",
},
)
# Insert our server events
for event_id in our_server_events:
event_dict = complete_event_dict_map[event_id]
self.store.db_pool.simple_insert_txn(
txn,
table="events",
values={
"event_id": event_dict.get("event_id"),
"type": event_dict.get("type"),
"room_id": event_dict.get("room_id"),
"depth": event_dict.get("depth"),
"topological_ordering": event_dict.get("depth"),
"stream_ordering": event_dict.get("stream_ordering"),
"processed": True,
"outlier": False,
},
)
# Insert the event edges
for event_id in our_server_events:
for prev_event_id in event_graph[event_id]:
self.store.db_pool.simple_insert_txn(
txn,
table="event_edges",
values={
"event_id": event_id,
"prev_event_id": prev_event_id,
"room_id": room_id,
},
)
# Insert the backward extremities
prev_events_of_our_events = {
prev_event_id
for our_server_event in our_server_events
for prev_event_id in complete_event_dict_map[our_server_event][
"prev_event_ids"
]
}
backward_extremities = prev_events_of_our_events - our_server_events
for backward_extremity in backward_extremities:
self.store.db_pool.simple_insert_txn(
txn,
table="event_backward_extremities",
values={
"event_id": backward_extremity,
"room_id": room_id,
},
)
self.get_success(
self.store.db_pool.runInteraction(
"_setup_room_for_backfill_tests_populate_db",
populate_db,
)
)
return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
def test_get_backfill_points_in_room(self):
"""
Test to make sure we get some backfill points
"""
setup_info = self._setup_room_for_backfill_tests()
room_id = setup_info.room_id
backfill_points = self.get_success(
self.store.get_backfill_points_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(
backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
)
def test_get_backfill_points_in_room_excludes_events_we_have_attempted(
self,
):
"""
Test to make sure that events we have attempted to backfill (and within
backoff timeout duration) do not show up as an event to backfill again.
"""
setup_info = self._setup_room_for_backfill_tests()
room_id = setup_info.room_id
# Record some attempts to backfill these events which will make
# `get_backfill_points_in_room` exclude them because we
# haven't passed the backoff interval.
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b5", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b4", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b2", "fake cause")
)
# No time has passed since we attempted to backfill ^
backfill_points = self.get_success(
self.store.get_backfill_points_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
# Only the backfill points that we didn't record earlier exist here.
self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"])
def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration(
self,
):
"""
Test to make sure after we fake attempt to backfill event "b3" many times,
we can see retry and see the "b3" again after the backoff timeout duration
has exceeded.
"""
setup_info = self._setup_room_for_backfill_tests()
room_id = setup_info.room_id
# Record some attempts to backfill these events which will make
# `get_backfill_points_in_room` exclude them because we
# haven't passed the backoff interval.
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
)
self.get_success(
self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
)
# Now advance time by 2 hours and we should only be able to see "b3"
# because we have waited long enough for the single attempt (2^1 hours)
# but we still shouldn't see "b1" because we haven't waited long enough
# for this many attempts. We didn't do anything to "b2" so it should be
# visible regardless.
self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
# Make sure that "b1" is not in the list because we've
# already attempted many times
backfill_points = self.get_success(
self.store.get_backfill_points_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2"])
# Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
# see if we can now backfill it
self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
# Try again after we advanced enough time and we should see "b3" again
backfill_points = self.get_success(
self.store.get_backfill_points_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(
backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
)
def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
"""
Sets up a room with various insertion event backward extremities to test
backfill functions against.
Returns:
_BackfillSetupInfo including the `room_id` to test against and
`depth_map` of events in the room
"""
room_id = "!backfill-room-test:some-host"
depth_map: Dict[str, int] = {
"1": 1,
"2": 2,
"insertion_eventA": 3,
"3": 4,
"insertion_eventB": 5,
"4": 6,
"5": 7,
}
def populate_db(txn: LoggingTransaction):
# Insert the room to satisfy the foreign key constraint of
# `event_failed_pull_attempts`
self.store.db_pool.simple_insert_txn(
txn,
"rooms",
{
"room_id": room_id,
"creator": "room_creator_user_id",
"is_public": True,
"room_version": "6",
},
)
# Insert our server events
stream_ordering = 0
for event_id, depth in depth_map.items():
self.store.db_pool.simple_insert_txn(
txn,
table="events",
values={
"event_id": event_id,
"type": EventTypes.MSC2716_INSERTION
if event_id.startswith("insertion_event")
else "test_regular_type",
"room_id": room_id,
"depth": depth,
"topological_ordering": depth,
"stream_ordering": stream_ordering,
"processed": True,
"outlier": False,
},
)
if event_id.startswith("insertion_event"):
self.store.db_pool.simple_insert_txn(
txn,
table="insertion_event_extremities",
values={
"event_id": event_id,
"room_id": room_id,
},
)
stream_ordering += 1
self.get_success(
self.store.db_pool.runInteraction(
"_setup_room_for_insertion_backfill_tests_populate_db",
populate_db,
)
)
return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
def test_get_insertion_event_backward_extremities_in_room(self):
"""
Test to make sure insertion event backward extremities are returned.
"""
setup_info = self._setup_room_for_insertion_backfill_tests()
room_id = setup_info.room_id
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(
backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
)
def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted(
self,
):
"""
Test to make sure that insertion events we have attempted to backfill
(and within backoff timeout duration) do not show up as an event to
backfill again.
"""
setup_info = self._setup_room_for_insertion_backfill_tests()
room_id = setup_info.room_id
# Record some attempts to backfill these events which will make
# `get_insertion_event_backward_extremities_in_room` exclude them
# because we haven't passed the backoff interval.
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
# No time has passed since we attempted to backfill ^
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
# Only the backfill points that we didn't record earlier exist here.
self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
self,
):
"""
Test to make sure after we fake attempt to backfill event
"insertion_eventA" many times, we can see retry and see the
"insertion_eventA" again after the backoff timeout duration has
exceeded.
"""
setup_info = self._setup_room_for_insertion_backfill_tests()
room_id = setup_info.room_id
# Record some attempts to backfill these events which will make
# `get_backfill_points_in_room` exclude them because we
# haven't passed the backoff interval.
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventB", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "insertion_eventA", "fake cause"
)
)
# Now advance time by 2 hours and we should only be able to see
# "insertion_eventB" because we have waited long enough for the single
# attempt (2^1 hours) but we still shouldn't see "insertion_eventA"
# because we haven't waited long enough for this many attempts.
self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
# Make sure that "insertion_eventA" is not in the list because we've
# already attempted many times
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
# Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
# see if we can now backfill it
self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
# Try at "insertion_eventA" again after we advanced enough time and we
# should see "insertion_eventA" again
backfill_points = self.get_success(
self.store.get_insertion_event_backward_extremities_in_room(room_id)
)
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertListEqual(
backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
)
@attr.s
class FakeEvent:

View File

@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Tuple
from twisted.test.proto_helpers import MemoryReactor
from synapse.rest import admin
@@ -22,8 +24,6 @@ from synapse.util import Clock
from tests.unittest import HomeserverTestCase
USER_ID = "@user:example.com"
class EventPushActionsStoreTestCase(HomeserverTestCase):
servlets = [
@@ -38,21 +38,13 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
assert persist_events_store is not None
self.persist_events_store = persist_events_store
def test_get_unread_push_actions_for_user_in_range_for_http(self) -> None:
self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_http(
USER_ID, 0, 1000, 20
)
)
def _create_users_and_room(self) -> Tuple[str, str, str, str, str]:
"""
Creates two users and a shared room.
def test_get_unread_push_actions_for_user_in_range_for_email(self) -> None:
self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_email(
USER_ID, 0, 1000, 20
)
)
def test_count_aggregation(self) -> None:
Returns:
Tuple of (user 1 ID, user 1 token, user 2 ID, user 2 token, room ID).
"""
# Create a user to receive notifications and send receipts.
user_id = self.register_user("user1235", "pass")
token = self.login("user1235", "pass")
@@ -65,6 +57,70 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
room_id = self.helper.create_room_as(user_id, tok=token)
self.helper.join(room_id, other_id, tok=other_token)
return user_id, token, other_id, other_token, room_id
def test_get_unread_push_actions_for_user_in_range(self) -> None:
"""Test getting unread push actions for HTTP and email pushers."""
user_id, token, _, other_token, room_id = self._create_users_and_room()
# Create two events, one of which is a highlight.
self.helper.send_event(
room_id,
type="m.room.message",
content={"msgtype": "m.text", "body": "msg"},
tok=other_token,
)
event_id = self.helper.send_event(
room_id,
type="m.room.message",
content={"msgtype": "m.text", "body": user_id},
tok=other_token,
)["event_id"]
# Fetch unread actions for HTTP pushers.
http_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_http(
user_id, 0, 1000, 20
)
)
self.assertEqual(2, len(http_actions))
# Fetch unread actions for email pushers.
email_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_email(
user_id, 0, 1000, 20
)
)
self.assertEqual(2, len(email_actions))
# Send a receipt, which should clear any actions.
self.get_success(
self.store.insert_receipt(
room_id,
"m.read",
user_id=user_id,
event_ids=[event_id],
thread_id=None,
data={},
)
)
http_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_http(
user_id, 0, 1000, 20
)
)
self.assertEqual([], http_actions)
email_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_email(
user_id, 0, 1000, 20
)
)
self.assertEqual([], email_actions)
def test_count_aggregation(self) -> None:
# Create a user to receive notifications and send receipts.
user_id, token, _, other_token, room_id = self._create_users_and_room()
last_event_id: str
def _assert_counts(noitf_count: int, highlight_count: int) -> None:
@@ -106,6 +162,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
"m.read",
user_id=user_id,
event_ids=[event_id],
thread_id=None,
data={},
)
)

View File

@@ -131,13 +131,18 @@ class ReceiptTestCase(HomeserverTestCase):
# Send public read receipt for the first event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {}
)
)
# Send private read receipt for the second event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {}
self.room_id1,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event1_2_id],
None,
{},
)
)
@@ -164,7 +169,7 @@ class ReceiptTestCase(HomeserverTestCase):
# Test receipt updating
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {}
)
)
res = self.get_success(
@@ -180,7 +185,12 @@ class ReceiptTestCase(HomeserverTestCase):
# Test new room is reflected in what the method returns
self.get_success(
self.store.insert_receipt(
self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {}
self.room_id2,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event2_1_id],
None,
{},
)
)
res = self.get_success(
@@ -202,13 +212,18 @@ class ReceiptTestCase(HomeserverTestCase):
# Send public read receipt for the first event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {}
)
)
# Send private read receipt for the second event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {}
self.room_id1,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event1_2_id],
None,
{},
)
)
@@ -241,7 +256,7 @@ class ReceiptTestCase(HomeserverTestCase):
# Test receipt updating
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {}
)
)
res = self.get_success(
@@ -259,7 +274,12 @@ class ReceiptTestCase(HomeserverTestCase):
# Test new room is reflected in what the method returns
self.get_success(
self.store.insert_receipt(
self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {}
self.room_id2,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event2_1_id],
None,
{},
)
)
res = self.get_success(