Compare commits: v1.97.0...clokep/sta
1 commit

| Author | SHA1 | Date |
|---|---|---|
|  | 9622bda163 |  |

.github/workflows/release-artifacts.yml (2 changes, vendored)
@@ -130,7 +130,7 @@ jobs:
           python-version: "3.x"

       - name: Install cibuildwheel
-        run: python -m pip install cibuildwheel==2.16.2
+        run: python -m pip install cibuildwheel==2.9.0

       - name: Set up QEMU to emulate aarch64
         if: matrix.arch == 'aarch64'
CHANGES.md (101 changes)
@@ -1,104 +1,3 @@
-# Synapse 1.97.0 (2023-11-28)
-
-Synapse will soon be forked by Element under an AGPLv3.0 licence (with CLA, for
-proprietary dual licensing). You can read more about this here:
-
- - https://matrix.org/blog/2023/11/06/future-of-synapse-dendrite/
- - https://element.io/blog/element-to-adopt-agplv3/
-
-The Matrix.org Foundation copy of the project will be archived. Any changes needed
-by server administrators will be communicated via our usual announcements channels,
-but we are striving to make this as seamless as possible.
-
-No significant changes since 1.97.0rc1.
-
-
-# Synapse 1.97.0rc1 (2023-11-21)
-
-### Features
-
-- Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper. ([\#15503](https://github.com/matrix-org/synapse/issues/15503))
-- Improve the performance of some operations in multi-worker deployments. ([\#16613](https://github.com/matrix-org/synapse/issues/16613), [\#16616](https://github.com/matrix-org/synapse/issues/16616))
-
-### Bugfixes
-
-- Fix a long-standing bug where some queries updated the same row twice. Introduced in Synapse 1.57.0. ([\#16609](https://github.com/matrix-org/synapse/issues/16609))
-- Fix a long-standing bug where Synapse would not unbind third-party identifiers for Application Service users when deactivated and would not emit a compliant response. ([\#16617](https://github.com/matrix-org/synapse/issues/16617))
-- Fix sending out of order `POSITION` over replication, causing additional database load. ([\#16639](https://github.com/matrix-org/synapse/issues/16639))
-
-### Improved Documentation
-
-- Note that the option [`outbound_federation_restricted_to`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#outbound_federation_restricted_to) was added in Synapse 1.89.0, and fix a nearby formatting error. ([\#16628](https://github.com/matrix-org/synapse/issues/16628))
-- Update parameter information for the `/timestamp_to_event` admin API. ([\#16631](https://github.com/matrix-org/synapse/issues/16631))
-- Provide an example for a common encrypted media response from the admin user media API and mention possible null values. ([\#16654](https://github.com/matrix-org/synapse/issues/16654))
-
-### Internal Changes
-
-- Remove whole table locks on push rule modifications. Contributed by Nick @ Beeper (@fizzadar). ([\#16051](https://github.com/matrix-org/synapse/issues/16051))
-- Support reactor tick timings on more types of event loops. ([\#16532](https://github.com/matrix-org/synapse/issues/16532))
-- Improve type hints. ([\#16564](https://github.com/matrix-org/synapse/issues/16564), [\#16611](https://github.com/matrix-org/synapse/issues/16611), [\#16612](https://github.com/matrix-org/synapse/issues/16612))
-- Avoid executing no-op queries. ([\#16583](https://github.com/matrix-org/synapse/issues/16583))
-- Simplify persistence code to be per-room. ([\#16584](https://github.com/matrix-org/synapse/issues/16584))
-- Use standard SQL helpers in persistence code. ([\#16585](https://github.com/matrix-org/synapse/issues/16585))
-- Avoid updating the stream cache unnecessarily. ([\#16586](https://github.com/matrix-org/synapse/issues/16586))
-- Improve performance when using opentracing. ([\#16589](https://github.com/matrix-org/synapse/issues/16589))
-- Run push rule evaluator setup in parallel. ([\#16590](https://github.com/matrix-org/synapse/issues/16590))
-- Improve tests of the SQL generator. ([\#16596](https://github.com/matrix-org/synapse/issues/16596))
-- Use more generic database methods. ([\#16615](https://github.com/matrix-org/synapse/issues/16615))
-- Use `dbname` instead of the deprecated `database` connection parameter for psycopg2. ([\#16618](https://github.com/matrix-org/synapse/issues/16618))
-- Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA. ([\#16634](https://github.com/matrix-org/synapse/issues/16634))
-- Improve references to GitHub issues. ([\#16637](https://github.com/matrix-org/synapse/issues/16637), [\#16638](https://github.com/matrix-org/synapse/issues/16638))
-- More efficiently handle no-op `POSITION` over replication. ([\#16640](https://github.com/matrix-org/synapse/issues/16640), [\#16655](https://github.com/matrix-org/synapse/issues/16655))
-- Speed up deleting of device messages when deleting a device. ([\#16643](https://github.com/matrix-org/synapse/issues/16643))
-- Speed up persisting large number of outliers. ([\#16649](https://github.com/matrix-org/synapse/issues/16649))
-- Reduce max concurrency of background tasks, reducing potential max DB load. ([\#16656](https://github.com/matrix-org/synapse/issues/16656), [\#16660](https://github.com/matrix-org/synapse/issues/16660))
-- Speed up purge room by adding an index to `event_push_summary`. ([\#16657](https://github.com/matrix-org/synapse/issues/16657))
-
-### Updates to locked dependencies
-
-* Bump prometheus-client from 0.17.1 to 0.18.0. ([\#16626](https://github.com/matrix-org/synapse/issues/16626))
-* Bump pyicu from 2.11 to 2.12. ([\#16603](https://github.com/matrix-org/synapse/issues/16603))
-* Bump requests-toolbelt from 0.10.1 to 1.0.0. ([\#16659](https://github.com/matrix-org/synapse/issues/16659))
-* Bump ruff from 0.0.292 to 0.1.4. ([\#16600](https://github.com/matrix-org/synapse/issues/16600))
-* Bump serde from 1.0.190 to 1.0.192. ([\#16627](https://github.com/matrix-org/synapse/issues/16627))
-* Bump serde_json from 1.0.107 to 1.0.108. ([\#16604](https://github.com/matrix-org/synapse/issues/16604))
-* Bump setuptools-rust from 1.8.0 to 1.8.1. ([\#16601](https://github.com/matrix-org/synapse/issues/16601))
-* Bump towncrier from 23.6.0 to 23.11.0. ([\#16622](https://github.com/matrix-org/synapse/issues/16622))
-* Bump treq from 22.2.0 to 23.11.0. ([\#16623](https://github.com/matrix-org/synapse/issues/16623))
-* Bump twisted from 23.8.0 to 23.10.0. ([\#16588](https://github.com/matrix-org/synapse/issues/16588))
-* Bump types-bleach from 6.1.0.0 to 6.1.0.1. ([\#16624](https://github.com/matrix-org/synapse/issues/16624))
-* Bump types-jsonschema from 4.19.0.3 to 4.19.0.4. ([\#16599](https://github.com/matrix-org/synapse/issues/16599))
-* Bump types-pyopenssl from 23.2.0.2 to 23.3.0.0. ([\#16625](https://github.com/matrix-org/synapse/issues/16625))
-* Bump types-pyyaml from 6.0.12.11 to 6.0.12.12. ([\#16602](https://github.com/matrix-org/synapse/issues/16602))
-
-# Synapse 1.96.1 (2023-11-17)
-
-Synapse will soon be forked by Element under an AGPLv3.0 licence (with CLA, for
-proprietary dual licensing). You can read more about this here:
-
-* https://matrix.org/blog/2023/11/06/future-of-synapse-dendrite/
-* https://element.io/blog/element-to-adopt-agplv3/
-
-The Matrix.org Foundation copy of the project will be archived. Any changes needed
-by server administrators will be communicated via our usual
-[announcements channels](https://matrix.to/#/#homeowners:matrix.org), but we are
-striving to make this as seamless as possible.
-
-This minor release was needed only because of CI-related trouble on [v1.96.0](https://github.com/matrix-org/synapse/releases/tag/v1.96.0), which was never released.
-
-### Internal Changes
-
-- Fix building of wheels in CI. ([\#16653](https://github.com/matrix-org/synapse/issues/16653))
-
-# Synapse 1.96.0 (2023-11-16)
-
-### Bugfixes
-
-- Fix "'int' object is not iterable" error in `set_device_id_for_pushers` background update introduced in Synapse 1.95.0. ([\#16594](https://github.com/matrix-org/synapse/issues/16594))
-
 # Synapse 1.96.0rc1 (2023-10-31)
 
 ### Features
changelog.d/15503.feature (1 change, new file)
@@ -0,0 +1 @@
+Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper.

changelog.d/16051.misc (1 change, new file)
@@ -0,0 +1 @@
+Remove whole table locks on push rule modifications. Contributed by Nick @ Beeper (@fizzadar).

changelog.d/16456.misc (1 change, new file)
@@ -0,0 +1 @@
+Add a Postgres `REPLICA IDENTITY` to tables that do not have an implicit one. This should allow use of Postgres logical replication.

changelog.d/16532.misc (1 change, new file)
@@ -0,0 +1 @@
+Support reactor tick timings on more types of event loops.

changelog.d/16564.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve type hints.

changelog.d/16583.misc (1 change, new file)
@@ -0,0 +1 @@
+Avoid executing no-op queries.

changelog.d/16584.misc (1 change, new file)
@@ -0,0 +1 @@
+Simplify persistance code to be per-room.

changelog.d/16585.misc (1 change, new file)
@@ -0,0 +1 @@
+Use standard SQL helpers in persistence code.

changelog.d/16586.misc (1 change, new file)
@@ -0,0 +1 @@
+Avoid updating the stream cache unnecessarily.

changelog.d/16588.misc (1 change, new file)
@@ -0,0 +1 @@
+Bump twisted from 23.8.0 to 23.10.0.

changelog.d/16589.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve performance when using opentracing.

changelog.d/16590.misc (1 change, new file)
@@ -0,0 +1 @@
+Run push rule evaluator setup in parallel.

changelog.d/16596.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve tests of the SQL generator.

changelog.d/16605.misc (1 change, new file)
@@ -0,0 +1 @@
+Bump setuptools-rust from 1.8.0 to 1.8.1.

changelog.d/16609.bugfix (1 change, new file)
@@ -0,0 +1 @@
+Fix a long-standing bug where some queries updated the same row twice. Introduced in Synapse 1.57.0.

changelog.d/16611.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve type hints.

changelog.d/16612.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve type hints.

changelog.d/16613.feature (1 change, new file)
@@ -0,0 +1 @@
+Improve the performance of some operations in multi-worker deployments.

changelog.d/16615.misc (1 change, new file)
@@ -0,0 +1 @@
+Use more generic database methods.

changelog.d/16616.feature (1 change, new file)
@@ -0,0 +1 @@
+Improve the performance of some operations in multi-worker deployments.

changelog.d/16617.bugfix (1 change, new file)
@@ -0,0 +1 @@
+Fix a long-standing bug where Synapse would not unbind third-party identifiers for Application Service users when deactivated and would not emit a compliant response.

changelog.d/16618.misc (1 change, new file)
@@ -0,0 +1 @@
+Use `dbname` instead of the deprecated `database` connection parameter for psycopg2.

changelog.d/16628.doc (1 change, new file)
@@ -0,0 +1 @@
+Note that the option [`outbound_federation_restricted_to`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#outbound_federation_restricted_to) was added in Synapse 1.89.0, and fix a nearby formatting error.

changelog.d/16631.doc (1 change, new file)
@@ -0,0 +1 @@
+Update parameter information for the `/timestamp_to_event` admin API.

changelog.d/16634.misc (1 change, new file)
@@ -0,0 +1 @@
+Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA.

changelog.d/16637.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve references to GitHub issues.

changelog.d/16638.misc (1 change, new file)
@@ -0,0 +1 @@
+Improve references to GitHub issues.
debian/changelog (24 changes, vendored)
@@ -1,27 +1,3 @@
-matrix-synapse-py3 (1.97.0) stable; urgency=medium
-
-  * New Synapse release 1.97.0.
-
- -- Synapse Packaging team <packages@matrix.org>  Tue, 28 Nov 2023 14:08:58 +0000
-
-matrix-synapse-py3 (1.97.0~rc1) stable; urgency=medium
-
-  * New Synapse release 1.97.0rc1.
-
- -- Synapse Packaging team <packages@matrix.org>  Tue, 21 Nov 2023 12:32:03 +0000
-
-matrix-synapse-py3 (1.96.1) stable; urgency=medium
-
-  * New synapse release 1.96.1.
-
- -- Synapse Packaging team <packages@matrix.org>  Fri, 17 Nov 2023 12:48:45 +0000
-
-matrix-synapse-py3 (1.96.0) stable; urgency=medium
-
-  * New synapse release 1.96.0.
-
- -- Synapse Packaging team <packages@matrix.org>  Thu, 16 Nov 2023 17:54:26 +0000
-
 matrix-synapse-py3 (1.96.0~rc1) stable; urgency=medium
 
   * New Synapse release 1.96.0rc1.
@@ -618,16 +618,6 @@ A response body like the following is returned:
       "quarantined_by": null,
       "safe_from_quarantine": false,
       "upload_name": "test2.png"
     },
-    {
-      "created_ts": 300400,
-      "last_access_ts": 300700,
-      "media_id": "BzYNLRUgGHphBkdKGbzXwbjX",
-      "media_length": 1337,
-      "media_type": "application/octet-stream",
-      "quarantined_by": null,
-      "safe_from_quarantine": false,
-      "upload_name": null
-    }
   ],
   "next_token": 3,

@@ -689,17 +679,16 @@ The following fields are returned in the JSON response body:

 - `media` - An array of objects, each containing information about a media.
   Media objects contain the following fields:
   - `created_ts` - integer - Timestamp when the content was uploaded in ms.
-  - `last_access_ts` - integer or null - Timestamp when the content was last accessed in ms.
-    Null if there was no access, yet.
+  - `last_access_ts` - integer - Timestamp when the content was last accessed in ms.
   - `media_id` - string - The id used to refer to the media. Details about the format
     are documented under
     [media repository](../media_repository.md).
   - `media_length` - integer - Length of the media in bytes.
   - `media_type` - string - The MIME-type of the media.
-  - `quarantined_by` - string or null - The user ID that initiated the quarantine request
-    for this media. Null if not quarantined.
+  - `quarantined_by` - string - The user ID that initiated the quarantine request
+    for this media.
   - `safe_from_quarantine` - bool - Status if this media is safe from quarantining.
-  - `upload_name` - string or null - The name the media was uploaded with. Null if not provided during upload.
+  - `upload_name` - string - The name the media was uploaded with.
 - `next_token`: integer - Indication for pagination. See above.
 - `total` - integer - Total number of media.
poetry.lock (8 changes, generated)
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.

 [[package]]
 name = "alabaster"

@@ -2273,13 +2273,13 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]

 [[package]]
 name = "requests-toolbelt"
-version = "1.0.0"
+version = "0.10.1"
 description = "A utility belt for advanced users of python-requests"
 optional = false
 python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
 files = [
-    {file = "requests-toolbelt-1.0.0.tar.gz", hash = "sha256:7681a0a3d047012b5bdc0ee37d7f8f07ebe76ab08caeccfc3921ce23c88d5bc6"},
-    {file = "requests_toolbelt-1.0.0-py2.py3-none-any.whl", hash = "sha256:cccfdd665f0a24fcf4726e690f65639d272bb0637b9b92dfd91a5568ccf6bd06"},
+    {file = "requests-toolbelt-0.10.1.tar.gz", hash = "sha256:62e09f7ff5ccbda92772a29f394a49c3ad6cb181d568b1337626b2abb628a63d"},
+    {file = "requests_toolbelt-0.10.1-py2.py3-none-any.whl", hash = "sha256:18565aa58116d9951ac39baa288d3adb5b3ff975c4f25eee78555d89e8f247f7"},
 ]

 [package.dependencies]

@@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"

 [tool.poetry]
 name = "matrix-synapse"
-version = "1.97.0"
+version = "1.96.0rc1"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "Apache-2.0"
@@ -383,7 +383,7 @@ class DeviceWorkerHandler:
         )

     DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
-    DEVICE_MSGS_DELETE_SLEEP_MS = 100
+    DEVICE_MSGS_DELETE_SLEEP_MS = 1000

     async def _delete_device_messages(
         self,

@@ -396,17 +396,15 @@ class DeviceWorkerHandler:
         up_to_stream_id = task.params["up_to_stream_id"]

         # Delete the messages in batches to avoid too much DB load.
-        from_stream_id = None
         while True:
-            from_stream_id, _ = await self.store.delete_messages_for_device_between(
+            res = await self.store.delete_messages_for_device(
                 user_id=user_id,
                 device_id=device_id,
-                from_stream_id=from_stream_id,
-                to_stream_id=up_to_stream_id,
+                up_to_stream_id=up_to_stream_id,
                 limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
             )

-            if from_stream_id is None:
+            if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
                 return TaskStatus.COMPLETE, None, None

             await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
@@ -88,7 +88,7 @@ from synapse.types import (
 )
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
+from synapse.util.iterutils import batch_iter, partition
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr

@@ -1669,13 +1669,14 @@ class FederationEventHandler:

         # XXX: it might be possible to kick this process off in parallel with fetching
         # the events.
+        while event_map:
+            # build a list of events whose auth events are not in the queue.
+            roots = tuple(
+                ev
+                for ev in event_map.values()
+                if not any(aid in event_map for aid in ev.auth_event_ids())
+            )

-        # We need to persist an event's auth events before the event.
-        auth_graph = {
-            ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
-            for ev in event_map.values()
-        }
-        for roots in sorted_topologically_batched(event_map.values(), auth_graph):
             if not roots:
                 # if *none* of the remaining events are ready, that means
                 # we have a loop. This either means a bug in our logic, or that

@@ -1697,6 +1698,9 @@ class FederationEventHandler:

             await self._auth_and_persist_outliers_inner(room_id, roots)

+            for ev in roots:
+                del event_map[ev.event_id]
+
     async def _auth_and_persist_outliers_inner(
         self, room_id: str, fetched_events: Collection[EventBase]
     ) -> None:
@@ -257,11 +257,6 @@ class ReplicationCommandHandler:
         if hs.config.redis.redis_enabled:
             self._notifier.add_lock_released_callback(self.on_lock_released)

-        # Marks if we should send POSITION commands for all streams ASAP. This
-        # is checked by the `ReplicationStreamer` which manages sending
-        # RDATA/POSITION commands
-        self._should_announce_positions = True
-
     def subscribe_to_channel(self, channel_name: str) -> None:
         """
         Indicates that we wish to subscribe to a Redis channel by name.

@@ -402,23 +397,29 @@ class ReplicationCommandHandler:
         return self._streams_to_replicate

     def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
-        self.send_positions_to_connection()
+        self.send_positions_to_connection(conn)

-    def send_positions_to_connection(self) -> None:
+    def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
         """Send current position of all streams this process is source of to
         the connection.
         """

-        self._should_announce_positions = True
-        self._notifier.notify_replication()
-
-    def should_announce_positions(self) -> bool:
-        """Check if we should send POSITION commands for all streams ASAP."""
-        return self._should_announce_positions
-
-    def will_announce_positions(self) -> None:
-        """Mark that we're about to send POSITIONs out for all streams."""
-        self._should_announce_positions = False
+        # We respond with current position of all streams this instance
+        # replicates.
+        for stream in self.get_streams_to_replicate():
+            # Note that we use the current token as the prev token here (rather
+            # than stream.last_token), as we can't be sure that there have been
+            # no rows written between last token and the current token (since we
+            # might be racing with the replication sending bg process).
+            current_token = stream.current_token(self._instance_name)
+            self.send_command(
+                PositionCommand(
+                    stream.NAME,
+                    self._instance_name,
+                    current_token,
+                    current_token,
+                )
+            )

     def on_USER_SYNC(
         self, conn: IReplicationConnection, cmd: UserSyncCommand

@@ -587,21 +588,6 @@ class ReplicationCommandHandler:

         logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())

-        # Check if we can early discard this position. We can only do so for
-        # connected streams.
-        stream = self._streams[cmd.stream_name]
-        if stream.can_discard_position(
-            cmd.instance_name, cmd.prev_token, cmd.new_token
-        ) and self.is_stream_connected(conn, cmd.stream_name):
-            logger.debug(
-                "Discarding redundant POSITION %s/%s %s %s",
-                cmd.instance_name,
-                cmd.stream_name,
-                cmd.prev_token,
-                cmd.new_token,
-            )
-            return
-
         self._add_command_to_stream_queue(conn, cmd)

     async def _process_position(

@@ -613,18 +599,6 @@ class ReplicationCommandHandler:
         """
         stream = self._streams[stream_name]

-        if stream.can_discard_position(
-            cmd.instance_name, cmd.prev_token, cmd.new_token
-        ) and self.is_stream_connected(conn, cmd.stream_name):
-            logger.debug(
-                "Discarding redundant POSITION %s/%s %s %s",
-                cmd.instance_name,
-                cmd.stream_name,
-                cmd.prev_token,
-                cmd.new_token,
-            )
-            return
-
         # We're about to go and catch up with the stream, so remove from set
         # of connected streams.
         for streams in self._streams_by_connection.values():

@@ -652,9 +626,8 @@ class ReplicationCommandHandler:
             # for why this can happen.

             logger.info(
-                "Fetching replication rows for '%s' / %s between %i and %i",
+                "Fetching replication rows for '%s' between %i and %i",
                 stream_name,
-                cmd.instance_name,
                 current_token,
                 cmd.new_token,
             )

@@ -684,13 +657,6 @@ class ReplicationCommandHandler:

         self._streams_by_connection.setdefault(conn, set()).add(stream_name)

-    def is_stream_connected(
-        self, conn: IReplicationConnection, stream_name: str
-    ) -> bool:
-        """Return if stream has been successfully connected and is ready to
-        receive updates"""
-        return stream_name in self._streams_by_connection.get(conn, ())
-
     def on_REMOTE_SERVER_UP(
         self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
     ) -> None:
@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
         # We send out our positions when there is a new connection in case the
         # other side missed updates. We do this for Redis connections as the
         # otherside won't know we've connected and so won't issue a REPLICATE.
-        self.synapse_handler.send_positions_to_connection()
+        self.synapse_handler.send_positions_to_connection(self)

     def messageReceived(self, pattern: str, channel: str, message: str) -> None:
         """Received a message from redis."""

@@ -123,7 +123,7 @@ class ReplicationStreamer:

         # We check up front to see if anything has actually changed, as we get
         # poked because of changes that happened on other instances.
-        if not self.command_handler.should_announce_positions() and all(
+        if all(
             stream.last_token == stream.current_token(self._instance_name)
             for stream in self.streams
         ):

@@ -158,21 +158,6 @@ class ReplicationStreamer:
                 all_streams = list(all_streams)
                 random.shuffle(all_streams)

-            if self.command_handler.should_announce_positions():
-                # We need to send out POSITIONs for all streams, usually
-                # because a worker has reconnected.
-                self.command_handler.will_announce_positions()
-
-                for stream in all_streams:
-                    self.command_handler.send_command(
-                        PositionCommand(
-                            stream.NAME,
-                            self._instance_name,
-                            stream.last_token,
-                            stream.last_token,
-                        )
-                    )
-
             for stream in all_streams:
                 if stream.last_token == stream.current_token(
                     self._instance_name
@@ -144,16 +144,6 @@ class Stream:
         """
         raise NotImplementedError()

-    def can_discard_position(
-        self, instance_name: str, prev_token: int, new_token: int
-    ) -> bool:
-        """Whether or not a position command for this stream can be discarded.
-
-        Useful for streams that can never go backwards and where we already know
-        the stream ID for the instance has advanced.
-        """
-        return False
-
     def discard_updates_and_advance(self) -> None:
         """Called when the stream should advance but the updates would be discarded,
         e.g. when there are no currently connected workers.

@@ -231,14 +221,6 @@ class _StreamFromIdGen(Stream):
     def minimal_local_current_token(self) -> Token:
         return self._stream_id_gen.get_minimal_local_current_token()

-    def can_discard_position(
-        self, instance_name: str, prev_token: int, new_token: int
-    ) -> bool:
-        # These streams can't go backwards, so we know we can ignore any
-        # positions where the tokens are from before the current token.
-        return new_token <= self.current_token(instance_name)
-

 def current_token_without_instance(
     current_token: Callable[[], int]

@@ -305,14 +287,6 @@ class BackfillStream(Stream):
         # which means we need to negate it.
         return -self.store._backfill_id_gen.get_minimal_local_current_token()

-    def can_discard_position(
-        self, instance_name: str, prev_token: int, new_token: int
-    ) -> bool:
-        # Backfill stream can't go backwards, so we know we can ignore any
-        # positions where the tokens are from before the current token.
-        return new_token <= self.current_token(instance_name)
-

 class PresenceStream(_StreamFromIdGen):
     @attr.s(slots=True, frozen=True, auto_attribs=True)

@@ -527,14 +501,6 @@ class CachesStream(Stream):
             return self.store._cache_id_gen.get_minimal_local_current_token()
         return self.current_token(self.local_instance_name)

-    def can_discard_position(
-        self, instance_name: str, prev_token: int, new_token: int
-    ) -> bool:
-        # Caches streams can't go backwards, so we know we can ignore any
-        # positions where the tokens are from before the current token.
-        return new_token <= self.current_token(instance_name)
-

 class DeviceListsStream(_StreamFromIdGen):
     """Either a user has updated their devices or a remote server needs to be
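The removed `can_discard_position` overrides all encode the same monotonicity rule. A minimal sketch of that rule in isolation (the function name just mirrors the diff; this is not Synapse code):

```python
# For a stream whose tokens only ever increase, a POSITION whose new token
# is at or behind our current token carries no new information, so it can
# be dropped without fetching any replication rows.
def can_discard_position(current_token: int, new_token: int) -> bool:
    return new_token <= current_token
```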
@@ -768,8 +768,9 @@ class BackgroundUpdater:

                     # override the global statement timeout to avoid accidentally squashing
                     # a long-running index creation process
-                    timeout_sql = "SET SESSION statement_timeout = 0"
-                    c.execute(timeout_sql)
+                    self.db_pool.engine.attempt_to_set_statement_timeout(
+                        c, 0, for_transaction=True
+                    )

                 sql = (
                     "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"

@@ -791,12 +792,6 @@ class BackgroundUpdater:
                 logger.debug("[SQL] %s", sql)
                 c.execute(sql)
             finally:
-                # mypy ignore - `statement_timeout` is defined on PostgresEngine
-                # reset the global timeout to the default
-                default_timeout = self.db_pool.engine.statement_timeout  # type: ignore[attr-defined]
-                undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
-                conn.cursor().execute(undo_timeout_sql)
-
                 conn.engine.attempt_to_set_autocommit(conn.conn, False)

     def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
@@ -450,12 +450,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
+        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
+            limit: maximum number of messages to delete

         Returns:
             The number of messages deleted.

@@ -476,22 +478,32 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             log_kv({"message": "No changes in cache since last check"})
             return 0

-        from_stream_id = None
-        count = 0
-        while True:
-            from_stream_id, loop_count = await self.delete_messages_for_device_between(
-                user_id,
-                device_id,
-                from_stream_id=from_stream_id,
-                to_stream_id=up_to_stream_id,
-                limit=1000,
-            )
-            count += loop_count
-            if from_stream_id is None:
-                break
+        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
+            limit_statement = "" if limit is None else f"LIMIT {limit}"
+            sql = f"""
+                DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
+                    SELECT MAX(stream_id) FROM (
+                        SELECT stream_id FROM device_inbox
+                        WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+                        ORDER BY stream_id
+                        {limit_statement}
+                    ) AS q1
+                )
+            """
+            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
+            return txn.rowcount
+
+        count = await self.db_pool.runInteraction(
+            "delete_messages_for_device", delete_messages_for_device_txn
+        )

         log_kv({"message": f"deleted {count} messages for device", "count": count})

+        # In this case we don't know if we hit the limit or the delete is complete
+        # so let's not update the cache.
+        if count == limit:
+            return count
+
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0

@@ -502,74 +514,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):

         return count

-    @trace
-    async def delete_messages_for_device_between(
-        self,
-        user_id: str,
-        device_id: Optional[str],
-        from_stream_id: Optional[int],
-        to_stream_id: int,
-        limit: int,
-    ) -> Tuple[Optional[int], int]:
-        """Delete N device messages between the stream IDs, returning the
-        highest stream ID deleted (or None if all messages in the range have
-        been deleted) and the number of messages deleted.
-
-        This is more efficient than `delete_messages_for_device` when calling in
-        a loop to batch delete messages.
-        """
-
-        # Keeping track of a lower bound of stream ID where we've deleted
-        # everything below makes the queries much faster. Otherwise, every time
-        # we scan for rows to delete we'd re-scan across all the rows that have
-        # previously deleted (until the next table VACUUM).
-
-        if from_stream_id is None:
-            # Minimum device stream ID is 1.
-            from_stream_id = 0
-
-        def delete_messages_for_device_between_txn(
-            txn: LoggingTransaction,
-        ) -> Tuple[Optional[int], int]:
-            txn.execute(
-                """
-                SELECT MAX(stream_id) FROM (
-                    SELECT stream_id FROM device_inbox
-                    WHERE user_id = ? AND device_id = ?
-                    AND ? < stream_id AND stream_id <= ?
-                    ORDER BY stream_id
-                    LIMIT ?
-                ) AS d
-                """,
-                (user_id, device_id, from_stream_id, to_stream_id, limit),
-            )
-            row = txn.fetchone()
-            if row is None or row[0] is None:
-                return None, 0
-
-            (max_stream_id,) = row
-
-            txn.execute(
-                """
-                DELETE FROM device_inbox
-                WHERE user_id = ? AND device_id = ?
-                AND ? < stream_id AND stream_id <= ?
-                """,
-                (user_id, device_id, from_stream_id, max_stream_id),
-            )
-
-            num_deleted = txn.rowcount
-            if num_deleted < limit:
-                return None, num_deleted
-
-            return max_stream_id, num_deleted
-
-        return await self.db_pool.runInteraction(
-            "delete_messages_for_device_between",
-            delete_messages_for_device_between_txn,
-            db_autocommit=True,  # We don't need to run in a transaction
-        )
-
     @trace
     async def get_new_device_msgs_for_remote(
         self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
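The removed docstring explains why the batched variant threads a lower bound through the loop: without it, each DELETE would re-scan rows that were already deleted (until the next table VACUUM). A minimal sketch of the intended calling pattern, assuming only the `delete_messages_for_device_between` API shown in the diff above:

```python
async def delete_all_device_messages(store, user_id: str, device_id: str, up_to: int) -> int:
    """Drain a device's inbox in batches of 1000 rows."""
    total = 0
    from_stream_id = None  # lower bound: everything at or below it is already gone
    while True:
        from_stream_id, deleted = await store.delete_messages_for_device_between(
            user_id,
            device_id,
            from_stream_id=from_stream_id,
            to_stream_id=up_to,
            limit=1000,
        )
        total += deleted
        if from_stream_id is None:  # the whole range has been deleted
            return total
```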
@@ -311,14 +311,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             self._background_drop_null_thread_id_indexes,
         )

-        # Add a room ID index to speed up room deletion
-        self.db_pool.updates.register_background_index_update(
-            "event_push_summary_index_room_id",
-            index_name="event_push_summary_index_room_id",
-            table="event_push_summary",
-            columns=["room_id"],
-        )
-
     async def _background_drop_null_thread_id_indexes(
         self, progress: JsonDict, batch_size: int
     ) -> int:

@@ -89,10 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # furthermore, we might already have the table from a previous (failed)
         # purge attempt, so let's drop the table first.

-        if isinstance(self.database_engine, PostgresEngine):
-            # Disable statement timeouts for this transaction; purging rooms can
-            # take a while!
-            txn.execute("SET LOCAL statement_timeout = 0")
+        # Disable statement timeouts for this transaction; purging rooms can
+        # take a while!
+        self.database_engine.attempt_to_set_statement_timeout(
+            txn, 0, for_transaction=True
+        )

         txn.execute("DROP TABLE IF EXISTS events_to_purge")

@@ -601,7 +601,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
                 (last_pusher_id, batch_size),
             )

-            rows = cast(List[Tuple[int, Optional[str], Optional[str]]], txn.fetchall())
+            rows = txn.fetchall()
             if len(rows) == 0:
                 return 0

@@ -617,7 +617,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
                 txn=txn,
                 table="pushers",
                 key_names=("id",),
-                key_values=[(row[0],) for row in rows],
+                key_values=[row[0] for row in rows],
                 value_names=("device_id", "access_token"),
                 # If there was already a device_id on the pusher, we only want to clear
                 # the access_token column, so we keep the existing device_id. Otherwise,
@@ -36,6 +36,9 @@ CursorType = TypeVar("CursorType", bound=Cursor)


 class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
+    # The default statement timeout to use for transactions.
+    statement_timeout: Optional[int] = None
+
     def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
         self.module = module

@@ -132,6 +135,16 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
         """
         ...

+    @abc.abstractmethod
+    def attempt_to_set_statement_timeout(
+        self, cursor: CursorType, statement_timeout: int, for_transaction: bool
+    ) -> None:
+        """Attempt to set the cursor's statement timeout.
+
+        Note this has no effect on SQLite3.
+        """
+        ...
+
     @staticmethod
     @abc.abstractmethod
     def executescript(cursor: CursorType, script: str) -> None:

@@ -52,7 +52,7 @@ class PostgresEngine(
         # some degenerate query plan has been created and the client has probably
         # timed out/walked off anyway.
         # This is in milliseconds.
-        self.statement_timeout: Optional[int] = database_config.get(
+        self.statement_timeout = database_config.get(
             "statement_timeout", 60 * 60 * 1000
         )
         self._version: Optional[int] = None  # unknown as yet

@@ -169,7 +169,11 @@ class PostgresEngine(

         # Abort really long-running statements and turn them into errors.
         if self.statement_timeout is not None:
-            cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
+            self.attempt_to_set_statement_timeout(
+                cast(psycopg2.extensions.cursor, cursor.txn),
+                self.statement_timeout,
+                for_transaction=False,
+            )

         cursor.close()
         db_conn.commit()

@@ -233,6 +237,18 @@ class PostgresEngine(
             isolation_level = self.isolation_level_map[isolation_level]
         return conn.set_isolation_level(isolation_level)

+    def attempt_to_set_statement_timeout(
+        self,
+        cursor: psycopg2.extensions.cursor,
+        statement_timeout: int,
+        for_transaction: bool,
+    ) -> None:
+        if for_transaction:
+            sql = "SET LOCAL statement_timeout TO ?"
+        else:
+            sql = "SET statement_timeout TO ?"
+        cursor.execute(sql, (statement_timeout,))
+
     @staticmethod
     def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
         """Execute a chunk of SQL containing multiple semicolon-delimited statements.

@@ -143,6 +143,12 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
         # All transactions are SERIALIZABLE by default in sqlite
         pass

+    def attempt_to_set_statement_timeout(
+        self, cursor: sqlite3.Cursor, statement_timeout: int, for_transaction: bool
+    ) -> None:
+        # Not supported.
+        pass
+
     @staticmethod
     def executescript(cursor: sqlite3.Cursor, script: str) -> None:
         """Execute a chunk of SQL containing multiple semicolon-delimited statements.
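The `for_transaction` flag maps onto standard Postgres scoping: `SET LOCAL` lasts only until the current transaction ends, while plain `SET` changes the setting for the rest of the session. A small illustrative sketch using psycopg2 directly (the connection details and the DELETE are made up for the example):

```python
import psycopg2

conn = psycopg2.connect(dbname="synapse")  # illustrative connection
with conn:  # psycopg2 runs this block inside a transaction
    with conn.cursor() as cur:
        # Scoped to this transaction only; 0 disables the timeout.
        cur.execute("SET LOCAL statement_timeout = 0")
        cur.execute("DELETE FROM device_inbox WHERE stream_id <= 1000")
# After COMMIT, the session's previous statement_timeout applies again.
conn.close()
```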
82/04_replica_identities.sql.postgres (main database schema delta, new file)
@@ -0,0 +1,88 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE appservice_room_list REPLICA IDENTITY USING INDEX appservice_room_list_idx;
+ALTER TABLE batch_events REPLICA IDENTITY USING INDEX chunk_events_event_id;
+ALTER TABLE blocked_rooms REPLICA IDENTITY USING INDEX blocked_rooms_idx;
+ALTER TABLE cache_invalidation_stream_by_instance REPLICA IDENTITY USING INDEX cache_invalidation_stream_by_instance_id;
+ALTER TABLE device_lists_changes_in_room REPLICA IDENTITY USING INDEX device_lists_changes_in_stream_id;
+ALTER TABLE device_lists_outbound_last_success REPLICA IDENTITY USING INDEX device_lists_outbound_last_success_unique_idx;
+ALTER TABLE device_lists_remote_cache REPLICA IDENTITY USING INDEX device_lists_remote_cache_unique_id;
+ALTER TABLE device_lists_remote_extremeties REPLICA IDENTITY USING INDEX device_lists_remote_extremeties_unique_idx;
+ALTER TABLE device_lists_remote_resync REPLICA IDENTITY USING INDEX device_lists_remote_resync_idx;
+ALTER TABLE e2e_cross_signing_keys REPLICA IDENTITY USING INDEX e2e_cross_signing_keys_stream_idx;
+ALTER TABLE e2e_room_keys REPLICA IDENTITY USING INDEX e2e_room_keys_with_version_idx;
+ALTER TABLE e2e_room_keys_versions REPLICA IDENTITY USING INDEX e2e_room_keys_versions_idx;
+ALTER TABLE erased_users REPLICA IDENTITY USING INDEX erased_users_user;
+ALTER TABLE event_relations REPLICA IDENTITY USING INDEX event_relations_id;
+ALTER TABLE federation_inbound_events_staging REPLICA IDENTITY USING INDEX federation_inbound_events_staging_instance_event;
+ALTER TABLE federation_stream_position REPLICA IDENTITY USING INDEX federation_stream_position_instance;
+ALTER TABLE ignored_users REPLICA IDENTITY USING INDEX ignored_users_uniqueness;
+ALTER TABLE insertion_events REPLICA IDENTITY USING INDEX insertion_events_event_id;
+ALTER TABLE insertion_event_extremities REPLICA IDENTITY USING INDEX insertion_event_extremities_event_id;
+ALTER TABLE monthly_active_users REPLICA IDENTITY USING INDEX monthly_active_users_users;
+ALTER TABLE ratelimit_override REPLICA IDENTITY USING INDEX ratelimit_override_idx;
+ALTER TABLE room_stats_earliest_token REPLICA IDENTITY USING INDEX room_stats_earliest_token_idx;
+ALTER TABLE room_stats_state REPLICA IDENTITY USING INDEX room_stats_state_room;
+ALTER TABLE stream_positions REPLICA IDENTITY USING INDEX stream_positions_idx;
+ALTER TABLE user_directory REPLICA IDENTITY USING INDEX user_directory_user_idx;
+ALTER TABLE user_directory_search REPLICA IDENTITY USING INDEX user_directory_search_user_idx;
+ALTER TABLE user_ips REPLICA IDENTITY USING INDEX user_ips_user_token_ip_unique_index;
+ALTER TABLE user_signature_stream REPLICA IDENTITY USING INDEX user_signature_stream_idx;
+ALTER TABLE users_in_public_rooms REPLICA IDENTITY USING INDEX users_in_public_rooms_u_idx;
+ALTER TABLE users_who_share_private_rooms REPLICA IDENTITY USING INDEX users_who_share_private_rooms_u_idx;
+ALTER TABLE user_threepid_id_server REPLICA IDENTITY USING INDEX user_threepid_id_server_idx;
+ALTER TABLE worker_locks REPLICA IDENTITY USING INDEX worker_locks_key;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE current_state_delta_stream REPLICA IDENTITY FULL;
+ALTER TABLE deleted_pushers REPLICA IDENTITY FULL;
+ALTER TABLE device_auth_providers REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_outbox REPLICA IDENTITY FULL;
+ALTER TABLE device_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_outbound_pokes REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_stream REPLICA IDENTITY FULL;
+ALTER TABLE e2e_cross_signing_signatures REPLICA IDENTITY FULL;
+ALTER TABLE event_auth_chain_links REPLICA IDENTITY FULL;
+ALTER TABLE event_auth REPLICA IDENTITY FULL;
+ALTER TABLE event_push_actions_staging REPLICA IDENTITY FULL;
+ALTER TABLE insertion_event_edges REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_url_cache REPLICA IDENTITY FULL;
+ALTER TABLE presence_stream REPLICA IDENTITY FULL;
+ALTER TABLE push_rules_stream REPLICA IDENTITY FULL;
+ALTER TABLE room_alias_servers REPLICA IDENTITY FULL;
+ALTER TABLE stream_ordering_to_exterm REPLICA IDENTITY FULL;
+ALTER TABLE timeline_gaps REPLICA IDENTITY FULL;
+ALTER TABLE user_daily_visits REPLICA IDENTITY FULL;
+ALTER TABLE users_pending_deactivation REPLICA IDENTITY FULL;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_summary REPLICA IDENTITY FULL;
+ALTER TABLE event_search REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE threepid_guest_access_tokens REPLICA IDENTITY FULL;
+ALTER TABLE user_filters REPLICA IDENTITY FULL; -- sadly the `CHECK` constraint is not enough here
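One way to sanity-check the effect of these statements is to read `pg_class.relreplident`, where `d` means the default identity (primary key), `i` an index, and `f` full-row identity. A hedged sketch using psycopg2 directly (connection details are illustrative):

```python
import psycopg2

conn = psycopg2.connect(dbname="synapse")  # illustrative connection
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT relreplident FROM pg_class WHERE oid = %s::regclass",
        ("device_inbox",),
    )
    print(cur.fetchone()[0])  # expect 'f' once this delta has run
conn.close()
```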
@@ -1,17 +0,0 @@
-/* Copyright 2023 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-  (8306, 'event_push_summary_index_room_id', '{}');
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+-- See also: 82/04_replica_identities.sql.postgres on the main database
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE state_group_edges REPLICA IDENTITY USING INDEX state_group_edges_unique_idx;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE state_groups_state REPLICA IDENTITY FULL;
@@ -135,54 +135,3 @@ def sorted_topologically(
             degree_map[edge] -= 1
             if degree_map[edge] == 0:
                 heapq.heappush(zero_degree, edge)
-
-
-def sorted_topologically_batched(
-    nodes: Iterable[T],
-    graph: Mapping[T, Collection[T]],
-) -> Generator[Collection[T], None, None]:
-    r"""Walk the graph topologically, returning batches of nodes where all nodes
-    that references it have been previously returned.
-
-    For example, given the following graph:
-
-          A
-         / \
-        B   C
-         \ /
-          D
-
-    This function will return: `[[A], [B, C], [D]]`.
-
-    This function is useful for e.g. batch persisting events in an auth chain,
-    where we can only persist an event if all its auth events have already been
-    persisted.
-    """
-
-    degree_map = {node: 0 for node in nodes}
-    reverse_graph: Dict[T, Set[T]] = {}
-
-    for node, edges in graph.items():
-        if node not in degree_map:
-            continue
-
-        for edge in set(edges):
-            if edge in degree_map:
-                degree_map[node] += 1
-
-                reverse_graph.setdefault(edge, set()).add(node)
-        reverse_graph.setdefault(node, set())
-
-    zero_degree = [node for node, degree in degree_map.items() if degree == 0]
-
-    while zero_degree:
-        new_zero_degree = []
-        for node in zero_degree:
-            for edge in reverse_graph.get(node, []):
-                if edge in degree_map:
-                    degree_map[edge] -= 1
-                    if degree_map[edge] == 0:
-                        new_zero_degree.append(edge)
-
-        yield zero_degree
-        zero_degree = new_zero_degree
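The removed helper yields "generations" of a dependency graph: each batch contains only nodes whose dependencies have all appeared in earlier batches. A short usage sketch mirroring the diamond example from the docstring (behaviour as described in the diff; the ordering within a batch is not guaranteed):

```python
from synapse.util.iterutils import sorted_topologically_batched

# graph maps each node to the nodes it depends on: 2 and 3 depend on 1,
# and 4 depends on both 2 and 3 (a diamond).
graph = {1: [], 2: [1], 3: [1], 4: [2, 3]}

batches = list(sorted_topologically_batched([4, 3, 2, 1], graph))
print(batches)  # [[1], [2, 3], [4]] -- 2 and 3 can be processed together
```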
@@ -71,7 +71,7 @@ class TaskScheduler:
     # Time before a complete or failed task is deleted from the DB
     KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000  # 1 week
     # Maximum number of tasks that can run at the same time
-    MAX_CONCURRENT_RUNNING_TASKS = 5
+    MAX_CONCURRENT_RUNNING_TASKS = 10
     # Time from the last task update after which we will log a warning
     LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000  # 24hrs

@@ -193,7 +193,7 @@ class TaskScheduler:
         result: Optional[JsonMapping] = None,
         error: Optional[str] = None,
     ) -> bool:
-        """Update some task associated values. This is exposed publicly so it can
+        """Update some task associated values. This is exposed publically so it can
         be used inside task functions, mainly to update the result and be able to
         resume a task at a specific step after a restart of synapse.

@@ -377,7 +377,7 @@ class TaskScheduler:
             self._running_tasks.remove(task.id)

             # Try launch a new task since we've finished with this one.
-            self._clock.call_later(0.1, self._launch_scheduled_tasks)
+            self._clock.call_later(1, self._launch_scheduled_tasks)

         if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
             return
@@ -35,10 +35,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
         typing = self.hs.get_typing_handler()
         assert isinstance(typing, TypingWriterHandler)

-        # Create a typing update before we reconnect so that there is a missing
-        # update to fetch.
-        typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
-
         self.reconnect()

         typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

@@ -95,10 +91,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
         typing = self.hs.get_typing_handler()
         assert isinstance(typing, TypingWriterHandler)

-        # Create a typing update before we reconnect so that there is a missing
-        # update to fetch.
-        typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
-
         self.reconnect()

         typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from typing import Callable, Tuple
+from typing import Callable, List, Tuple
 from unittest.mock import Mock, call

 from twisted.internet import defer

@@ -29,6 +29,7 @@ from synapse.storage.database import (
 from synapse.util import Clock

 from tests import unittest
+from tests.utils import USE_POSTGRES_FOR_TESTS


 class TupleComparisonClauseTestCase(unittest.TestCase):

@@ -279,3 +280,84 @@ class CancellationTestCase(unittest.HomeserverTestCase):
             ]
         )
         self.assertEqual(exception_callback.call_count, 6)  # no additional calls
+
+
+class PostgresReplicaIdentityTestCase(unittest.HomeserverTestCase):
+    if not USE_POSTGRES_FOR_TESTS:
+        skip = "Requires Postgres"
+
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
+        self.db_pools = homeserver.get_datastores().databases
+
+    def test_all_tables_have_postgres_replica_identity(self) -> None:
+        """
+        Tests that all tables have a Postgres REPLICA IDENTITY.
+        (See https://github.com/matrix-org/synapse/issues/16224).
+
+        Tables with a PRIMARY KEY have an implied REPLICA IDENTITY and are fine.
+        Other tables need them to be set with `ALTER TABLE`.
+
+        A REPLICA IDENTITY is required for Postgres logical replication to work
+        properly without blocking updates and deletes.
+        """
+
+        sql = """
+            -- Select tables that have no primary key and use the default replica identity rule
+            -- (the default is to use the primary key)
+            WITH tables_no_pkey AS (
+                SELECT tbl.table_schema, tbl.table_name
+                FROM information_schema.tables tbl
+                WHERE table_type = 'BASE TABLE'
+                    AND table_schema not in ('pg_catalog', 'information_schema')
+                    AND NOT EXISTS (
+                        SELECT 1
+                        FROM information_schema.key_column_usage kcu
+                        WHERE kcu.table_name = tbl.table_name
+                            AND kcu.table_schema = tbl.table_schema
+                    )
+            )
+            SELECT pg_class.oid::regclass FROM tables_no_pkey INNER JOIN pg_class ON pg_class.oid::regclass = table_name::regclass
+            WHERE relreplident = 'd'
+
+            UNION
+
+            -- Also select tables that use an index as a replica identity
+            -- but where the index doesn't exist
+            -- (e.g. it could have been deleted)
+            SELECT pg_class.oid::regclass
+            FROM information_schema.tables tbl
+            INNER JOIN pg_class ON pg_class.oid::regclass = table_name::regclass
+            WHERE table_type = 'BASE TABLE'
+                AND table_schema not in ('pg_catalog', 'information_schema')
+
+                -- 'i' means an index is used as the replica identity
+                AND relreplident = 'i'
+
+                -- look for indices that are marked as the replica identity
+                AND NOT EXISTS (
+                    SELECT indexrelid::regclass
+                    FROM pg_index
+                    WHERE indrelid = pg_class.oid::regclass AND indisreplident
+                )
+        """
+
+        def _list_tables_with_missing_replica_identities_txn(
+            txn: LoggingTransaction,
+        ) -> List[str]:
+            txn.execute(sql)
+            return [table_name for table_name, in txn]
+
+        for pool in self.db_pools:
+            missing = self.get_success(
+                pool.runInteraction(
+                    "test_list_missing_replica_identities",
+                    _list_tables_with_missing_replica_identities_txn,
+                )
+            )
+            self.assertEqual(
+                len(missing),
+                0,
+                f"The following tables in the {pool.name()!r} database are missing REPLICA IDENTITIES: {missing!r}.",
+            )
@@ -13,11 +13,7 @@
 # limitations under the License.
 from typing import Dict, Iterable, List, Sequence

-from synapse.util.iterutils import (
-    chunk_seq,
-    sorted_topologically,
-    sorted_topologically_batched,
-)
+from synapse.util.iterutils import chunk_seq, sorted_topologically

 from tests.unittest import TestCase

@@ -111,73 +107,3 @@ class SortTopologically(TestCase):
         graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}

         self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
-
-
-class SortTopologicallyBatched(TestCase):
-    "Test cases for `sorted_topologically_batched`"
-
-    def test_empty(self) -> None:
-        "Test that an empty graph works correctly"
-
-        graph: Dict[int, List[int]] = {}
-        self.assertEqual(list(sorted_topologically_batched([], graph)), [])
-
-    def test_handle_empty_graph(self) -> None:
-        "Test that a graph where a node doesn't have an entry is treated as empty"
-
-        graph: Dict[int, List[int]] = {}
-
-        # For disconnected nodes the output is simply sorted.
-        self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
-
-    def test_disconnected(self) -> None:
-        "Test that a graph with no edges work"
-
-        graph: Dict[int, List[int]] = {1: [], 2: []}
-
-        # For disconnected nodes the output is simply sorted.
-        self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
-
-    def test_linear(self) -> None:
-        "Test that a simple `4 -> 3 -> 2 -> 1` graph works"
-
-        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
-
-        self.assertEqual(
-            list(sorted_topologically_batched([4, 3, 2, 1], graph)),
-            [[1], [2], [3], [4]],
-        )
-
-    def test_subset(self) -> None:
-        "Test that only sorting a subset of the graph works"
-        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
-
-        self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])
-
-    def test_fork(self) -> None:
-        "Test that a forked graph works"
-        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
-
-        # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
-        # always get the same one.
-        self.assertEqual(
-            list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
-        )
-
-    def test_duplicates(self) -> None:
-        "Test that a graph with duplicate edges work"
-        graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
-
-        self.assertEqual(
-            list(sorted_topologically_batched([4, 3, 2, 1], graph)),
-            [[1], [2], [3], [4]],
-        )
-
-    def test_multiple_paths(self) -> None:
-        "Test that a graph with multiple paths between two nodes work"
-        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
-
-        self.assertEqual(
-            list(sorted_topologically_batched([4, 3, 2, 1], graph)),
-            [[1], [2], [3], [4]],
-        )