Always treat RETURNING as supported by SQL engines (#19047)

Can do this now that SQLite 3.35.0 added support for `RETURNING`.

> The RETURNING syntax has been supported by SQLite since version 3.35.0
(2021-03-12).
>
> *-- https://sqlite.org/lang_returning.html*

This also bumps the minimum supported SQLite version according to
Synapse's [deprecation
policy](https://element-hq.github.io/synapse/latest/deprecation_policy.html#platform-dependencies).

Fix https://github.com/element-hq/synapse/issues/17577
This commit is contained in:
Andrew Ferrazzutti
2025-10-24 14:21:49 -04:00
committed by GitHub
parent 40893be93c
commit 9d81bb703c
16 changed files with 80 additions and 186 deletions

View File

@@ -99,24 +99,24 @@ set_output("trial_test_matrix", test_matrix)
# First calculate the various sytest jobs.
#
# For each type of test we only run on bullseye on PRs
# For each type of test we only run on bookworm on PRs
sytest_tests = [
{
"sytest-tag": "bullseye",
"sytest-tag": "bookworm",
},
{
"sytest-tag": "bullseye",
"sytest-tag": "bookworm",
"postgres": "postgres",
},
{
"sytest-tag": "bullseye",
"sytest-tag": "bookworm",
"postgres": "multi-postgres",
"workers": "workers",
},
{
"sytest-tag": "bullseye",
"sytest-tag": "bookworm",
"postgres": "multi-postgres",
"workers": "workers",
"reactor": "asyncio",
@@ -127,11 +127,11 @@ if not IS_PR:
sytest_tests.extend(
[
{
"sytest-tag": "bullseye",
"sytest-tag": "bookworm",
"reactor": "asyncio",
},
{
"sytest-tag": "bullseye",
"sytest-tag": "bookworm",
"postgres": "postgres",
"reactor": "asyncio",
},

View File

@@ -139,9 +139,9 @@ jobs:
fail-fast: false
matrix:
include:
- sytest-tag: bullseye
- sytest-tag: bookworm
- sytest-tag: bullseye
- sytest-tag: bookworm
postgres: postgres
workers: workers
redis: redis

View File

@@ -108,11 +108,11 @@ jobs:
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
container:
# We're using debian:bullseye because it uses Python 3.9 which is our minimum supported Python version.
# We're using bookworm because that's what Debian oldstable is at the time of writing.
# This job is a canary to warn us about unreleased twisted changes that would cause problems for us if
# they were to be released immediately. For simplicity's sake (and to save CI runners) we use the oldest
# version, assuming that any incompatibilities on newer versions would also be present on the oldest.
image: matrixdotorg/sytest-synapse:bullseye
image: matrixdotorg/sytest-synapse:bookworm
volumes:
- ${{ github.workspace }}:/src

1
changelog.d/19047.doc Normal file
View File

@@ -0,0 +1 @@
Update the link to the Debian oldstable package for SQLite.

1
changelog.d/19047.misc Normal file
View File

@@ -0,0 +1 @@
Always treat `RETURNING` as supported by SQL engines, now that the minimum-supported versions of both SQLite and PostgreSQL support it.

View File

@@ -0,0 +1 @@
Remove support for SQLite < 3.37.2.

View File

@@ -21,7 +21,7 @@ people building from source should ensure they can fetch recent versions of Rust
(e.g. by using [rustup](https://rustup.rs/)).
The oldest supported version of SQLite is the version
[provided](https://packages.debian.org/bullseye/libsqlite3-0) by
[provided](https://packages.debian.org/oldstable/libsqlite3-0) by
[Debian oldstable](https://wiki.debian.org/DebianOldStable).

View File

@@ -320,7 +320,7 @@ The following command will let you run the integration test with the most common
configuration:
```sh
$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:bullseye
$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:bookworm
```
(Note that the paths must be full paths! You could also write `$(realpath relative/path)` if needed.)

View File

@@ -1161,36 +1161,17 @@ class DatabasePool:
SQLite versions that don't support it).
"""
if txn.database_engine.supports_returning:
sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
table,
", ".join(k for k in values.keys()),
", ".join("?" for _ in values.keys()),
", ".join(k for k in returning),
)
sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
table,
", ".join(k for k in values.keys()),
", ".join("?" for _ in values.keys()),
", ".join(k for k in returning),
)
txn.execute(sql, list(values.values()))
row = txn.fetchone()
assert row is not None
return row
else:
# For old versions of SQLite we do a standard insert and then can
# use `last_insert_rowid` to get at the row we just inserted
DatabasePool.simple_insert_txn(
txn,
table=table,
values=values,
)
txn.execute("SELECT last_insert_rowid()")
row = txn.fetchone()
assert row is not None
(rowid,) = row
row = DatabasePool.simple_select_one_txn(
txn, table=table, keyvalues={"rowid": rowid}, retcols=returning
)
assert row is not None
return row
txn.execute(sql, list(values.values()))
row = txn.fetchone()
assert row is not None
return row
async def simple_insert_many(
self,

View File

@@ -347,33 +347,28 @@ class DelayedEventsStore(SQLBaseStore):
EventDetails,
Optional[Timestamp],
]:
sql_cols = ", ".join(
(
"room_id",
"event_type",
"state_key",
"origin_server_ts",
"content",
"device_id",
)
)
sql_update = "UPDATE delayed_events SET is_processed = TRUE"
sql_where = "WHERE delay_id = ? AND user_localpart = ? AND NOT is_processed"
sql_args = (delay_id, user_localpart)
txn.execute(
"""
UPDATE delayed_events
SET is_processed = TRUE
WHERE delay_id = ? AND user_localpart = ?
AND NOT is_processed
RETURNING
room_id,
event_type,
state_key,
origin_server_ts,
content,
device_id
""",
(
f"{sql_update} {sql_where} RETURNING {sql_cols}"
if self.database_engine.supports_returning
else f"SELECT {sql_cols} FROM delayed_events {sql_where}"
delay_id,
user_localpart,
),
sql_args,
)
row = txn.fetchone()
if row is None:
raise NotFoundError("Delayed event not found")
elif not self.database_engine.supports_returning:
txn.execute(f"{sql_update} {sql_where}", sql_args)
assert txn.rowcount == 1
event = EventDetails(
RoomID.from_string(row[0]),

View File

@@ -2040,61 +2040,29 @@ class EventFederationWorkerStore(
Returns:
The received_ts of the row that was deleted, if any.
"""
if self.db_pool.engine.supports_returning:
def _remove_received_event_from_staging_txn(
txn: LoggingTransaction,
) -> Optional[int]:
sql = """
DELETE FROM federation_inbound_events_staging
WHERE origin = ? AND event_id = ?
RETURNING received_ts
"""
def _remove_received_event_from_staging_txn(
txn: LoggingTransaction,
) -> Optional[int]:
sql = """
DELETE FROM federation_inbound_events_staging
WHERE origin = ? AND event_id = ?
RETURNING received_ts
"""
txn.execute(sql, (origin, event_id))
row = cast(Optional[tuple[int]], txn.fetchone())
txn.execute(sql, (origin, event_id))
row = cast(Optional[tuple[int]], txn.fetchone())
if row is None:
return None
if row is None:
return None
return row[0]
return row[0]
return await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
db_autocommit=True,
)
else:
def _remove_received_event_from_staging_txn(
txn: LoggingTransaction,
) -> Optional[int]:
received_ts = self.db_pool.simple_select_one_onecol_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
retcol="received_ts",
allow_none=True,
)
self.db_pool.simple_delete_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
)
return received_ts
return await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
)
return await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
db_autocommit=True,
)
async def get_next_staged_event_id_for_room(
self,

View File

@@ -2544,31 +2544,13 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
)
args.append(user_id)
if self.database_engine.supports_returning:
sql = f"""
DELETE FROM access_tokens
WHERE {clause} AND user_id = ?
RETURNING token, id, device_id
"""
txn.execute(sql, args)
tokens_and_devices = txn.fetchall()
else:
tokens_and_devices = self.db_pool.simple_select_many_txn(
txn,
table="access_tokens",
column="device_id",
iterable=batch_device_ids,
keyvalues={"user_id": user_id},
retcols=("token", "id", "device_id"),
)
self.db_pool.simple_delete_many_txn(
txn,
table="access_tokens",
keyvalues={"user_id": user_id},
column="device_id",
values=batch_device_ids,
)
sql = f"""
DELETE FROM access_tokens
WHERE {clause} AND user_id = ?
RETURNING token, id, device_id
"""
txn.execute(sql, args)
tokens_and_devices = txn.fetchall()
self._invalidate_cache_and_stream_bulk(
txn,

View File

@@ -353,27 +353,19 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
def _populate_user_directory_process_users_txn(
txn: LoggingTransaction,
) -> Optional[int]:
if self.database_engine.supports_returning:
# Note: we use an ORDER BY in the SELECT to force usage of an
# index. Otherwise, postgres does a sequential scan that is
# surprisingly slow (I think due to the fact it will read/skip
# over lots of already deleted rows).
sql = f"""
DELETE FROM {TEMP_TABLE + "_users"}
WHERE user_id IN (
SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
)
RETURNING user_id
"""
txn.execute(sql, (batch_size,))
user_result = cast(list[tuple[str]], txn.fetchall())
else:
sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
# Note: we use an ORDER BY in the SELECT to force usage of an
# index. Otherwise, postgres does a sequential scan that is
# surprisingly slow (I think due to the fact it will read/skip
# over lots of already deleted rows).
sql = f"""
DELETE FROM {TEMP_TABLE + "_users"}
WHERE user_id IN (
SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
)
txn.execute(sql)
user_result = cast(list[tuple[str]], txn.fetchall())
RETURNING user_id
"""
txn.execute(sql, (batch_size,))
user_result = cast(list[tuple[str]], txn.fetchall())
if not user_result:
return None
@@ -432,17 +424,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# Actually insert the users with their profiles into the directory.
self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
# We've finished processing the users. Delete it from the table, if
# we haven't already.
if not self.database_engine.supports_returning:
self.db_pool.simple_delete_many_txn(
txn,
table=TEMP_TABLE + "_users",
column="user_id",
values=users_to_work_on,
keyvalues={},
)
# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
self.db_pool.updates._background_update_progress_txn(

View File

@@ -63,12 +63,6 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""
...
@property
@abc.abstractmethod
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
...
@abc.abstractmethod
def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False

View File

@@ -193,11 +193,6 @@ class PostgresEngine(
"""Do we support using `a = ANY(?)` and passing a list"""
return True
@property
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return True
def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg2.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html

View File

@@ -68,11 +68,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
"""Do we support using `a = ANY(?)` and passing a list"""
return False
@property
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return sqlite3.sqlite_version_info >= (3, 35, 0)
def check_database(
self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
) -> None:
@@ -80,8 +75,8 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
# Synapse is untested against older SQLite versions, and we don't want
# to let users upgrade to a version of Synapse with broken support for their
# sqlite version, because it risks leaving them with a half-upgraded db.
if sqlite3.sqlite_version_info < (3, 27, 0):
raise RuntimeError("Synapse requires sqlite 3.27 or above.")
if sqlite3.sqlite_version_info < (3, 37, 2):
raise RuntimeError("Synapse requires sqlite 3.37.2 or above.")
def check_new_database(self, txn: Cursor) -> None:
"""Gets called when setting up a brand new database. This allows us to