
Compare commits


1 Commits

Author SHA1 Message Date
Patrick Cloke
9622bda163 Abstract logic for setting the statement timeout. 2023-11-15 15:11:51 -05:00
55 changed files with 350 additions and 523 deletions

View File

@@ -130,7 +130,7 @@ jobs:
python-version: "3.x"
- name: Install cibuildwheel
run: python -m pip install cibuildwheel==2.16.2
run: python -m pip install cibuildwheel==2.9.0
- name: Set up QEMU to emulate aarch64
if: matrix.arch == 'aarch64'

View File

@@ -1,104 +1,3 @@
# Synapse 1.97.0 (2023-11-28)
Synapse will soon be forked by Element under an AGPLv3.0 licence (with CLA, for
proprietary dual licensing). You can read more about this here:
- https://matrix.org/blog/2023/11/06/future-of-synapse-dendrite/
- https://element.io/blog/element-to-adopt-agplv3/
The Matrix.org Foundation copy of the project will be archived. Any changes needed
by server administrators will be communicated via our usual announcements channels,
but we are striving to make this as seamless as possible.
No significant changes since 1.97.0rc1.
# Synapse 1.97.0rc1 (2023-11-21)
### Features
- Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper. ([\#15503](https://github.com/matrix-org/synapse/issues/15503))
- Improve the performance of some operations in multi-worker deployments. ([\#16613](https://github.com/matrix-org/synapse/issues/16613), [\#16616](https://github.com/matrix-org/synapse/issues/16616))
### Bugfixes
- Fix a long-standing bug where some queries updated the same row twice. Introduced in Synapse 1.57.0. ([\#16609](https://github.com/matrix-org/synapse/issues/16609))
- Fix a long-standing bug where Synapse would not unbind third-party identifiers for Application Service users when deactivated and would not emit a compliant response. ([\#16617](https://github.com/matrix-org/synapse/issues/16617))
- Fix sending out of order `POSITION` over replication, causing additional database load. ([\#16639](https://github.com/matrix-org/synapse/issues/16639))
### Improved Documentation
- Note that the option [`outbound_federation_restricted_to`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#outbound_federation_restricted_to) was added in Synapse 1.89.0, and fix a nearby formatting error. ([\#16628](https://github.com/matrix-org/synapse/issues/16628))
- Update parameter information for the `/timestamp_to_event` admin API. ([\#16631](https://github.com/matrix-org/synapse/issues/16631))
- Provide an example for a common encrypted media response from the admin user media API and mention possible null values. ([\#16654](https://github.com/matrix-org/synapse/issues/16654))
### Internal Changes
- Remove whole table locks on push rule modifications. Contributed by Nick @ Beeper (@fizzadar). ([\#16051](https://github.com/matrix-org/synapse/issues/16051))
- Support reactor tick timings on more types of event loops. ([\#16532](https://github.com/matrix-org/synapse/issues/16532))
- Improve type hints. ([\#16564](https://github.com/matrix-org/synapse/issues/16564), [\#16611](https://github.com/matrix-org/synapse/issues/16611), [\#16612](https://github.com/matrix-org/synapse/issues/16612))
- Avoid executing no-op queries. ([\#16583](https://github.com/matrix-org/synapse/issues/16583))
- Simplify persistence code to be per-room. ([\#16584](https://github.com/matrix-org/synapse/issues/16584))
- Use standard SQL helpers in persistence code. ([\#16585](https://github.com/matrix-org/synapse/issues/16585))
- Avoid updating the stream cache unnecessarily. ([\#16586](https://github.com/matrix-org/synapse/issues/16586))
- Improve performance when using opentracing. ([\#16589](https://github.com/matrix-org/synapse/issues/16589))
- Run push rule evaluator setup in parallel. ([\#16590](https://github.com/matrix-org/synapse/issues/16590))
- Improve tests of the SQL generator. ([\#16596](https://github.com/matrix-org/synapse/issues/16596))
- Use more generic database methods. ([\#16615](https://github.com/matrix-org/synapse/issues/16615))
- Use `dbname` instead of the deprecated `database` connection parameter for psycopg2. ([\#16618](https://github.com/matrix-org/synapse/issues/16618))
- Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA. ([\#16634](https://github.com/matrix-org/synapse/issues/16634))
- Improve references to GitHub issues. ([\#16637](https://github.com/matrix-org/synapse/issues/16637), [\#16638](https://github.com/matrix-org/synapse/issues/16638))
- More efficiently handle no-op `POSITION` over replication. ([\#16640](https://github.com/matrix-org/synapse/issues/16640), [\#16655](https://github.com/matrix-org/synapse/issues/16655))
- Speed up deleting of device messages when deleting a device. ([\#16643](https://github.com/matrix-org/synapse/issues/16643))
- Speed up persisting large number of outliers. ([\#16649](https://github.com/matrix-org/synapse/issues/16649))
- Reduce max concurrency of background tasks, reducing potential max DB load. ([\#16656](https://github.com/matrix-org/synapse/issues/16656), [\#16660](https://github.com/matrix-org/synapse/issues/16660))
- Speed up purge room by adding an index to `event_push_summary`. ([\#16657](https://github.com/matrix-org/synapse/issues/16657))
### Updates to locked dependencies
* Bump prometheus-client from 0.17.1 to 0.18.0. ([\#16626](https://github.com/matrix-org/synapse/issues/16626))
* Bump pyicu from 2.11 to 2.12. ([\#16603](https://github.com/matrix-org/synapse/issues/16603))
* Bump requests-toolbelt from 0.10.1 to 1.0.0. ([\#16659](https://github.com/matrix-org/synapse/issues/16659))
* Bump ruff from 0.0.292 to 0.1.4. ([\#16600](https://github.com/matrix-org/synapse/issues/16600))
* Bump serde from 1.0.190 to 1.0.192. ([\#16627](https://github.com/matrix-org/synapse/issues/16627))
* Bump serde_json from 1.0.107 to 1.0.108. ([\#16604](https://github.com/matrix-org/synapse/issues/16604))
* Bump setuptools-rust from 1.8.0 to 1.8.1. ([\#16601](https://github.com/matrix-org/synapse/issues/16601))
* Bump towncrier from 23.6.0 to 23.11.0. ([\#16622](https://github.com/matrix-org/synapse/issues/16622))
* Bump treq from 22.2.0 to 23.11.0. ([\#16623](https://github.com/matrix-org/synapse/issues/16623))
* Bump twisted from 23.8.0 to 23.10.0. ([\#16588](https://github.com/matrix-org/synapse/issues/16588))
* Bump types-bleach from 6.1.0.0 to 6.1.0.1. ([\#16624](https://github.com/matrix-org/synapse/issues/16624))
* Bump types-jsonschema from 4.19.0.3 to 4.19.0.4. ([\#16599](https://github.com/matrix-org/synapse/issues/16599))
* Bump types-pyopenssl from 23.2.0.2 to 23.3.0.0. ([\#16625](https://github.com/matrix-org/synapse/issues/16625))
* Bump types-pyyaml from 6.0.12.11 to 6.0.12.12. ([\#16602](https://github.com/matrix-org/synapse/issues/16602))
# Synapse 1.96.1 (2023-11-17)
Synapse will soon be forked by Element under an AGPLv3.0 licence (with CLA, for
proprietary dual licensing). You can read more about this here:
* https://matrix.org/blog/2023/11/06/future-of-synapse-dendrite/
* https://element.io/blog/element-to-adopt-agplv3/
The Matrix.org Foundation copy of the project will be archived. Any changes needed
by server administrators will be communicated via our usual
[announcements channels](https://matrix.to/#/#homeowners:matrix.org), but we are
striving to make this as seamless as possible.
This minor release was needed only because of CI-related trouble on [v1.96.0](https://github.com/matrix-org/synapse/releases/tag/v1.96.0), which was never released.
### Internal Changes
- Fix building of wheels in CI. ([\#16653](https://github.com/matrix-org/synapse/issues/16653))
# Synapse 1.96.0 (2023-11-16)
### Bugfixes
- Fix "'int' object is not iterable" error in `set_device_id_for_pushers` background update introduced in Synapse 1.95.0. ([\#16594](https://github.com/matrix-org/synapse/issues/16594))
# Synapse 1.96.0rc1 (2023-10-31)
### Features

View File

@@ -0,0 +1 @@
Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper.

1
changelog.d/16051.misc Normal file
View File

@@ -0,0 +1 @@
Remove whole table locks on push rule modifications. Contributed by Nick @ Beeper (@fizzadar).

1
changelog.d/16456.misc Normal file
View File

@@ -0,0 +1 @@
Add a Postgres `REPLICA IDENTITY` to tables that do not have an implicit one. This should allow use of Postgres logical replication.
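For background, a table's replica identity tells Postgres which columns identify a row in the logical-replication stream. A quick, illustrative way to inspect what a table currently uses (plain psycopg2, not part of this change; connection details are placeholders):

import psycopg2

# pg_class.relreplident is 'd' (default/primary key), 'i' (index),
# 'f' (full row) or 'n' (nothing).
conn = psycopg2.connect(dbname="synapse")
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT relreplident FROM pg_class WHERE oid = %s::regclass",
        ("device_inbox",),
    )
    print(cur.fetchone())  # e.g. ('f',) once REPLICA IDENTITY FULL is applied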

1
changelog.d/16532.misc Normal file
View File

@@ -0,0 +1 @@
Support reactor tick timings on more types of event loops.

1
changelog.d/16564.misc Normal file
View File

@@ -0,0 +1 @@
Improve type hints.

1
changelog.d/16583.misc Normal file
View File

@@ -0,0 +1 @@
Avoid executing no-op queries.

1
changelog.d/16584.misc Normal file
View File

@@ -0,0 +1 @@
Simplify persistence code to be per-room.

1
changelog.d/16585.misc Normal file
View File

@@ -0,0 +1 @@
Use standard SQL helpers in persistence code.

1
changelog.d/16586.misc Normal file
View File

@@ -0,0 +1 @@
Avoid updating the stream cache unnecessarily.

1
changelog.d/16588.misc Normal file
View File

@@ -0,0 +1 @@
Bump twisted from 23.8.0 to 23.10.0.

1
changelog.d/16589.misc Normal file
View File

@@ -0,0 +1 @@
Improve performance when using opentracing.

1
changelog.d/16590.misc Normal file
View File

@@ -0,0 +1 @@
Run push rule evaluator setup in parallel.

1
changelog.d/16596.misc Normal file
View File

@@ -0,0 +1 @@
Improve tests of the SQL generator.

1
changelog.d/16605.misc Normal file
View File

@@ -0,0 +1 @@
Bump setuptools-rust from 1.8.0 to 1.8.1.

1
changelog.d/16609.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where some queries updated the same row twice. Introduced in Synapse 1.57.0.

1
changelog.d/16611.misc Normal file
View File

@@ -0,0 +1 @@
Improve type hints.

1
changelog.d/16612.misc Normal file
View File

@@ -0,0 +1 @@
Improve type hints.

View File

@@ -0,0 +1 @@
Improve the performance of some operations in multi-worker deployments.

1
changelog.d/16615.misc Normal file
View File

@@ -0,0 +1 @@
Use more generic database methods.

View File

@@ -0,0 +1 @@
Improve the performance of some operations in multi-worker deployments.

1
changelog.d/16617.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where Synapse would not unbind third-party identifiers for Application Service users when deactivated and would not emit a compliant response.

1
changelog.d/16618.misc Normal file
View File

@@ -0,0 +1 @@
Use `dbname` instead of the deprecated `database` connection parameter for psycopg2.
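For reference, `dbname` is the keyword psycopg2 itself documents; a minimal connection sketch using it (all credentials below are placeholder assumptions, not taken from this change):

import psycopg2

conn = psycopg2.connect(
    dbname="synapse",       # preferred spelling
    # database="synapse",   # deprecated alias this change moves away from
    user="synapse_user",
    password="change-me",
    host="localhost",
)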

1
changelog.d/16628.doc Normal file
View File

@@ -0,0 +1 @@
Note that the option [`outbound_federation_restricted_to`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#outbound_federation_restricted_to) was added in Synapse 1.89.0, and fix a nearby formatting error.

1
changelog.d/16631.doc Normal file
View File

@@ -0,0 +1 @@
Update parameter information for the `/timestamp_to_event` admin API.

1
changelog.d/16634.misc Normal file
View File

@@ -0,0 +1 @@
Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA.

1
changelog.d/16637.misc Normal file
View File

@@ -0,0 +1 @@
Improve references to GitHub issues.

1
changelog.d/16638.misc Normal file
View File

@@ -0,0 +1 @@
Improve references to GitHub issues.

24
debian/changelog vendored
View File

@@ -1,27 +1,3 @@
matrix-synapse-py3 (1.97.0) stable; urgency=medium
* New Synapse release 1.97.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 28 Nov 2023 14:08:58 +0000
matrix-synapse-py3 (1.97.0~rc1) stable; urgency=medium
* New Synapse release 1.97.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 21 Nov 2023 12:32:03 +0000
matrix-synapse-py3 (1.96.1) stable; urgency=medium
* New synapse release 1.96.1.
-- Synapse Packaging team <packages@matrix.org> Fri, 17 Nov 2023 12:48:45 +0000
matrix-synapse-py3 (1.96.0) stable; urgency=medium
* New synapse release 1.96.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 16 Nov 2023 17:54:26 +0000
matrix-synapse-py3 (1.96.0~rc1) stable; urgency=medium
* New Synapse release 1.96.0rc1.

View File

@@ -618,16 +618,6 @@ A response body like the following is returned:
"quarantined_by": null,
"safe_from_quarantine": false,
"upload_name": "test2.png"
},
{
"created_ts": 300400,
"last_access_ts": 300700,
"media_id": "BzYNLRUgGHphBkdKGbzXwbjX",
"media_length": 1337,
"media_type": "application/octet-stream",
"quarantined_by": null,
"safe_from_quarantine": false,
"upload_name": null
}
],
"next_token": 3,
@@ -689,17 +679,16 @@ The following fields are returned in the JSON response body:
- `media` - An array of objects, each containing information about a media.
Media objects contain the following fields:
- `created_ts` - integer - Timestamp when the content was uploaded in ms.
- `last_access_ts` - integer or null - Timestamp when the content was last accessed in ms.
Null if there was no access, yet.
- `last_access_ts` - integer - Timestamp when the content was last accessed in ms.
- `media_id` - string - The id used to refer to the media. Details about the format
are documented under
[media repository](../media_repository.md).
- `media_length` - integer - Length of the media in bytes.
- `media_type` - string - The MIME-type of the media.
- `quarantined_by` - string or null - The user ID that initiated the quarantine request
for this media. Null if not quarantined.
- `quarantined_by` - string - The user ID that initiated the quarantine request
for this media.
- `safe_from_quarantine` - bool - Status if this media is safe from quarantining.
- `upload_name` - string or null - The name the media was uploaded with. Null if not provided during upload.
- `upload_name` - string - The name the media was uploaded with.
- `next_token`: integer - Indication for pagination. See above.
- `total` - integer - Total number of media.
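For illustration, a pagination loop over this response shape might look like the sketch below. The endpoint path, query parameters, and auth header are assumptions based on the Synapse admin API docs rather than this diff:

import requests

BASE = "https://homeserver.example/_synapse/admin/v1"
HEADERS = {"Authorization": "Bearer <admin access token>"}

def all_media(user_id: str):
    token = 0
    while True:
        body = requests.get(
            f"{BASE}/users/{user_id}/media",
            params={"from": token, "limit": 100},
            headers=HEADERS,
            timeout=10,
        ).json()
        yield from body["media"]
        token = body.get("next_token")
        if token is None:  # no next_token means this was the last page
            return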

8
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
[[package]]
name = "alabaster"
@@ -2273,13 +2273,13 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "requests-toolbelt"
version = "1.0.0"
version = "0.10.1"
description = "A utility belt for advanced users of python-requests"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
{file = "requests-toolbelt-1.0.0.tar.gz", hash = "sha256:7681a0a3d047012b5bdc0ee37d7f8f07ebe76ab08caeccfc3921ce23c88d5bc6"},
{file = "requests_toolbelt-1.0.0-py2.py3-none-any.whl", hash = "sha256:cccfdd665f0a24fcf4726e690f65639d272bb0637b9b92dfd91a5568ccf6bd06"},
{file = "requests-toolbelt-0.10.1.tar.gz", hash = "sha256:62e09f7ff5ccbda92772a29f394a49c3ad6cb181d568b1337626b2abb628a63d"},
{file = "requests_toolbelt-0.10.1-py2.py3-none-any.whl", hash = "sha256:18565aa58116d9951ac39baa288d3adb5b3ff975c4f25eee78555d89e8f247f7"},
]
[package.dependencies]

View File

@@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.97.0"
version = "1.96.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"

View File

@@ -383,7 +383,7 @@ class DeviceWorkerHandler:
)
DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
DEVICE_MSGS_DELETE_SLEEP_MS = 100
DEVICE_MSGS_DELETE_SLEEP_MS = 1000
async def _delete_device_messages(
self,
@@ -396,17 +396,15 @@ class DeviceWorkerHandler:
up_to_stream_id = task.params["up_to_stream_id"]
# Delete the messages in batches to avoid too much DB load.
from_stream_id = None
while True:
from_stream_id, _ = await self.store.delete_messages_for_device_between(
res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
from_stream_id=from_stream_id,
to_stream_id=up_to_stream_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
if from_stream_id is None:
if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
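The loop above is an instance of a common batch-delete pattern: remove rows in bounded chunks and pause between chunks so the database is never asked to clear an unbounded backlog in one go. A generic sketch of the pattern (names and values are illustrative, not Synapse's API):

import asyncio
from typing import Awaitable, Callable

BATCH_LIMIT = 1000    # rows per chunk (assumed value)
SLEEP_SECONDS = 0.1   # pause between chunks (assumed value)

async def delete_in_batches(delete_batch: Callable[[int], Awaitable[int]]) -> int:
    """`delete_batch(limit)` deletes up to `limit` rows and returns the count;
    stop once a chunk comes back short of the limit."""
    total = 0
    while True:
        deleted = await delete_batch(BATCH_LIMIT)
        total += deleted
        if deleted < BATCH_LIMIT:
            return total  # nothing left to delete
        await asyncio.sleep(SLEEP_SECONDS)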

View File

@@ -88,7 +88,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.iterutils import batch_iter, partition
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -1669,13 +1669,14 @@ class FederationEventHandler:
# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
# build a list of events whose auth events are not in the queue.
roots = tuple(
ev
for ev in event_map.values()
if not any(aid in event_map for aid in ev.auth_event_ids())
)
# We need to persist an event's auth events before the event.
auth_graph = {
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
for ev in event_map.values()
}
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
@@ -1697,6 +1698,9 @@ class FederationEventHandler:
await self._auth_and_persist_outliers_inner(room_id, roots)
for ev in roots:
del event_map[ev.event_id]
async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:

View File

@@ -257,11 +257,6 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
# Marks if we should send POSITION commands for all streams ASAP. This
# is checked by the `ReplicationStreamer` which manages sending
# RDATA/POSITION commands
self._should_announce_positions = True
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -402,23 +397,29 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
self.send_positions_to_connection()
self.send_positions_to_connection(conn)
def send_positions_to_connection(self) -> None:
def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
self._should_announce_positions = True
self._notifier.notify_replication()
def should_announce_positions(self) -> bool:
"""Check if we should send POSITION commands for all streams ASAP."""
return self._should_announce_positions
def will_announce_positions(self) -> None:
"""Mark that we're about to send POSITIONs out for all streams."""
self._should_announce_positions = False
# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
# Note that we use the current token as the prev token here (rather
# than stream.last_token), as we can't be sure that there have been
# no rows written between last token and the current token (since we
# might be racing with the replication sending bg process).
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
current_token,
current_token,
)
)
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -587,21 +588,6 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
# Check if we can early discard this position. We can only do so for
# connected streams.
stream = self._streams[cmd.stream_name]
if stream.can_discard_position(
cmd.instance_name, cmd.prev_token, cmd.new_token
) and self.is_stream_connected(conn, cmd.stream_name):
logger.debug(
"Discarding redundant POSITION %s/%s %s %s",
cmd.instance_name,
cmd.stream_name,
cmd.prev_token,
cmd.new_token,
)
return
self._add_command_to_stream_queue(conn, cmd)
async def _process_position(
@@ -613,18 +599,6 @@ class ReplicationCommandHandler:
"""
stream = self._streams[stream_name]
if stream.can_discard_position(
cmd.instance_name, cmd.prev_token, cmd.new_token
) and self.is_stream_connected(conn, cmd.stream_name):
logger.debug(
"Discarding redundant POSITION %s/%s %s %s",
cmd.instance_name,
cmd.stream_name,
cmd.prev_token,
cmd.new_token,
)
return
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@@ -652,9 +626,8 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
"Fetching replication rows for '%s' / %s between %i and %i",
"Fetching replication rows for '%s' between %i and %i",
stream_name,
cmd.instance_name,
current_token,
cmd.new_token,
)
@@ -684,13 +657,6 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
def is_stream_connected(
self, conn: IReplicationConnection, stream_name: str
) -> bool:
"""Return if stream has been successfully connected and is ready to
receive updates"""
return stream_name in self._streams_by_connection.get(conn, ())
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:

View File

@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# other side won't know we've connected and so won't issue a REPLICATE.
self.synapse_handler.send_positions_to_connection()
self.synapse_handler.send_positions_to_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""

View File

@@ -123,7 +123,7 @@ class ReplicationStreamer:
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
if not self.command_handler.should_announce_positions() and all(
if all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
@@ -158,21 +158,6 @@ class ReplicationStreamer:
all_streams = list(all_streams)
random.shuffle(all_streams)
if self.command_handler.should_announce_positions():
# We need to send out POSITIONs for all streams, usually
# because a worker has reconnected.
self.command_handler.will_announce_positions()
for stream in all_streams:
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.last_token,
stream.last_token,
)
)
for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name

View File

@@ -144,16 +144,6 @@ class Stream:
"""
raise NotImplementedError()
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
"""Whether or not a position command for this stream can be discarded.
Useful for streams that can never go backwards and where we already know
the stream ID for the instance has advanced.
"""
return False
def discard_updates_and_advance(self) -> None:
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
@@ -231,14 +221,6 @@ class _StreamFromIdGen(Stream):
def minimal_local_current_token(self) -> Token:
return self._stream_id_gen.get_minimal_local_current_token()
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
# These streams can't go backwards, so we know we can ignore any
# positions where the tokens are from before the current token.
return new_token <= self.current_token(instance_name)
def current_token_without_instance(
current_token: Callable[[], int]
@@ -305,14 +287,6 @@ class BackfillStream(Stream):
# which means we need to negate it.
return -self.store._backfill_id_gen.get_minimal_local_current_token()
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
# Backfill stream can't go backwards, so we know we can ignore any
# positions where the tokens are from before the current token.
return new_token <= self.current_token(instance_name)
class PresenceStream(_StreamFromIdGen):
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -527,14 +501,6 @@ class CachesStream(Stream):
return self.store._cache_id_gen.get_minimal_local_current_token()
return self.current_token(self.local_instance_name)
def can_discard_position(
self, instance_name: str, prev_token: int, new_token: int
) -> bool:
# Caches streams can't go backwards, so we know we can ignore any
# positions where the tokens are from before the current token.
return new_token <= self.current_token(instance_name)
class DeviceListsStream(_StreamFromIdGen):
"""Either a user has updated their devices or a remote server needs to be

View File

@@ -768,8 +768,9 @@ class BackgroundUpdater:
# override the global statement timeout to avoid accidentally squashing
# a long-running index creation process
timeout_sql = "SET SESSION statement_timeout = 0"
c.execute(timeout_sql)
self.db_pool.engine.attempt_to_set_statement_timeout(
c, 0, for_transaction=True
)
sql = (
"CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
@@ -791,12 +792,6 @@ class BackgroundUpdater:
logger.debug("[SQL] %s", sql)
c.execute(sql)
finally:
# mypy ignore - `statement_timeout` is defined on PostgresEngine
# reset the global timeout to the default
default_timeout = self.db_pool.engine.statement_timeout # type: ignore[attr-defined]
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql)
conn.engine.attempt_to_set_autocommit(conn.conn, False)
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
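Part of why the explicit reset in the old `finally` block can go away: `SET LOCAL` is scoped to the enclosing transaction, so Postgres restores the session's `statement_timeout` automatically at commit or rollback. A small illustrative snippet of those semantics (plain psycopg2, not Synapse code; connection details are assumptions):

import psycopg2

conn = psycopg2.connect(dbname="synapse")
with conn:  # opens a transaction, commits on exit
    with conn.cursor() as cur:
        cur.execute("SET LOCAL statement_timeout = 0")  # only for this transaction
        cur.execute("SELECT pg_sleep(1)")               # stand-in for long-running work
# Outside the transaction, the session-level statement_timeout applies again.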

View File

@@ -450,12 +450,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
limit: Optional[int] = None,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -476,22 +478,32 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
from_stream_id = None
count = 0
while True:
from_stream_id, loop_count = await self.delete_messages_for_device_between(
user_id,
device_id,
from_stream_id=from_stream_id,
to_stream_id=up_to_stream_id,
limit=1000,
)
count += loop_count
if from_stream_id is None:
break
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
limit_statement = "" if limit is None else f"LIMIT {limit}"
sql = f"""
DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
SELECT MAX(stream_id) FROM (
SELECT stream_id FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
ORDER BY stream_id
{limit_statement}
) AS q1
)
"""
txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
return txn.rowcount
count = await self.db_pool.runInteraction(
"delete_messages_for_device", delete_messages_for_device_txn
)
log_kv({"message": f"deleted {count} messages for device", "count": count})
# In this case we don't know if we hit the limit or the delete is complete
# so let's not update the cache.
if count == limit:
return count
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -502,74 +514,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
@trace
async def delete_messages_for_device_between(
self,
user_id: str,
device_id: Optional[str],
from_stream_id: Optional[int],
to_stream_id: int,
limit: int,
) -> Tuple[Optional[int], int]:
"""Delete N device messages between the stream IDs, returning the
highest stream ID deleted (or None if all messages in the range have
been deleted) and the number of messages deleted.
This is more efficient than `delete_messages_for_device` when calling in
a loop to batch delete messages.
"""
# Keeping track of a lower bound of stream ID where we've deleted
# everything below makes the queries much faster. Otherwise, every time
# we scan for rows to delete we'd re-scan across all the rows that have
# previously been deleted (until the next table VACUUM).
if from_stream_id is None:
# Minimum device stream ID is 1.
from_stream_id = 0
def delete_messages_for_device_between_txn(
txn: LoggingTransaction,
) -> Tuple[Optional[int], int]:
txn.execute(
"""
SELECT MAX(stream_id) FROM (
SELECT stream_id FROM device_inbox
WHERE user_id = ? AND device_id = ?
AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
) AS d
""",
(user_id, device_id, from_stream_id, to_stream_id, limit),
)
row = txn.fetchone()
if row is None or row[0] is None:
return None, 0
(max_stream_id,) = row
txn.execute(
"""
DELETE FROM device_inbox
WHERE user_id = ? AND device_id = ?
AND ? < stream_id AND stream_id <= ?
""",
(user_id, device_id, from_stream_id, max_stream_id),
)
num_deleted = txn.rowcount
if num_deleted < limit:
return None, num_deleted
return max_stream_id, num_deleted
return await self.db_pool.runInteraction(
"delete_messages_for_device_between",
delete_messages_for_device_between_txn,
db_autocommit=True, # We don't need to run in a transaction
)
@trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int

View File

@@ -311,14 +311,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
self._background_drop_null_thread_id_indexes,
)
# Add a room ID index to speed up room deletion
self.db_pool.updates.register_background_index_update(
"event_push_summary_index_room_id",
index_name="event_push_summary_index_room_id",
table="event_push_summary",
columns=["room_id"],
)
async def _background_drop_null_thread_id_indexes(
self, progress: JsonDict, batch_size: int
) -> int:

View File

@@ -89,10 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# furthermore, we might already have the table from a previous (failed)
# purge attempt, so let's drop the table first.
if isinstance(self.database_engine, PostgresEngine):
# Disable statement timeouts for this transaction; purging rooms can
# take a while!
txn.execute("SET LOCAL statement_timeout = 0")
# Disable statement timeouts for this transaction; purging rooms can
# take a while!
self.database_engine.attempt_to_set_statement_timeout(
txn, 0, for_transaction=True
)
txn.execute("DROP TABLE IF EXISTS events_to_purge")

View File

@@ -601,7 +601,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
(last_pusher_id, batch_size),
)
rows = cast(List[Tuple[int, Optional[str], Optional[str]]], txn.fetchall())
rows = txn.fetchall()
if len(rows) == 0:
return 0
@@ -617,7 +617,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
txn=txn,
table="pushers",
key_names=("id",),
key_values=[(row[0],) for row in rows],
key_values=[row[0] for row in rows],
value_names=("device_id", "access_token"),
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,

View File

@@ -36,6 +36,9 @@ CursorType = TypeVar("CursorType", bound=Cursor)
class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
# The default statement timeout to use for transactions.
statement_timeout: Optional[int] = None
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
self.module = module
@@ -132,6 +135,16 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""
...
@abc.abstractmethod
def attempt_to_set_statement_timeout(
self, cursor: CursorType, statement_timeout: int, for_transaction: bool
) -> None:
"""Attempt to set the cursor's statement timeout.
Note this has no effect on SQLite3.
"""
...
@staticmethod
@abc.abstractmethod
def executescript(cursor: CursorType, script: str) -> None:

View File

@@ -52,7 +52,7 @@ class PostgresEngine(
# some degenerate query plan has been created and the client has probably
# timed out/walked off anyway.
# This is in milliseconds.
self.statement_timeout: Optional[int] = database_config.get(
self.statement_timeout = database_config.get(
"statement_timeout", 60 * 60 * 1000
)
self._version: Optional[int] = None # unknown as yet
@@ -169,7 +169,11 @@ class PostgresEngine(
# Abort really long-running statements and turn them into errors.
if self.statement_timeout is not None:
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
self.attempt_to_set_statement_timeout(
cast(psycopg2.extensions.cursor, cursor.txn),
self.statement_timeout,
for_transaction=False,
)
cursor.close()
db_conn.commit()
@@ -233,6 +237,18 @@ class PostgresEngine(
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level)
def attempt_to_set_statement_timeout(
self,
cursor: psycopg2.extensions.cursor,
statement_timeout: int,
for_transaction: bool,
) -> None:
if for_transaction:
sql = "SET LOCAL statement_timeout TO ?"
else:
sql = "SET statement_timeout TO ?"
cursor.execute(sql, (statement_timeout,))
@staticmethod
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.

View File

@@ -143,6 +143,12 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
# All transactions are SERIALIZABLE by default in sqlite
pass
def attempt_to_set_statement_timeout(
self, cursor: sqlite3.Cursor, statement_timeout: int, for_transaction: bool
) -> None:
# Not supported.
pass
@staticmethod
def executescript(cursor: sqlite3.Cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.

View File

@@ -0,0 +1,88 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- Where possible, re-use unique indices already defined on tables as a replica
-- identity.
ALTER TABLE appservice_room_list REPLICA IDENTITY USING INDEX appservice_room_list_idx;
ALTER TABLE batch_events REPLICA IDENTITY USING INDEX chunk_events_event_id;
ALTER TABLE blocked_rooms REPLICA IDENTITY USING INDEX blocked_rooms_idx;
ALTER TABLE cache_invalidation_stream_by_instance REPLICA IDENTITY USING INDEX cache_invalidation_stream_by_instance_id;
ALTER TABLE device_lists_changes_in_room REPLICA IDENTITY USING INDEX device_lists_changes_in_stream_id;
ALTER TABLE device_lists_outbound_last_success REPLICA IDENTITY USING INDEX device_lists_outbound_last_success_unique_idx;
ALTER TABLE device_lists_remote_cache REPLICA IDENTITY USING INDEX device_lists_remote_cache_unique_id;
ALTER TABLE device_lists_remote_extremeties REPLICA IDENTITY USING INDEX device_lists_remote_extremeties_unique_idx;
ALTER TABLE device_lists_remote_resync REPLICA IDENTITY USING INDEX device_lists_remote_resync_idx;
ALTER TABLE e2e_cross_signing_keys REPLICA IDENTITY USING INDEX e2e_cross_signing_keys_stream_idx;
ALTER TABLE e2e_room_keys REPLICA IDENTITY USING INDEX e2e_room_keys_with_version_idx;
ALTER TABLE e2e_room_keys_versions REPLICA IDENTITY USING INDEX e2e_room_keys_versions_idx;
ALTER TABLE erased_users REPLICA IDENTITY USING INDEX erased_users_user;
ALTER TABLE event_relations REPLICA IDENTITY USING INDEX event_relations_id;
ALTER TABLE federation_inbound_events_staging REPLICA IDENTITY USING INDEX federation_inbound_events_staging_instance_event;
ALTER TABLE federation_stream_position REPLICA IDENTITY USING INDEX federation_stream_position_instance;
ALTER TABLE ignored_users REPLICA IDENTITY USING INDEX ignored_users_uniqueness;
ALTER TABLE insertion_events REPLICA IDENTITY USING INDEX insertion_events_event_id;
ALTER TABLE insertion_event_extremities REPLICA IDENTITY USING INDEX insertion_event_extremities_event_id;
ALTER TABLE monthly_active_users REPLICA IDENTITY USING INDEX monthly_active_users_users;
ALTER TABLE ratelimit_override REPLICA IDENTITY USING INDEX ratelimit_override_idx;
ALTER TABLE room_stats_earliest_token REPLICA IDENTITY USING INDEX room_stats_earliest_token_idx;
ALTER TABLE room_stats_state REPLICA IDENTITY USING INDEX room_stats_state_room;
ALTER TABLE stream_positions REPLICA IDENTITY USING INDEX stream_positions_idx;
ALTER TABLE user_directory REPLICA IDENTITY USING INDEX user_directory_user_idx;
ALTER TABLE user_directory_search REPLICA IDENTITY USING INDEX user_directory_search_user_idx;
ALTER TABLE user_ips REPLICA IDENTITY USING INDEX user_ips_user_token_ip_unique_index;
ALTER TABLE user_signature_stream REPLICA IDENTITY USING INDEX user_signature_stream_idx;
ALTER TABLE users_in_public_rooms REPLICA IDENTITY USING INDEX users_in_public_rooms_u_idx;
ALTER TABLE users_who_share_private_rooms REPLICA IDENTITY USING INDEX users_who_share_private_rooms_u_idx;
ALTER TABLE user_threepid_id_server REPLICA IDENTITY USING INDEX user_threepid_id_server_idx;
ALTER TABLE worker_locks REPLICA IDENTITY USING INDEX worker_locks_key;
-- Where there are no unique indices, use the entire rows as replica identities.
ALTER TABLE current_state_delta_stream REPLICA IDENTITY FULL;
ALTER TABLE deleted_pushers REPLICA IDENTITY FULL;
ALTER TABLE device_auth_providers REPLICA IDENTITY FULL;
ALTER TABLE device_federation_inbox REPLICA IDENTITY FULL;
ALTER TABLE device_federation_outbox REPLICA IDENTITY FULL;
ALTER TABLE device_inbox REPLICA IDENTITY FULL;
ALTER TABLE device_lists_outbound_pokes REPLICA IDENTITY FULL;
ALTER TABLE device_lists_stream REPLICA IDENTITY FULL;
ALTER TABLE e2e_cross_signing_signatures REPLICA IDENTITY FULL;
ALTER TABLE event_auth_chain_links REPLICA IDENTITY FULL;
ALTER TABLE event_auth REPLICA IDENTITY FULL;
ALTER TABLE event_push_actions_staging REPLICA IDENTITY FULL;
ALTER TABLE insertion_event_edges REPLICA IDENTITY FULL;
ALTER TABLE local_media_repository_url_cache REPLICA IDENTITY FULL;
ALTER TABLE presence_stream REPLICA IDENTITY FULL;
ALTER TABLE push_rules_stream REPLICA IDENTITY FULL;
ALTER TABLE room_alias_servers REPLICA IDENTITY FULL;
ALTER TABLE stream_ordering_to_exterm REPLICA IDENTITY FULL;
ALTER TABLE timeline_gaps REPLICA IDENTITY FULL;
ALTER TABLE user_daily_visits REPLICA IDENTITY FULL;
ALTER TABLE users_pending_deactivation REPLICA IDENTITY FULL;
-- special cases: unique indices on nullable columns can't be used
ALTER TABLE event_push_summary REPLICA IDENTITY FULL;
ALTER TABLE event_search REPLICA IDENTITY FULL;
ALTER TABLE local_media_repository_thumbnails REPLICA IDENTITY FULL;
ALTER TABLE remote_media_cache_thumbnails REPLICA IDENTITY FULL;
ALTER TABLE threepid_guest_access_tokens REPLICA IDENTITY FULL;
ALTER TABLE user_filters REPLICA IDENTITY FULL; -- sadly the `CHECK` constraint is not enough here

View File

@@ -1,17 +0,0 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8306, 'event_push_summary_index_room_id', '{}');

View File

@@ -0,0 +1,30 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- See also: 82/04_replica_identities.sql.postgres on the main database
-- Where possible, re-use unique indices already defined on tables as a replica
-- identity.
ALTER TABLE state_group_edges REPLICA IDENTITY USING INDEX state_group_edges_unique_idx;
-- Where there are no unique indices, use the entire rows as replica identities.
ALTER TABLE state_groups_state REPLICA IDENTITY FULL;

View File

@@ -135,54 +135,3 @@ def sorted_topologically(
degree_map[edge] -= 1
if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge)
def sorted_topologically_batched(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
) -> Generator[Collection[T], None, None]:
r"""Walk the graph topologically, returning batches of nodes where all nodes
that references it have been previously returned.
For example, given the following graph:
A
/ \
B C
\ /
D
This function will return: `[[A], [B, C], [D]]`.
This function is useful for e.g. batch persisting events in an auth chain,
where we can only persist an event if all its auth events have already been
persisted.
"""
degree_map = {node: 0 for node in nodes}
reverse_graph: Dict[T, Set[T]] = {}
for node, edges in graph.items():
if node not in degree_map:
continue
for edge in set(edges):
if edge in degree_map:
degree_map[node] += 1
reverse_graph.setdefault(edge, set()).add(node)
reverse_graph.setdefault(node, set())
zero_degree = [node for node, degree in degree_map.items() if degree == 0]
while zero_degree:
new_zero_degree = []
for node in zero_degree:
for edge in reverse_graph.get(node, []):
if edge in degree_map:
degree_map[edge] -= 1
if degree_map[edge] == 0:
new_zero_degree.append(edge)
yield zero_degree
zero_degree = new_zero_degree
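A quick illustration of the batching behaviour described in the docstring, matching the shape of the A/B/C/D example (assuming the helper is importable from `synapse.util.iterutils`, as in the tests further down):

from synapse.util.iterutils import sorted_topologically_batched

# graph maps each node to the nodes it references (its dependencies).
graph = {1: [], 2: [1], 3: [1], 4: [2, 3]}
print(list(sorted_topologically_batched([4, 3, 2, 1], graph)))
# [[1], [2, 3], [4]] -- each batch depends only on earlier batches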

View File

@@ -71,7 +71,7 @@ class TaskScheduler:
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
# Maximum number of tasks that can run at the same time
MAX_CONCURRENT_RUNNING_TASKS = 5
MAX_CONCURRENT_RUNNING_TASKS = 10
# Time from the last task update after which we will log a warning
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
@@ -193,7 +193,7 @@ class TaskScheduler:
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
"""Update some task associated values. This is exposed publicly so it can
"""Update some task associated values. This is exposed publically so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.
@@ -377,7 +377,7 @@ class TaskScheduler:
self._running_tasks.remove(task.id)
# Try launch a new task since we've finished with this one.
self._clock.call_later(0.1, self._launch_scheduled_tasks)
self._clock.call_later(1, self._launch_scheduled_tasks)
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return

View File

@@ -35,10 +35,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)
# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
@@ -95,10 +91,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)
# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Tuple
from typing import Callable, List, Tuple
from unittest.mock import Mock, call
from twisted.internet import defer
@@ -29,6 +29,7 @@ from synapse.storage.database import (
from synapse.util import Clock
from tests import unittest
from tests.utils import USE_POSTGRES_FOR_TESTS
class TupleComparisonClauseTestCase(unittest.TestCase):
@@ -279,3 +280,84 @@ class CancellationTestCase(unittest.HomeserverTestCase):
]
)
self.assertEqual(exception_callback.call_count, 6) # no additional calls
class PostgresReplicaIdentityTestCase(unittest.HomeserverTestCase):
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.db_pools = homeserver.get_datastores().databases
def test_all_tables_have_postgres_replica_identity(self) -> None:
"""
Tests that all tables have a Postgres REPLICA IDENTITY.
(See https://github.com/matrix-org/synapse/issues/16224).
Tables with a PRIMARY KEY have an implied REPLICA IDENTITY and are fine.
Other tables need them to be set with `ALTER TABLE`.
A REPLICA IDENTITY is required for Postgres logical replication to work
properly without blocking updates and deletes.
"""
sql = """
-- Select tables that have no primary key and use the default replica identity rule
-- (the default is to use the primary key)
WITH tables_no_pkey AS (
SELECT tbl.table_schema, tbl.table_name
FROM information_schema.tables tbl
WHERE table_type = 'BASE TABLE'
AND table_schema not in ('pg_catalog', 'information_schema')
AND NOT EXISTS (
SELECT 1
FROM information_schema.key_column_usage kcu
WHERE kcu.table_name = tbl.table_name
AND kcu.table_schema = tbl.table_schema
)
)
SELECT pg_class.oid::regclass FROM tables_no_pkey INNER JOIN pg_class ON pg_class.oid::regclass = table_name::regclass
WHERE relreplident = 'd'
UNION
-- Also select tables that use an index as a replica identity
-- but where the index doesn't exist
-- (e.g. it could have been deleted)
SELECT pg_class.oid::regclass
FROM information_schema.tables tbl
INNER JOIN pg_class ON pg_class.oid::regclass = table_name::regclass
WHERE table_type = 'BASE TABLE'
AND table_schema not in ('pg_catalog', 'information_schema')
-- 'i' means an index is used as the replica identity
AND relreplident = 'i'
-- look for indices that are marked as the replica identity
AND NOT EXISTS (
SELECT indexrelid::regclass
FROM pg_index
WHERE indrelid = pg_class.oid::regclass AND indisreplident
)
"""
def _list_tables_with_missing_replica_identities_txn(
txn: LoggingTransaction,
) -> List[str]:
txn.execute(sql)
return [table_name for table_name, in txn]
for pool in self.db_pools:
missing = self.get_success(
pool.runInteraction(
"test_list_missing_replica_identities",
_list_tables_with_missing_replica_identities_txn,
)
)
self.assertEqual(
len(missing),
0,
f"The following tables in the {pool.name()!r} database are missing REPLICA IDENTITIES: {missing!r}.",
)

View File

@@ -13,11 +13,7 @@
# limitations under the License.
from typing import Dict, Iterable, List, Sequence
from synapse.util.iterutils import (
chunk_seq,
sorted_topologically,
sorted_topologically_batched,
)
from synapse.util.iterutils import chunk_seq, sorted_topologically
from tests.unittest import TestCase
@@ -111,73 +107,3 @@ class SortTopologically(TestCase):
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
class SortTopologicallyBatched(TestCase):
"Test cases for `sorted_topologically_batched`"
def test_empty(self) -> None:
"Test that an empty graph works correctly"
graph: Dict[int, List[int]] = {}
self.assertEqual(list(sorted_topologically_batched([], graph)), [])
def test_handle_empty_graph(self) -> None:
"Test that a graph where a node doesn't have an entry is treated as empty"
graph: Dict[int, List[int]] = {}
# For disconnected nodes the output is simply sorted.
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
def test_disconnected(self) -> None:
"Test that a graph with no edges work"
graph: Dict[int, List[int]] = {1: [], 2: []}
# For disconnected nodes the output is simply sorted.
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
def test_linear(self) -> None:
"Test that a simple `4 -> 3 -> 2 -> 1` graph works"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
[[1], [2], [3], [4]],
)
def test_subset(self) -> None:
"Test that only sorting a subset of the graph works"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])
def test_fork(self) -> None:
"Test that a forked graph works"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
# Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
# always get the same one.
self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
)
def test_duplicates(self) -> None:
"Test that a graph with duplicate edges work"
graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
[[1], [2], [3], [4]],
)
def test_multiple_paths(self) -> None:
"Test that a graph with multiple paths between two nodes work"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
[[1], [2], [3], [4]],
)