Compare commits

...

21 Commits

Author         SHA1        Message                                                             Date
Patrick Cloke  f118aa6889  Lint                                                                2023-11-09 15:01:30 -05:00
Patrick Cloke  7b82cba4bc  Support running tests in CI.                                        2023-11-09 14:58:51 -05:00
Patrick Cloke  ff40f22a23  Back out unneeded changes.                                          2023-11-09 14:46:48 -05:00
Patrick Cloke  55193f38e5  Merge remote-tracking branch 'origin/develop' into clokep/psycopg3  2023-11-09 14:45:50 -05:00
Patrick Cloke  eb2b0dceb4  Another instance of psycopg2.                                       2023-11-09 14:38:15 -05:00
Patrick Cloke  626f468155  Backout unneeded changes.                                           2023-11-09 14:21:33 -05:00
Patrick Cloke  396fa974a1  Merge remote-tracking branch 'origin/develop' into clokep/psycopg3  2023-11-09 09:48:24 -05:00
Patrick Cloke  a280d117dc  Don't use separate copy_read method.                                2023-09-29 14:24:54 -04:00
Patrick Cloke  a072285e9d  Use _do_execute for COPY TO/FROM.                                   2023-09-29 14:07:34 -04:00
Patrick Cloke  29492b7e85  Lint.                                                               2023-09-29 13:44:49 -04:00
Patrick Cloke  208a5944a6  Tweaks                                                              2023-09-29 06:33:26 -04:00
Patrick Cloke  856faa8fce  Add to poetry env.                                                  2023-09-28 15:41:41 -04:00
Patrick Cloke  2b02f4a41c  Merge remote-tracking branch 'origin/develop' into clokep/psycopg3  2023-09-28 15:31:22 -04:00
Patrick Cloke  4a0dfb336f  temp                                                                2023-09-22 15:58:55 -04:00
Patrick Cloke  edff9f7dca  Merge remote-tracking branch 'origin/develop' into clokep/psycopg3  2023-09-22 14:26:42 -04:00
Patrick Cloke  2d8cbbd53a  More fixes.                                                         2023-09-22 14:26:40 -04:00
Patrick Cloke  be042ce2c6  Separate engines via subclassing.                                   2023-09-22 14:25:23 -04:00
Patrick Cloke  8bb700e9ac  Merge remote-tracking branch 'origin/develop' into clokep/psycopg3  2023-09-13 14:35:46 -04:00
Patrick Cloke  9f0ccbdbaf  Working version.                                                    2022-11-30 08:20:48 -05:00
Patrick Cloke  f5ef7e13d7  Initial abstraction.                                                2022-11-30 08:20:48 -05:00
Patrick Cloke  61fc1cb1e7  Copy PostgresEngine.                                                2022-11-30 08:20:47 -05:00
24 changed files with 529 additions and 190 deletions

.ci/calculate_jobs.py
View File

@@ -50,23 +50,38 @@ if not IS_PR:
for version in ("3.9", "3.10", "3.11", "3.12")
)
# Run with both psycopg2 and psycopg.
trial_postgres_tests = [
{
"python-version": "3.8",
"database": "postgres",
"postgres-version": "11",
"extras": "all",
}
},
{
"python-version": "3.8",
"database": "psycopg",
"postgres-version": "11",
"extras": "all",
},
]
if not IS_PR:
trial_postgres_tests.append(
{
"python-version": "3.12",
"database": "postgres",
"postgres-version": "16",
"extras": "all",
}
trial_postgres_tests.extend(
[
{
"python-version": "3.12",
"database": "postgres",
"postgres-version": "16",
"extras": "all",
},
{
"python-version": "3.12",
"database": "psycopg",
"postgres-version": "16",
"extras": "all",
},
]
)
trial_no_extra_tests = [
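Note: the matrix above now lists each Postgres job twice, once per driver. A minimal sketch of the same expansion written as a loop (variable names follow the script; this is an illustration, not the committed code):

trial_postgres_tests = [
    {
        "python-version": "3.8",
        "database": database,
        "postgres-version": "11",
        "extras": "all",
    }
    # "postgres" selects psycopg2; "psycopg" selects the new psycopg 3 engine.
    for database in ("postgres", "psycopg")
]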

.github/workflows/tests.yml
View File

@@ -346,7 +346,9 @@ jobs:
run: until pg_isready -h localhost; do sleep 1; done
- run: poetry run trial --jobs=6 tests
env:
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
# If matrix.job.database is 'psycopg', set SYNAPSE_POSTGRES to that string;
# if it is 'postgres', set it to true; otherwise leave it empty.
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'psycopg' && 'psycopg' || matrix.job.database == 'postgres' || '' }}
SYNAPSE_POSTGRES_HOST: /var/run/postgresql
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
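Note: the expression relies on GitHub Actions' short-circuiting `&&`/`||`: a 'psycopg' job yields the string 'psycopg', a 'postgres' job yields true, and anything else yields the empty string. A hedged sketch of how the Python test harness can distinguish the three values (mirroring the `USE_POSTGRES_FOR_TESTS` handling in the tests/utils.py hunk near the end of this diff):

import os

# "" -> sqlite, "psycopg" -> psycopg 3, any other truthy value -> psycopg2.
use_postgres = os.environ.get("SYNAPSE_POSTGRES", "")
if use_postgres == "psycopg":
    engine_name = "psycopg"
elif use_postgres:
    engine_name = "psycopg2"
else:
    engine_name = "sqlite3"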

poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
[[package]]
name = "alabaster"
@@ -108,6 +108,34 @@ files = [
[package.dependencies]
pytz = {version = ">=2015.7", markers = "python_version < \"3.9\""}
[[package]]
name = "backports-zoneinfo"
version = "0.2.1"
description = "Backport of the standard library zoneinfo module"
optional = true
python-versions = ">=3.6"
files = [
{file = "backports.zoneinfo-0.2.1-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:da6013fd84a690242c310d77ddb8441a559e9cb3d3d59ebac9aca1a57b2e18bc"},
{file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:89a48c0d158a3cc3f654da4c2de1ceba85263fafb861b98b59040a5086259722"},
{file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:1c5742112073a563c81f786e77514969acb58649bcdf6cdf0b4ed31a348d4546"},
{file = "backports.zoneinfo-0.2.1-cp36-cp36m-win32.whl", hash = "sha256:e8236383a20872c0cdf5a62b554b27538db7fa1bbec52429d8d106effbaeca08"},
{file = "backports.zoneinfo-0.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:8439c030a11780786a2002261569bdf362264f605dfa4d65090b64b05c9f79a7"},
{file = "backports.zoneinfo-0.2.1-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:f04e857b59d9d1ccc39ce2da1021d196e47234873820cbeaad210724b1ee28ac"},
{file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:17746bd546106fa389c51dbea67c8b7c8f0d14b5526a579ca6ccf5ed72c526cf"},
{file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:5c144945a7752ca544b4b78c8c41544cdfaf9786f25fe5ffb10e838e19a27570"},
{file = "backports.zoneinfo-0.2.1-cp37-cp37m-win32.whl", hash = "sha256:e55b384612d93be96506932a786bbcde5a2db7a9e6a4bb4bffe8b733f5b9036b"},
{file = "backports.zoneinfo-0.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:a76b38c52400b762e48131494ba26be363491ac4f9a04c1b7e92483d169f6582"},
{file = "backports.zoneinfo-0.2.1-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:8961c0f32cd0336fb8e8ead11a1f8cd99ec07145ec2931122faaac1c8f7fd987"},
{file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e81b76cace8eda1fca50e345242ba977f9be6ae3945af8d46326d776b4cf78d1"},
{file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7b0a64cda4145548fed9efc10322770f929b944ce5cee6c0dfe0c87bf4c0c8c9"},
{file = "backports.zoneinfo-0.2.1-cp38-cp38-win32.whl", hash = "sha256:1b13e654a55cd45672cb54ed12148cd33628f672548f373963b0bff67b217328"},
{file = "backports.zoneinfo-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:4a0f800587060bf8880f954dbef70de6c11bbe59c673c3d818921f042f9954a6"},
{file = "backports.zoneinfo-0.2.1.tar.gz", hash = "sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2"},
]
[package.extras]
tzdata = ["tzdata"]
[[package]]
name = "bcrypt"
version = "4.0.1"
@@ -763,17 +791,6 @@ files = [
{file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4a3a6a2fbbe7550ffe52d151cf76065e6b89cfb3e9d0463e49a7e322a25d0426"},
{file = "ijson-3.2.3-cp311-cp311-win32.whl", hash = "sha256:6a4db2f7fb9acfb855c9ae1aae602e4648dd1f88804a0d5cfb78c3639bcf156c"},
{file = "ijson-3.2.3-cp311-cp311-win_amd64.whl", hash = "sha256:ccd6be56335cbb845f3d3021b1766299c056c70c4c9165fb2fbe2d62258bae3f"},
{file = "ijson-3.2.3-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:055b71bbc37af5c3c5861afe789e15211d2d3d06ac51ee5a647adf4def19c0ea"},
{file = "ijson-3.2.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:c075a547de32f265a5dd139ab2035900fef6653951628862e5cdce0d101af557"},
{file = "ijson-3.2.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:457f8a5fc559478ac6b06b6d37ebacb4811f8c5156e997f0d87d708b0d8ab2ae"},
{file = "ijson-3.2.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9788f0c915351f41f0e69ec2618b81ebfcf9f13d9d67c6d404c7f5afda3e4afb"},
{file = "ijson-3.2.3-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fa234ab7a6a33ed51494d9d2197fb96296f9217ecae57f5551a55589091e7853"},
{file = "ijson-3.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bdd0dc5da4f9dc6d12ab6e8e0c57d8b41d3c8f9ceed31a99dae7b2baf9ea769a"},
{file = "ijson-3.2.3-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:c6beb80df19713e39e68dc5c337b5c76d36ccf69c30b79034634e5e4c14d6904"},
{file = "ijson-3.2.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:a2973ce57afb142d96f35a14e9cfec08308ef178a2c76b8b5e1e98f3960438bf"},
{file = "ijson-3.2.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:105c314fd624e81ed20f925271ec506523b8dd236589ab6c0208b8707d652a0e"},
{file = "ijson-3.2.3-cp312-cp312-win32.whl", hash = "sha256:ac44781de5e901ce8339352bb5594fcb3b94ced315a34dbe840b4cff3450e23b"},
{file = "ijson-3.2.3-cp312-cp312-win_amd64.whl", hash = "sha256:0567e8c833825b119e74e10a7c29761dc65fcd155f5d4cb10f9d3b8916ef9912"},
{file = "ijson-3.2.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:eeb286639649fb6bed37997a5e30eefcacddac79476d24128348ec890b2a0ccb"},
{file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:396338a655fb9af4ac59dd09c189885b51fa0eefc84d35408662031023c110d1"},
{file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0e0243d166d11a2a47c17c7e885debf3b19ed136be2af1f5d1c34212850236ac"},
@@ -1754,6 +1771,30 @@ files = [
[package.extras]
twisted = ["twisted"]
[[package]]
name = "psycopg"
version = "3.1.12"
description = "PostgreSQL database adapter for Python"
optional = true
python-versions = ">=3.7"
files = [
{file = "psycopg-3.1.12-py3-none-any.whl", hash = "sha256:8ec5230d6a7eb654b4fb3cf2d3eda8871d68f24807b934790504467f1deee9f8"},
{file = "psycopg-3.1.12.tar.gz", hash = "sha256:cec7ad2bc6a8510e56c45746c631cf9394148bdc8a9a11fd8cf8554ce129ae78"},
]
[package.dependencies]
"backports.zoneinfo" = {version = ">=0.2.0", markers = "python_version < \"3.9\""}
typing-extensions = ">=4.1"
tzdata = {version = "*", markers = "sys_platform == \"win32\""}
[package.extras]
binary = ["psycopg-binary (==3.1.12)"]
c = ["psycopg-c (==3.1.12)"]
dev = ["black (>=23.1.0)", "dnspython (>=2.1)", "flake8 (>=4.0)", "mypy (>=1.4.1)", "types-setuptools (>=57.4)", "wheel (>=0.37)"]
docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"]
pool = ["psycopg-pool"]
test = ["anyio (>=3.6.2,<4.0)", "mypy (>=1.4.1)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
[[package]]
name = "psycopg2"
version = "2.9.9"
@@ -1765,8 +1806,6 @@ files = [
{file = "psycopg2-2.9.9-cp310-cp310-win_amd64.whl", hash = "sha256:426f9f29bde126913a20a96ff8ce7d73fd8a216cfb323b1f04da402d452853c3"},
{file = "psycopg2-2.9.9-cp311-cp311-win32.whl", hash = "sha256:ade01303ccf7ae12c356a5e10911c9e1c51136003a9a1d92f7aa9d010fb98372"},
{file = "psycopg2-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:121081ea2e76729acfb0673ff33755e8703d45e926e416cb59bae3a86c6a4981"},
{file = "psycopg2-2.9.9-cp312-cp312-win32.whl", hash = "sha256:d735786acc7dd25815e89cc4ad529a43af779db2e25aa7c626de864127e5a024"},
{file = "psycopg2-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:a7653d00b732afb6fc597e29c50ad28087dcb4fbfb28e86092277a559ae4e693"},
{file = "psycopg2-2.9.9-cp37-cp37m-win32.whl", hash = "sha256:5e0d98cade4f0e0304d7d6f25bbfbc5bd186e07b38eac65379309c4ca3193efa"},
{file = "psycopg2-2.9.9-cp37-cp37m-win_amd64.whl", hash = "sha256:7e2dacf8b009a1c1e843b5213a87f7c544b2b042476ed7755be813eaf4e8347a"},
{file = "psycopg2-2.9.9-cp38-cp38-win32.whl", hash = "sha256:ff432630e510709564c01dafdbe996cb552e0b9f3f065eb89bdce5bd31fabf4c"},
@@ -2182,7 +2221,6 @@ files = [
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"},
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"},
{file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"},
{file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"},
{file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"},
{file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"},
{file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"},
@@ -2190,15 +2228,8 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"},
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"},
{file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"},
{file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"},
{file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"},
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
{file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"},
{file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"},
{file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"},
{file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"},
@@ -2215,7 +2246,6 @@ files = [
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"},
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"},
{file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"},
{file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"},
{file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"},
{file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"},
{file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"},
@@ -2223,7 +2253,6 @@ files = [
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"},
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"},
{file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"},
{file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"},
{file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"},
{file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"},
{file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"},
@@ -3186,6 +3215,17 @@ files = [
{file = "typing_extensions-4.8.0.tar.gz", hash = "sha256:df8e4339e9cb77357558cbdbceca33c303714cf861d1eef15e1070055ae8b7ef"},
]
[[package]]
name = "tzdata"
version = "2023.3"
description = "Provider of IANA time zone data"
optional = true
python-versions = ">=2"
files = [
{file = "tzdata-2023.3-py2.py3-none-any.whl", hash = "sha256:7e65763eef3120314099b6939b5546db7adce1e7d6f2e179e3df563c70511eda"},
{file = "tzdata-2023.3.tar.gz", hash = "sha256:11ef1e08e54acb0d4f95bdb1be05da659673de4acbd21bf9c69e94cc5e907a3a"},
]
[[package]]
name = "unpaddedbase64"
version = "2.1.0"
@@ -3429,13 +3469,13 @@ docs = ["Sphinx", "repoze.sphinx.autointerface"]
test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"]
[extras]
all = ["Pympler", "authlib", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pyicu", "pysaml2", "sentry-sdk", "txredisapi"]
all = ["Pympler", "authlib", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pyicu", "pysaml2", "sentry-sdk", "txredisapi"]
cache-memory = ["Pympler"]
jwt = ["authlib"]
matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
oidc = ["authlib"]
opentracing = ["jaeger-client", "opentracing"]
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
postgres = ["psycopg", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
redis = ["hiredis", "txredisapi"]
saml2 = ["pysaml2"]
sentry = ["sentry-sdk"]
@@ -3447,4 +3487,4 @@ user-search = ["pyicu"]
[metadata]
lock-version = "2.0"
python-versions = "^3.8.0"
content-hash = "369455d6a67753a6bcfbad3cd86801b1dd02896d0180080e2ba9501e007353ec"
content-hash = "73869fa198a8ddd7f724915dbea25b462a44fa418ce47a425b1adf16ecd1b82f"

pyproject.toml
View File

@@ -233,6 +233,7 @@ matrix-synapse-ldap3 = { version = ">=0.1", optional = true }
psycopg2 = { version = ">=2.8", markers = "platform_python_implementation != 'PyPy'", optional = true }
psycopg2cffi = { version = ">=2.8", markers = "platform_python_implementation == 'PyPy'", optional = true }
psycopg2cffi-compat = { version = "==1.1", markers = "platform_python_implementation == 'PyPy'", optional = true }
psycopg = { version = "^3.1", optional = true }
pysaml2 = { version = ">=4.5.0", optional = true }
authlib = { version = ">=0.15.1", optional = true }
# systemd-python is necessary for logging to the systemd journal via
@@ -255,7 +256,7 @@ pyicu = { version = ">=2.10.2", optional = true }
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
# twice: once here, and once in the `all` extra.
matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "psycopg"]
saml2 = ["pysaml2"]
oidc = ["authlib"]
# systemd-python is necessary for logging to the systemd journal via
@@ -292,7 +293,7 @@ all = [
# matrix-synapse-ldap3
"matrix-synapse-ldap3",
# postgres
"psycopg2", "psycopg2cffi", "psycopg2cffi-compat",
"psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "psycopg",
# saml2
"pysaml2",
# oidc and jwt

synapse/_scripts/synapse_port_db.py
View File

@@ -1390,7 +1390,7 @@ def main() -> None:
if "name" not in postgres_config:
sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
if postgres_config["name"] not in ("psycopg", "psycopg2"):
sys.stderr.write("Database must use the 'psycopg' or 'psycopg2' connector.\n")
sys.exit(3)

synapse/config/database.py
View File

@@ -50,7 +50,7 @@ class DatabaseConnectionConfig:
def __init__(self, name: str, db_config: dict):
db_engine = db_config.get("name", "sqlite3")
if db_engine not in ("sqlite3", "psycopg2"):
if db_engine not in ("sqlite3", "psycopg2", "psycopg"):
raise ConfigError("Unsupported database type %r" % (db_engine,))
if db_engine == "sqlite3":
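Note: with this change a homeserver database block may name the new driver. A sketch of such a config in the dict form the class above receives (values are placeholders):

from synapse.config.database import DatabaseConnectionConfig

db_config = {
    "name": "psycopg",  # previously only "sqlite3" or "psycopg2" were accepted
    "args": {"dbname": "synapse", "host": "localhost"},
}
conn_config = DatabaseConnectionConfig("master", db_config)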

synapse/storage/background_updates.py
View File

@@ -49,7 +49,11 @@ else:
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
logger = logging.getLogger(__name__)
@@ -746,10 +750,10 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index.
"""
def create_index_psql(conn: Connection) -> None:
def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
conn.rollback()
# postgres insists on autocommit for the index
conn.set_session(autocommit=True) # type: ignore
conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
c = conn.cursor()
@@ -793,9 +797,9 @@ class BackgroundUpdater:
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql)
conn.set_session(autocommit=False) # type: ignore
conn.engine.attempt_to_set_autocommit(conn.conn, False)
def create_index_sqlite(conn: Connection) -> None:
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
# Sqlite doesn't support concurrent creation of indexes.
#
# We assume that sqlite doesn't give us invalid indices; however
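Note: the hunk swaps psycopg2's connection-level `set_session(autocommit=True)` for an engine hook, because psycopg 3 has no `set_session` and instead exposes autocommit as a plain property. A sketch of the two driver-specific implementations this dispatches to (both appear in full later in this diff):

# psycopg2: autocommit is set via the session API.
def _set_autocommit_psycopg2(conn, autocommit: bool) -> None:
    conn.set_session(autocommit=autocommit)

# psycopg 3: autocommit is a writable connection property.
def _set_autocommit_psycopg(conn, autocommit: bool) -> None:
    conn.autocommit = autocommit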

synapse/storage/database.py
View File

@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import itertools
import logging
import time
import types
@@ -56,7 +57,13 @@ from synapse.logging.context import (
from synapse.metrics import LaterGauge, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.engines import (
BaseDatabaseEngine,
PostgresEngine,
Psycopg2Engine,
PsycopgEngine,
Sqlite3Engine,
)
from synapse.storage.types import Connection, Cursor, SQLQueryParameters
from synapse.util.async_helpers import delay_cancellation
from synapse.util.iterutils import batch_iter
@@ -343,7 +350,8 @@ class LoggingTransaction:
def fetchone(self) -> Optional[Tuple]:
return self.txn.fetchone()
def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
def fetchmany(self, size: int = 0) -> List[Tuple]:
# XXX This can also be called with no arguments.
return self.txn.fetchmany(size=size)
def fetchall(self) -> List[Tuple]:
@@ -369,7 +377,7 @@ class LoggingTransaction:
More efficient than `executemany` on PostgreSQL
"""
if isinstance(self.database_engine, PostgresEngine):
if isinstance(self.database_engine, Psycopg2Engine):
from psycopg2.extras import execute_batch
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
@@ -378,6 +386,8 @@ class LoggingTransaction:
self._do_execute(
lambda the_sql: execute_batch(self.txn, the_sql, args), sql
)
# TODO Can psycopg3 do anything better?
else:
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://docs.python.org/3/library/sqlite3.html?highlight=sqlite3#sqlite3.Cursor.executemany
@@ -389,7 +399,7 @@ class LoggingTransaction:
def execute_values(
self,
sql: str,
values: Iterable[Iterable[Any]],
values: Sequence[Sequence[Any]],
template: Optional[str] = None,
fetch: bool = True,
) -> List[Tuple]:
@@ -403,17 +413,56 @@ class LoggingTransaction:
compose the query.
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values
return self._do_execute(
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
lambda the_sql, the_values: execute_values(
self.txn, the_sql, the_values, template=template, fetch=fetch
),
sql,
values,
)
if isinstance(self.database_engine, Psycopg2Engine):
from psycopg2.extras import execute_values
return self._do_execute(
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
lambda the_sql, the_values: execute_values(
self.txn, the_sql, the_values, template=template, fetch=fetch
),
sql,
values,
)
else:
# We use fetch = False to mean a writable query. You *might* be able
# to morph that into a COPY (...) FROM STDIN, but it isn't worth the
# effort for the few places we set fetch = False.
assert fetch is True
# execute_values requires a single replacement, but we need to expand it
# for COPY. This assumes all inner sequences are the same length.
value_str = "(" + ", ".join("?" for _ in next(iter(values))) + ")"
sql = sql.replace("?", ", ".join(value_str for _ in values))
# Wrap the SQL in the COPY statement.
sql = f"COPY ({sql}) TO STDOUT"
def f(
the_sql: str, the_args: Sequence[Sequence[Any]]
) -> Iterable[Tuple[Any, ...]]:
with self.txn.copy(the_sql, the_args) as copy:
yield from copy.rows()
# Flatten the values.
return self._do_execute(f, sql, list(itertools.chain.from_iterable(values)))
def copy_write(
self, sql: str, args: Iterable[Any], values: Iterable[Iterable[Any]]
) -> None:
"""Corresponds to a PostgreSQL COPY (...) FROM STDIN call."""
assert isinstance(self.database_engine, PsycopgEngine)
def f(
the_sql: str, the_args: Iterable[Any], the_values: Iterable[Iterable[Any]]
) -> None:
with self.txn.copy(the_sql, the_args) as copy:
for record in the_values:
copy.write_row(record)
self._do_execute(f, sql, args, values)
def execute(self, sql: str, parameters: SQLQueryParameters = ()) -> None:
self._do_execute(self.txn.execute, sql, parameters)
@@ -445,6 +494,12 @@ class LoggingTransaction:
def _make_sql_one_line(self, sql: str) -> str:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
if isinstance(self.database_engine, PsycopgEngine):
import psycopg.sql
if isinstance(sql, psycopg.sql.Composed):
return sql.as_string(None)
return " ".join(line.strip() for line in sql.splitlines() if line.strip())
def _do_execute(
@@ -485,7 +540,7 @@ class LoggingTransaction:
finally:
secs = time.time() - start
sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
sql_query_timer.labels(sql.split()[0]).observe(secs)
sql_query_timer.labels(one_line_sql.split()[0]).observe(secs)
def close(self) -> None:
self.txn.close()
@@ -1116,8 +1171,8 @@ class DatabasePool:
def simple_insert_many_txn(
txn: LoggingTransaction,
table: str,
keys: Sequence[str],
values: Collection[Iterable[Any]],
keys: Collection[str],
values: Sequence[Sequence[Any]],
) -> None:
"""Executes an INSERT query on the named table.
@@ -1134,7 +1189,7 @@ class DatabasePool:
if not values:
return
if isinstance(txn.database_engine, PostgresEngine):
if isinstance(txn.database_engine, Psycopg2Engine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ?" % (
@@ -1143,6 +1198,14 @@ class DatabasePool:
)
txn.execute_values(sql, values, fetch=False)
elif isinstance(txn.database_engine, PsycopgEngine):
sql = "COPY %s (%s) FROM STDIN" % (
table,
", ".join(k for k in keys),
)
txn.copy_write(sql, (), values)
else:
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
@@ -1566,7 +1629,7 @@ class DatabasePool:
for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))
if isinstance(txn.database_engine, PostgresEngine):
if isinstance(txn.database_engine, Psycopg2Engine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
@@ -2323,7 +2386,7 @@ class DatabasePool:
values: for each row, a list of values in the same order as `keys`
"""
if isinstance(txn.database_engine, PostgresEngine):
if isinstance(txn.database_engine, Psycopg2Engine):
# We use `execute_values` as it can be a lot faster than `execute_batch`,
# but it's only available on postgres.
sql = "DELETE FROM %s WHERE (%s) IN (VALUES ?)" % (

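Note: the new psycopg 3 paths above replace psycopg2's `execute_values` with PostgreSQL COPY. A self-contained sketch of the two copy directions used, `write_row` for COPY ... FROM STDIN and `rows()` for COPY (...) TO STDOUT (the DSN and table are placeholders):

import psycopg

with psycopg.connect("dbname=synapse") as conn, conn.cursor() as cur:
    # COPY ... FROM STDIN: stream rows in, as copy_write() does above.
    with cur.copy("COPY mytable (a, b) FROM STDIN") as copy:
        for record in [(1, "x"), (2, "y")]:
            copy.write_row(record)
    # COPY (...) TO STDOUT: stream rows back out, as the rewritten
    # execute_values() does above.
    with cur.copy("COPY (SELECT a, b FROM mytable) TO STDOUT") as copy:
        for row in copy.rows():
            print(row)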
synapse/storage/databases/main/event_federation.py
View File

@@ -321,7 +321,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
c.chain_id = l.chain_id
AND sequence_number <= max_seq
"""
rows = txn.execute_values(sql, chains.items())
results.update(r for r, in rows)
else:

synapse/storage/databases/main/event_push_actions.py
View File

@@ -96,10 +96,10 @@ from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
PostgresEngine,
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

synapse/storage/databases/main/relations.py
View File

@@ -46,7 +46,7 @@ from synapse.storage.databases.main.stream import (
generate_pagination_bounds,
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import PostgresEngine, Psycopg2Engine
from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList
@@ -139,7 +139,7 @@ class RelationsWorkerStore(SQLBaseStore):
ON CONFLICT (room_id, thread_id)
DO NOTHING
"""
if isinstance(txn.database_engine, PostgresEngine):
if isinstance(txn.database_engine, Psycopg2Engine):
txn.execute_values(sql % ("?",), rows, fetch=False)
else:
txn.execute_batch(sql % ("(?, ?, ?, ?, ?)",), rows)
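Note the two placeholder shapes: `execute_values` expands a single `?` into the whole VALUES list itself, while `execute_batch`/`executemany` run the statement once per row and need the full tuple. A hedged illustration (the column list is assumed for the example):

sql = """
    INSERT INTO threads (room_id, thread_id, topological_ordering, stream_ordering, latest_event_id)
    VALUES %s
    ON CONFLICT (room_id, thread_id) DO NOTHING
"""
values_form = sql % ("?",)               # execute_values: one "?" for all rows
batch_form = sql % ("(?, ?, ?, ?, ?)",)  # execute_batch: a full tuple per row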

synapse/storage/databases/main/search.py
View File

@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
conn.set_session(autocommit=True)
conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we should now be able to delete the GIST index.
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally:
conn.set_session(autocommit=False)
conn.engine.attempt_to_set_autocommit(conn.conn, False)
if isinstance(self.database_engine, PostgresEngine):
await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
def create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback()
conn.set_session(autocommit=True)
conn.attempt_to_set_autocommit(True)
c = conn.cursor()
# We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
"""
)
conn.set_session(autocommit=False)
conn.attempt_to_set_autocommit(False)
await self.db_pool.runWithConnection(create_index)

synapse/storage/databases/main/user_directory.py
View File

@@ -53,7 +53,12 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.state import StateFilter
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.engines import (
PostgresEngine,
Psycopg2Engine,
PsycopgEngine,
Sqlite3Engine,
)
from synapse.types import (
JsonDict,
UserID,
@@ -718,35 +723,65 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
template = """
(
%s,
setweight(to_tsvector('simple', %s), 'A')
|| setweight(to_tsvector('simple', %s), 'D')
|| setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
)
"""
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute_values(
sql,
[
if isinstance(self.database_engine, Psycopg2Engine):
template = """
(
p.user_id,
get_localpart_from_id(p.user_id),
get_domain_from_id(p.user_id),
_filter_text_for_index(p.display_name)
if p.display_name
else None,
%s,
setweight(to_tsvector('simple', %s), 'A')
|| setweight(to_tsvector('simple', %s), 'D')
|| setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
)
for p in profiles
],
template=template,
fetch=False,
)
"""
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute_values(
sql,
[
(
p.user_id,
get_localpart_from_id(p.user_id),
get_domain_from_id(p.user_id),
_filter_text_for_index(p.display_name)
if p.display_name
else None,
)
for p in profiles
],
template=template,
fetch=False,
)
elif isinstance(self.database_engine, PsycopgEngine):
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES
(
?,
setweight(to_tsvector('simple', ?), 'A')
|| setweight(to_tsvector('simple', ?), 'D')
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
)
ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.executemany(
sql,
[
(
p.user_id,
get_localpart_from_id(p.user_id),
get_domain_from_id(p.user_id),
_filter_text_for_index(p.display_name)
if p.display_name
else None,
)
for p in profiles
],
)
elif isinstance(self.database_engine, Sqlite3Engine):
values = []
for p in profiles:

synapse/storage/databases/state/bg_updates.py
View File

@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
conn.rollback()
if isinstance(self.database_engine, PostgresEngine):
# postgres insists on autocommit for the index
conn.set_session(autocommit=True)
conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
txn = conn.cursor()
txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
)
txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
finally:
conn.set_session(autocommit=False)
conn.engine.attempt_to_set_autocommit(conn.conn, False)
else:
txn = conn.cursor()
txn.execute(

synapse/storage/engines/__init__.py
View File

@@ -11,35 +11,41 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Mapping, NoReturn
from typing import Any, Mapping, NoReturn, cast
from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from .postgres import PostgresEngine
# The engine classes must always be importable, because we use
# `isinstance(engine, PostgresEngine)` to write different queries for postgres
# and sqlite. But the database driver modules are all optional: they may not be
# installed. To account for this, create dummy classes on import failure so we can
# still run `isinstance()` checks.
try:
from .postgres import PostgresEngine
except ImportError:
class PostgresEngine(BaseDatabaseEngine): # type: ignore[no-redef]
def dummy_engine(name: str, module: str) -> BaseDatabaseEngine:
class Engine(BaseDatabaseEngine):
def __new__(cls, *args: object, **kwargs: object) -> NoReturn:
raise RuntimeError(
f"Cannot create {cls.__name__} -- psycopg2 module is not installed"
f"Cannot create {name} -- {module} module is not installed"
)
return cast(BaseDatabaseEngine, Engine)
try:
from .psycopg2 import Psycopg2Engine
except ImportError:
Psycopg2Engine = dummy_engine("Psycopg2Engine", "psycopg2") # type: ignore[misc,assignment]
try:
from .psycopg import PsycopgEngine
except ImportError:
PsycopgEngine = dummy_engine("PsycopgEngine", "psycopg") # type: ignore[misc,assignment]
try:
from .sqlite import Sqlite3Engine
except ImportError:
class Sqlite3Engine(BaseDatabaseEngine): # type: ignore[no-redef]
def __new__(cls, *args: object, **kwargs: object) -> NoReturn:
raise RuntimeError(
f"Cannot create {cls.__name__} -- sqlite3 module is not installed"
)
Sqlite3Engine = dummy_engine("Sqlite3Engine", "sqlite3") # type: ignore[misc,assignment]
def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
@@ -49,7 +55,10 @@ def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
return Sqlite3Engine(database_config)
if name == "psycopg2":
return PostgresEngine(database_config)
return Psycopg2Engine(database_config)
if name == "psycopg":
return PsycopgEngine(database_config)
raise RuntimeError("Unsupported database engine '%s'" % (name,))
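A hedged usage sketch of the updated factory; the `name` key comes straight from the homeserver's database config:

from synapse.storage.engines import create_engine

engine = create_engine({"name": "psycopg2", "args": {}})  # psycopg 2
engine = create_engine({"name": "psycopg", "args": {}})   # new psycopg 3 engine
engine = create_engine({"name": "sqlite3", "args": {}})   # sqlite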

synapse/storage/engines/postgres.py
View File

@@ -12,17 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast
from typing import TYPE_CHECKING, Any, Mapping, Optional, Tuple, Type, cast
import psycopg2.extensions
from psycopg import sql
from synapse.storage.engines._base import (
BaseDatabaseEngine,
ConnectionType,
CursorType,
IncorrectDatabaseSetup,
IsolationLevel,
)
from synapse.storage.types import Cursor
from synapse.storage.types import Cursor, DBAPI2Module
if TYPE_CHECKING:
from synapse.storage.database import LoggingDatabaseConnection
@@ -32,18 +34,15 @@ logger = logging.getLogger(__name__)
class PostgresEngine(
BaseDatabaseEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
BaseDatabaseEngine[ConnectionType, CursorType], metaclass=abc.ABCMeta
):
def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
isolation_level_map: Mapping[int, int]
default_isolation_level: int
OperationalError: Type[Exception]
# Disables passing `bytes` to txn.execute, c.f. #6186. If you do
# actually want to use bytes than wrap it in `bytearray`.
def _disable_bytes_adapter(_: bytes) -> NoReturn:
raise Exception("Passing bytes to DB is disabled.")
def __init__(self, module: DBAPI2Module, database_config: Mapping[str, Any]):
super().__init__(module, database_config)
psycopg2.extensions.register_adapter(bytes, _disable_bytes_adapter)
self.synchronous_commit: bool = database_config.get("synchronous_commit", True)
# Set the statement timeout to 1 hour by default.
# Any query taking more than 1 hour should probably be considered a bug;
@@ -56,16 +55,15 @@ class PostgresEngine(
)
self._version: Optional[int] = None # unknown as yet
self.isolation_level_map: Mapping[int, int] = {
IsolationLevel.READ_COMMITTED: psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
IsolationLevel.REPEATABLE_READ: psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
IsolationLevel.SERIALIZABLE: psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
}
self.default_isolation_level = (
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
self.config = database_config
@abc.abstractmethod
def get_server_version(self, db_conn: ConnectionType) -> int:
"""Gets called when setting up a brand new database. This allows us to
apply stricter checks on new databases versus existing database.
"""
...
@property
def single_threaded(self) -> bool:
return False
@@ -79,14 +77,14 @@ class PostgresEngine(
def check_database(
self,
db_conn: psycopg2.extensions.connection,
db_conn: ConnectionType,
allow_outdated_version: bool = False,
) -> None:
# Get the version of PostgreSQL that we're using. As per the psycopg2
# docs: The number is formed by converting the major, minor, and
# revision numbers into two-decimal-digit numbers and appending them
# together. For example, version 8.1.5 will be returned as 80105
self._version = db_conn.server_version
self._version = self.get_server_version(db_conn)
allow_unsafe_locale = self.config.get("allow_unsafe_locale", False)
# Are we on a supported PostgreSQL version?
@@ -154,7 +152,7 @@ class PostgresEngine(
return sql.replace("?", "%s")
def on_new_connection(self, db_conn: "LoggingDatabaseConnection") -> None:
db_conn.set_isolation_level(self.default_isolation_level)
self.attempt_to_set_isolation_level(db_conn.conn, self.default_isolation_level)
# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
@@ -168,7 +166,15 @@ class PostgresEngine(
# Abort really long-running statements and turn them into errors.
if self.statement_timeout is not None:
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
# TODO Avoid a circular import, this needs to be abstracted.
if self.__class__.__name__ == "Psycopg2Engine":
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
else:
cursor.execute(
sql.SQL("SET statement_timeout TO {}").format(
self.statement_timeout
)
)
cursor.close()
db_conn.commit()
@@ -183,15 +189,7 @@ class PostgresEngine(
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return True
def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg2.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
# "40001" serialization_failure
# "40P01" deadlock_detected
return error.pgcode in ["40001", "40P01"]
return False
def is_connection_closed(self, conn: psycopg2.extensions.connection) -> bool:
def is_connection_closed(self, conn: ConnectionType) -> bool:
return bool(conn.closed)
def lock_table(self, txn: Cursor, table: str) -> None:
@@ -215,25 +213,8 @@ class PostgresEngine(
def row_id_name(self) -> str:
return "ctid"
def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY
def attempt_to_set_autocommit(
self, conn: psycopg2.extensions.connection, autocommit: bool
) -> None:
return conn.set_session(autocommit=autocommit)
def attempt_to_set_isolation_level(
self, conn: psycopg2.extensions.connection, isolation_level: Optional[int]
) -> None:
if isolation_level is None:
isolation_level = self.default_isolation_level
else:
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level)
@staticmethod
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
def executescript(cursor: CursorType, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
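Note: the net effect of this hunk is a template-method split: `PostgresEngine` keeps the driver-neutral logic and declares abstract hooks, while concrete subclasses bind the driver module and its connection/cursor types. A simplified sketch of the shape (hook names follow the diff; details elided):

import abc
from typing import Generic, TypeVar

ConnectionType = TypeVar("ConnectionType")
CursorType = TypeVar("CursorType")

class PostgresEngineShape(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
    # Driver-neutral logic (version/locale checks, param-style conversion,
    # isolation-level bookkeeping) lives here.
    @abc.abstractmethod
    def get_server_version(self, db_conn: ConnectionType) -> int:
        """psycopg2 reads conn.server_version; psycopg 3 reads
        conn.info.server_version."""
        ...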

synapse/storage/engines/psycopg.py
View File

@@ -0,0 +1,89 @@
# Copyright 2022-2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Mapping, Optional
import psycopg
import psycopg.errors
import psycopg.sql
from twisted.enterprise.adbapi import Connection as TxConnection
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class PsycopgEngine(PostgresEngine[psycopg.Connection, psycopg.Cursor]):
OperationalError = psycopg.OperationalError
def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg, database_config)
# psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
# Disables passing `bytes` to txn.execute, c.f. #6186. If you do
# actually want to use bytes than wrap it in `bytearray`.
# def _disable_bytes_adapter(_: bytes) -> NoReturn:
# raise Exception("Passing bytes to DB is disabled.")
self.isolation_level_map: Mapping[int, psycopg.IsolationLevel] = {
IsolationLevel.READ_COMMITTED: psycopg.IsolationLevel.READ_COMMITTED,
IsolationLevel.REPEATABLE_READ: psycopg.IsolationLevel.REPEATABLE_READ,
IsolationLevel.SERIALIZABLE: psycopg.IsolationLevel.SERIALIZABLE,
}
self.default_isolation_level = psycopg.IsolationLevel.REPEATABLE_READ
def get_server_version(self, db_conn: psycopg.Connection) -> int:
return db_conn.info.server_version
def convert_param_style(self, sql: str) -> str:
if isinstance(sql, psycopg.sql.Composed):
return sql
return sql.replace("?", "%s")
def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg.errors.Error):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
# "40001" serialization_failure
# "40P01" deadlock_detected
return error.sqlstate in ["40001", "40P01"]
return False
def in_transaction(self, conn: psycopg.Connection) -> bool:
return conn.info.transaction_status != psycopg.pq.TransactionStatus.IDLE
def attempt_to_set_autocommit(
self, conn: psycopg.Connection, autocommit: bool
) -> None:
# Sometimes this gets called with a Twisted connection wrapper instead; unwrap
# it, since the wrapper doesn't support __setattr__.
if isinstance(conn, TxConnection):
conn = conn._connection
conn.autocommit = autocommit
def attempt_to_set_isolation_level(
self, conn: psycopg.Connection, isolation_level: Optional[int]
) -> None:
if isolation_level is None:
pg_isolation_level = self.default_isolation_level
else:
pg_isolation_level = self.isolation_level_map[isolation_level]
conn.isolation_level = pg_isolation_level
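A small, hedged check of the `in_transaction` logic above: a fresh psycopg 3 connection reports IDLE until a statement opens a transaction (the DSN is a placeholder):

import psycopg
from psycopg.pq import TransactionStatus

with psycopg.connect("dbname=synapse") as conn:
    assert conn.info.transaction_status == TransactionStatus.IDLE
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
    # autocommit is off by default, so the SELECT opened a transaction.
    assert conn.info.transaction_status != TransactionStatus.IDLE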

synapse/storage/engines/psycopg2.py
View File

@@ -0,0 +1,90 @@
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Mapping, Optional
import psycopg2.extensions
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class Psycopg2Engine(
PostgresEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
):
OperationalError = psycopg2.OperationalError
def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
self.isolation_level_map: Mapping[int, int] = {
IsolationLevel.READ_COMMITTED: psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
IsolationLevel.REPEATABLE_READ: psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
IsolationLevel.SERIALIZABLE: psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
}
self.default_isolation_level = (
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
self.config = database_config
def get_server_version(self, db_conn: psycopg2.extensions.connection) -> int:
return db_conn.server_version
def convert_param_style(self, sql: str) -> str:
return sql.replace("?", "%s")
def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg2.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
# "40001" serialization_failure
# "40P01" deadlock_detected
return error.pgcode in ["40001", "40P01"]
return False
def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY
def attempt_to_set_autocommit(
self, conn: psycopg2.extensions.connection, autocommit: bool
) -> None:
return conn.set_session(autocommit=autocommit)
def attempt_to_set_isolation_level(
self, conn: psycopg2.extensions.connection, isolation_level: Optional[int]
) -> None:
if isolation_level is None:
isolation_level = self.default_isolation_level
else:
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level)
@staticmethod
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
For consistency with SQLite, any ongoing transaction is committed before
executing the script in its own transaction. The script transaction is
left open and it is the responsibility of the caller to commit it.
"""
cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}")
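Note: the `COMMIT; BEGIN TRANSACTION;` prefix works because psycopg2's `execute()` accepts multiple semicolon-separated statements in one call. A hedged sketch of the calling convention (DSN and table are placeholders):

import psycopg2

conn = psycopg2.connect("dbname=synapse")
cur = conn.cursor()
script = "CREATE TABLE IF NOT EXISTS demo (id BIGINT); INSERT INTO demo VALUES (1);"
# Commit anything outstanding, then run the script in its own transaction;
# committing that transaction is left to the caller, as documented above.
cur.execute(f"COMMIT; BEGIN TRANSACTION; {script}")
conn.commit()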

synapse/storage/prepare_database.py
View File

@@ -31,7 +31,12 @@ import attr
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.engines import (
BaseDatabaseEngine,
PostgresEngine,
PsycopgEngine,
Sqlite3Engine,
)
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
from synapse.storage.types import Cursor
@@ -270,7 +275,7 @@ def _setup_new_database(
for file_name in os.listdir(directory)
)
if isinstance(database_engine, PostgresEngine):
if isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
specific = "postgres"
else:
specific = "sqlite"
@@ -414,7 +419,7 @@ def _upgrade_existing_database(
logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas)
if isinstance(database_engine, PostgresEngine):
if isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
specific_engine_extension = ".postgres"
else:
specific_engine_extension = ".sqlite"

View File

@@ -18,7 +18,7 @@ Adds a postgres SEQUENCE for generating application service transaction IDs.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, PsycopgEngine
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
@@ -41,7 +41,13 @@ def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) ->
start_val = max(last_txn_max, txn_max) + 1
cur.execute(
"CREATE SEQUENCE application_services_txn_id_seq START WITH %s",
(start_val,),
)
# XXX This is a hack.
sql = f"CREATE SEQUENCE application_services_txn_id_seq START WITH {start_val}"
args = ()
if isinstance(database_engine, PsycopgEngine):
import psycopg.sql
cur.execute(psycopg.sql.SQL(sql).format(args))
else:
cur.execute(sql, args)
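Note: the hack exists because `CREATE SEQUENCE ... START WITH` is DDL and cannot take a bound parameter. A hedged alternative using psycopg 3's sql composition, which quotes the literal safely instead of interpolating it into the Python string:

import psycopg.sql

start_val = 1000  # placeholder
stmt = psycopg.sql.SQL(
    "CREATE SEQUENCE application_services_txn_id_seq START WITH {}"
).format(psycopg.sql.Literal(start_val))
# cur.execute(stmt)  # cur: a live psycopg cursor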

synapse/storage/types.py
View File

@@ -44,7 +44,7 @@ class Cursor(Protocol):
def fetchone(self) -> Optional[Tuple]:
...
def fetchmany(self, size: Optional[int] = ...) -> List[Tuple]:
def fetchmany(self, size: int = ...) -> List[Tuple]:
...
def fetchall(self) -> List[Tuple]:

synapse/storage/util/id_generators.py
View File

@@ -916,7 +916,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
"agg": "GREATEST" if self._positive else "LEAST",
}
pos = (self.get_current_token_for_writer(self._instance_name),)
pos = self.get_current_token_for_writer(self._instance_name)
txn.execute(sql, (self._stream_name, self._instance_name, pos))

tests/server.py
View File

@@ -971,8 +971,12 @@ def setup_test_homeserver(
if USE_POSTGRES_FOR_TESTS:
test_db = "synapse_test_%s" % uuid.uuid4().hex
if USE_POSTGRES_FOR_TESTS == "psycopg":
db_type = "psycopg"
else:
db_type = "psycopg2"
database_config = {
"name": "psycopg2",
"name": db_type,
"args": {
"dbname": test_db,
"host": POSTGRES_HOST,
@@ -1030,8 +1034,6 @@ def setup_test_homeserver(
# Create the database before we actually try and connect to it, based off
# the template database we generate in setupdb()
if isinstance(db_engine, PostgresEngine):
import psycopg2.extensions
db_conn = db_engine.module.connect(
dbname=POSTGRES_BASE_DB,
user=POSTGRES_USER,
@@ -1039,8 +1041,7 @@ def setup_test_homeserver(
port=POSTGRES_PORT,
password=POSTGRES_PASSWORD,
)
assert isinstance(db_conn, psycopg2.extensions.connection)
db_conn.autocommit = True
db_engine.attempt_to_set_autocommit(db_conn, True)
cur = db_conn.cursor()
cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
cur.execute(
@@ -1070,9 +1071,6 @@ def setup_test_homeserver(
# We need to do cleanup on PostgreSQL
def cleanup() -> None:
import psycopg2
import psycopg2.extensions
# Close all the db pools
database_pool._db_pool.close()
@@ -1086,8 +1084,7 @@ def setup_test_homeserver(
port=POSTGRES_PORT,
password=POSTGRES_PASSWORD,
)
assert isinstance(db_conn, psycopg2.extensions.connection)
db_conn.autocommit = True
db_engine.attempt_to_set_autocommit(db_conn, True)
cur = db_conn.cursor()
# Try a few times to drop the DB. Some things may hold on to the
@@ -1099,7 +1096,7 @@ def setup_test_homeserver(
cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
db_conn.commit()
dropped = True
except psycopg2.OperationalError as e:
except db_engine.OperationalError as e:
warnings.warn(
"Couldn't drop old db: " + str(e),
category=UserWarning,

tests/utils.py
View File

@@ -59,7 +59,10 @@ def setupdb() -> None:
# If we're using PostgreSQL, set up the db once
if USE_POSTGRES_FOR_TESTS:
# create a PostgresEngine
db_engine = create_engine({"name": "psycopg2", "args": {}})
if USE_POSTGRES_FOR_TESTS == "psycopg":
db_engine = create_engine({"name": "psycopg", "args": {}})
else:
db_engine = create_engine({"name": "psycopg2", "args": {}})
# connect to postgres to create the base database.
db_conn = db_engine.module.connect(
user=POSTGRES_USER,