Compare commits
10 Commits
rav/drop_s... ... rei/synwor...
| Author | SHA1 | Date |
|---|---|---|
| | 00b5bba66e | |
| | 01747a28ca | |
| | c347978164 | |
| | 415ba59e8b | |
| | 2b0ebf1b4d | |
| | 0b9f7b123e | |
| | 4332493df3 | |
| | b8c6fd979a | |
| | c14dcea4b6 | |
| | 22a7b762bc | |
.github/workflows/tests.yml (vendored, 327 changes)
@@ -10,324 +10,15 @@ concurrency:
  cancel-in-progress: true

jobs:
  check-sampleconfig:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - run: pip install .
      - run: scripts-dev/generate_sample_config.sh --check
      - run: scripts-dev/config-lint.sh

  check-schema-delta:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
      - run: scripts-dev/check_schema_delta.py --force-colors

  lint:
    uses: "matrix-org/backend-meta/.github/workflows/python-poetry-ci.yml@v1"
    with:
      typechecking-extras: "all"

  lint-crlf:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Check line endings
        run: scripts-dev/check_line_terminators.sh

  lint-newsfile:
    if: ${{ github.base_ref == 'develop' || contains(github.base_ref, 'release-') }}
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          fetch-depth: 0
      - uses: actions/setup-python@v2
      - run: "pip install 'towncrier>=18.6.0rc1'"
      - run: scripts-dev/check-newsfragment.sh
        env:
          PULL_REQUEST_NUMBER: ${{ github.event.number }}

  # Dummy step to gate other tests on without repeating the whole list
  linting-done:
    if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
    needs: [lint, lint-crlf, lint-newsfile, check-sampleconfig, check-schema-delta]
    runs-on: ubuntu-latest
    steps:
      - run: "true"

  trial:
    if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
    needs: linting-done
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.7", "3.8", "3.9", "3.10"]
        database: ["sqlite"]
        extras: ["all"]
        include:
          # Newest Python without optional deps
          - python-version: "3.10"
            extras: ""

          # Oldest Python with PostgreSQL
          - python-version: "3.7"
            database: "postgres"
            postgres-version: "10"
            extras: "all"

          # Newest Python with newest PostgreSQL
          - python-version: "3.10"
            database: "postgres"
            postgres-version: "14"
            extras: "all"

    steps:
      - uses: actions/checkout@v2
      - run: sudo apt-get -qq install xmlsec1
      - name: Set up PostgreSQL ${{ matrix.postgres-version }}
        if: ${{ matrix.postgres-version }}
        run: |
          docker run -d -p 5432:5432 \
            -e POSTGRES_PASSWORD=postgres \
            -e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
            postgres:${{ matrix.postgres-version }}
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: ${{ matrix.extras }}
      - name: Await PostgreSQL
        if: ${{ matrix.postgres-version }}
        timeout-minutes: 2
        run: until pg_isready -h localhost; do sleep 1; done
      - run: poetry run trial --jobs=2 tests
        env:
          SYNAPSE_POSTGRES: ${{ matrix.database == 'postgres' || '' }}
          SYNAPSE_POSTGRES_HOST: localhost
          SYNAPSE_POSTGRES_USER: postgres
          SYNAPSE_POSTGRES_PASSWORD: postgres
      - name: Dump logs
        # Logs are most useful when the command fails, always include them.
        if: ${{ always() }}
        # Note: Dumps to workflow logs instead of using actions/upload-artifact
        # This keeps logs colocated with failing jobs
        # It also ignores find's exit code; this is a best effort affair
        run: >-
          find _trial_temp -name '*.log'
          -exec echo "::group::{}" \;
          -exec cat {} \;
          -exec echo "::endgroup::" \;
          || true

  trial-olddeps:
    # Note: sqlite only; no postgres
    if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
    needs: linting-done
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Test with old deps
        uses: docker://ubuntu:focal # For old python and sqlite
        # Note: focal seems to be using 3.8, but the oldest is 3.7?
        # See https://github.com/matrix-org/synapse/issues/12343
        with:
          workdir: /github/workspace
          entrypoint: .ci/scripts/test_old_deps.sh
      - name: Dump logs
        # Logs are most useful when the command fails, always include them.
        if: ${{ always() }}
        # Note: Dumps to workflow logs instead of using actions/upload-artifact
        # This keeps logs colocated with failing jobs
        # It also ignores find's exit code; this is a best effort affair
        run: >-
          find _trial_temp -name '*.log'
          -exec echo "::group::{}" \;
          -exec cat {} \;
          -exec echo "::endgroup::" \;
          || true

  trial-pypy:
    # Very slow; only run if the branch name includes 'pypy'
    # Note: sqlite only; no postgres. Completely untested since poetry move.
    if: ${{ contains(github.ref, 'pypy') && !failure() && !cancelled() }}
    needs: linting-done
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["pypy-3.7"]
        extras: ["all"]

    steps:
      - uses: actions/checkout@v2
      # Install libs necessary for PyPy to build binary wheels for dependencies
      - run: sudo apt-get -qq install xmlsec1 libxml2-dev libxslt-dev
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: ${{ matrix.extras }}
      - run: poetry run trial --jobs=2 tests
      - name: Dump logs
        # Logs are most useful when the command fails, always include them.
        if: ${{ always() }}
        # Note: Dumps to workflow logs instead of using actions/upload-artifact
        # This keeps logs colocated with failing jobs
        # It also ignores find's exit code; this is a best effort affair
        run: >-
          find _trial_temp -name '*.log'
          -exec echo "::group::{}" \;
          -exec cat {} \;
          -exec echo "::endgroup::" \;
          || true

  sytest:
    if: ${{ !failure() && !cancelled() }}
    needs: linting-done
    runs-on: ubuntu-latest
    container:
      image: matrixdotorg/sytest-synapse:${{ matrix.sytest-tag }}
      volumes:
        - ${{ github.workspace }}:/src
      env:
        SYTEST_BRANCH: ${{ github.head_ref }}
        POSTGRES: ${{ matrix.postgres && 1}}
        MULTI_POSTGRES: ${{ (matrix.postgres == 'multi-postgres') && 1}}
        WORKERS: ${{ matrix.workers && 1 }}
        REDIS: ${{ matrix.redis && 1 }}
        BLACKLIST: ${{ matrix.workers && 'synapse-blacklist-with-workers' }}
        TOP: ${{ github.workspace }}

    strategy:
      fail-fast: false
      matrix:
        include:
          - sytest-tag: focal

          - sytest-tag: focal
            postgres: postgres

          - sytest-tag: testing
            postgres: postgres

          - sytest-tag: focal
            postgres: multi-postgres
            workers: workers

          - sytest-tag: buster
            postgres: multi-postgres
            workers: workers

          - sytest-tag: buster
            postgres: postgres
            workers: workers
            redis: redis

    steps:
      - uses: actions/checkout@v2
      - name: Prepare test blacklist
        run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
      - name: Run SyTest
        run: /bootstrap.sh synapse
        working-directory: /src
      - name: Summarise results.tap
        if: ${{ always() }}
        run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
      - name: Upload SyTest logs
        uses: actions/upload-artifact@v2
        if: ${{ always() }}
        with:
          name: Sytest Logs - ${{ job.status }} - (${{ join(matrix.*, ', ') }})
          path: |
            /logs/results.tap
            /logs/**/*.log*

  export-data:
    if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
    needs: [linting-done, portdb]
    runs-on: ubuntu-latest
    env:
      TOP: ${{ github.workspace }}

    services:
      postgres:
        image: postgres
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: "postgres"
          POSTGRES_INITDB_ARGS: "--lc-collate C --lc-ctype C --encoding UTF8"
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v2
      - run: sudo apt-get -qq install xmlsec1
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: "postgres"
      - run: .ci/scripts/test_export_data_command.sh

  portdb:
    if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
    needs: linting-done
    runs-on: ubuntu-latest
    env:
      TOP: ${{ github.workspace }}
    strategy:
      matrix:
        include:
          - python-version: "3.7"
            postgres-version: "10"

          - python-version: "3.10"
            postgres-version: "14"

    services:
      postgres:
        image: postgres:${{ matrix.postgres-version }}
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: "postgres"
          POSTGRES_INITDB_ARGS: "--lc-collate C --lc-ctype C --encoding UTF8"
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v2
      - run: sudo apt-get -qq install xmlsec1
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: "postgres"
      - run: .ci/scripts/test_synapse_port_db.sh

  complement:
    if: "${{ !failure() && !cancelled() }}"
    needs: linting-done
    runs-on: ubuntu-latest

    strategy:
      fail-fast: false
      matrix:
        include:
          - arrangement: monolith
            database: SQLite

          - arrangement: monolith
            database: Postgres

          - arrangement: workers
            database: Postgres

@@ -342,7 +33,14 @@ jobs:

      - run: |
          set -o pipefail
          POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
          synapse/scripts-dev/complement.sh --build-only
          while :; do
            POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} SYNAPSE_TEST_LOG_LEVEL=DEBUG COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -f -run TestSendJoinPartialStateResponse -json 2>&1 | gotestfmt | tee /tmp/xxx
            if grep ❌ /tmp/xxx; then
              break
            fi
          done

        shell: bash
        name: Run Complement Tests

@@ -350,15 +48,6 @@
  tests-done:
    if: ${{ always() }}
    needs:
      - check-sampleconfig
      - lint
      - lint-crlf
      - lint-newsfile
      - trial
      - trial-olddeps
      - sytest
      - export-data
      - portdb
      - complement
    runs-on: ubuntu-latest
    steps:
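Reviewer note: the new step above rebuilds the Complement images once (`--build-only`) and then re-runs a single test in a loop until a `❌` marker shows up in the output, i.e. it hunts for a flaky failure. A minimal sketch of the same pattern in Python (the function name and invocation are illustrative, not part of this branch):

```python
import subprocess

def hunt_flake(cmd, failure_marker="❌"):
    """Re-run `cmd` until its combined output contains `failure_marker`,
    then return that output. Equivalent to the `while :; do ... done`
    shell loop in the workflow step above."""
    while True:
        proc = subprocess.run(cmd, capture_output=True, text=True)
        output = proc.stdout + proc.stderr
        if failure_marker in output:
            return output

# Hypothetical usage, mirroring the workflow step:
# log = hunt_flake(
#     ["synapse/scripts-dev/complement.sh", "-f", "-run", "TestSendJoinPartialStateResponse"]
# )
```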
@@ -1 +0,0 @@
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
@@ -36,10 +36,6 @@ class SQLBaseStore(metaclass=ABCMeta):
        per data store (and not one per physical database).
    """

    # if set to False, we will query the `state_events` and `rejections` tables when
    # fetching event data. When True, we rely on it all being in the `events` table.
    STATE_KEY_IN_EVENTS = False

    def __init__(
        self,
        database: DatabasePool,
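Reviewer note: `STATE_KEY_IN_EVENTS` is the switch the rest of this diff keys off. A minimal sketch of the idea, with simplified query shapes that are illustrative only: when the flag is True, `state_key` and `rejection_reason` are read straight from `events`; when False, the side tables `state_events` and `rejections` are consulted instead.

```python
def rejection_reason_query(state_key_in_events: bool) -> str:
    """Choose the query shape based on where the data lives (sketch)."""
    if state_key_in_events:
        # New world: `events` carries the column directly.
        return "SELECT rejection_reason FROM events WHERE event_id = ?"
    # Old world: consult the `rejections` side table.
    return "SELECT reason FROM rejections WHERE event_id = ?"
```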
@@ -257,30 +257,17 @@ class PersistEventsStore:
    def _get_events_which_are_prevs_txn(
        txn: LoggingTransaction, batch: Collection[str]
    ) -> None:
        if self.store.STATE_KEY_IN_EVENTS:
            sql = """
                SELECT prev_event_id, internal_metadata
                FROM event_edges
                INNER JOIN events USING (event_id)
                LEFT JOIN event_json USING (event_id)
                WHERE
                    NOT events.outlier
                    AND events.rejection_reason IS NULL
                    AND
            """
        else:
            sql = """
                SELECT prev_event_id, internal_metadata
                FROM event_edges
                INNER JOIN events USING (event_id)
                LEFT JOIN rejections USING (event_id)
                LEFT JOIN event_json USING (event_id)
                WHERE
                    NOT events.outlier
                    AND rejections.event_id IS NULL
                    AND
            """
        sql = """
            SELECT prev_event_id, internal_metadata
            FROM event_edges
            INNER JOIN events USING (event_id)
            LEFT JOIN rejections USING (event_id)
            LEFT JOIN event_json USING (event_id)
            WHERE
                NOT events.outlier
                AND rejections.event_id IS NULL
                AND
        """

        clause, args = make_in_list_sql_clause(
            self.database_engine, "prev_event_id", batch
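Reviewer note: the SQL in these hunks deliberately ends at a dangling `AND`, which `make_in_list_sql_clause` then completes with a batched membership test. A hypothetical stand-in for that helper, sketched from how the diff uses it (the real helper is engine-aware and can emit an array form such as `= ANY(?)` on PostgreSQL):

```python
from typing import Iterable, List, Tuple

def make_in_list_clause(column: str, values: Iterable[str]) -> Tuple[str, List[str]]:
    """Build a parameterised `column IN (?, ?, ...)` fragment and its args."""
    args = list(values)
    placeholders = ", ".join("?" for _ in args)
    return f"{column} IN ({placeholders})", args

# e.g.:
# clause, args = make_in_list_clause("prev_event_id", batch)
# txn.execute(sql + clause, args)
```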
@@ -324,19 +311,7 @@ class PersistEventsStore:
    ) -> None:
        to_recursively_check = batch

        if self.store.STATE_KEY_IN_EVENTS:
            sql = """
                SELECT
                    event_id, prev_event_id, internal_metadata,
                    events.rejection_reason IS NOT NULL
                FROM event_edges
                INNER JOIN events USING (event_id)
                LEFT JOIN event_json USING (event_id)
                WHERE
                    NOT events.outlier
                    AND
            """
        else:
        while to_recursively_check:
            sql = """
                SELECT
                    event_id, prev_event_id, internal_metadata,
@@ -350,7 +325,6 @@ class PersistEventsStore:
                    AND
            """

        while to_recursively_check:
            clause, args = make_in_list_sql_clause(
                self.database_engine, "event_id", to_recursively_check
            )
@@ -556,7 +530,6 @@ class PersistEventsStore:
        event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}

        self._add_chain_cover_index(
            self.store.STATE_KEY_IN_EVENTS,
            txn,
            self.db_pool,
            self.store.event_chain_id_gen,
@@ -568,7 +541,6 @@ class PersistEventsStore:
    @classmethod
    def _add_chain_cover_index(
        cls,
        state_key_in_events: bool,
        txn: LoggingTransaction,
        db_pool: DatabasePool,
        event_chain_id_gen: SequenceGenerator,
@@ -579,8 +551,6 @@ class PersistEventsStore:
        """Calculate the chain cover index for the given events.

        Args:
            state_key_in_events: whether to use the `state_key` column in the `events`
                table in preference to the `state_events` table
            event_to_room_id: Event ID to the room ID of the event
            event_to_types: Event ID to type and state_key of the event
            event_to_auth_chain: Event ID to list of auth event IDs of the
@@ -640,15 +610,7 @@ class PersistEventsStore:

        # We loop here in case we find an out of band membership and need to
        # fetch their auth event info.
        if state_key_in_events:
            sql = """
                SELECT event_id, events.type, events.state_key, chain_id, sequence_number
                FROM events
                LEFT JOIN event_auth_chains USING (event_id)
                WHERE
                    events.state_key IS NOT NULL AND
            """
        else:
        while missing_auth_chains:
            sql = """
                SELECT event_id, events.type, se.state_key, chain_id, sequence_number
                FROM events
@@ -656,8 +618,6 @@ class PersistEventsStore:
                LEFT JOIN event_auth_chains USING (event_id)
                WHERE
            """

        while missing_auth_chains:
            clause, args = make_in_list_sql_clause(
                txn.database_engine,
                "event_id",
@@ -1681,31 +1641,22 @@ class PersistEventsStore:
    ) -> None:
        to_prefill = []

        rows = []

        ev_map = {e.event_id: e for e, _ in events_and_contexts}
        if not ev_map:
            return

        if self.store.STATE_KEY_IN_EVENTS:
            sql = (
                "SELECT "
                " e.event_id as event_id, "
                " r.redacts as redacts,"
                " e.rejection_reason as rejects "
                " FROM events as e"
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                " WHERE "
            )
        else:
            sql = (
                "SELECT "
                " e.event_id as event_id, "
                " r.redacts as redacts,"
                " rej.event_id as rejects "
                " FROM events as e"
                " LEFT JOIN rejections as rej USING (event_id)"
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                " WHERE "
            )
        sql = (
            "SELECT "
            " e.event_id as event_id, "
            " r.redacts as redacts,"
            " rej.event_id as rejects "
            " FROM events as e"
            " LEFT JOIN rejections as rej USING (event_id)"
            " LEFT JOIN redactions as r ON e.event_id = r.redacts"
            " WHERE "
        )

        clause, args = make_in_list_sql_clause(
            self.database_engine, "e.event_id", list(ev_map)
@@ -67,8 +67,6 @@ class _BackgroundUpdates:
    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"

    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -255,11 +253,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
            replaces_index="ev_edges_id",
        )

        self.db_pool.updates.register_background_update_handler(
            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
            self._background_events_populate_state_key_rejections,
        )

    async def _background_reindex_fields_sender(
        self, progress: JsonDict, batch_size: int
    ) -> int:
@@ -448,10 +441,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
        # First, we get `batch_size` events from the table, pulling out
        # their successor events, if any, and the successor events'
        # rejection status.

        # this should happen before the bg update which drops 'rejections'
        assert not self.STATE_KEY_IN_EVENTS

        txn.execute(
            """SELECT prev_event_id, event_id, internal_metadata,
                rejections.event_id IS NOT NULL, events.outlier
@@ -977,9 +966,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
            extra_clause = "AND events.room_id = ?"
            tuple_args.append(last_room_id)

        # this should happen before the bg update which drops 'state_events'
        assert not self.STATE_KEY_IN_EVENTS

        sql = """
            SELECT
                event_id, state_events.type, state_events.state_key,
@@ -1048,10 +1034,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):

        # Calculate and persist the chain cover index for this set of events.
        #
        # Annoyingly we need to gut wrench into the persist event store so that
        # Annoyingly we need to gut wrench into the persit event store so that
        # we can reuse the function to calculate the chain cover for rooms.
        PersistEventsStore._add_chain_cover_index(
            False,
            txn,
            self.db_pool,
            self.event_chain_id_gen,  # type: ignore[attr-defined]
@@ -1414,95 +1399,3 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
        )

        return batch_size

    async def _background_events_populate_state_key_rejections(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        """Back-populate `events.state_key` and `events.rejection_reason`"""

        min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
        max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]

        def _populate_txn(txn: LoggingTransaction) -> bool:
            """Returns True if we're done."""

            # first we need to find an endpoint.
            # we need to find the final row in the batch of batch_size, which means
            # we need to skip over (batch_size-1) rows and get the next row.
            txn.execute(
                """
                SELECT stream_ordering FROM events
                WHERE stream_ordering > ? AND stream_ordering <= ?
                ORDER BY stream_ordering
                LIMIT 1 OFFSET ?
                """,
                (
                    min_stream_ordering_exclusive,
                    max_stream_ordering_inclusive,
                    batch_size - 1,
                ),
            )

            endpoint = None
            row = txn.fetchone()
            if row:
                endpoint = row[0]

            where_clause = "e.stream_ordering > ?"
            args = [min_stream_ordering_exclusive]
            if endpoint:
                where_clause += " AND e.stream_ordering <= ?"
                args.append(endpoint)

            # now do the updates. We consider rows within our range of stream orderings,
            # but only those with a non-null rejection reason or state_key (since there
            # is nothing to update for rows where rejection reason and state_key are
            # both null).
            txn.execute(
                f"""
                WITH t AS (
                    SELECT e.event_id, r.reason, se.state_key
                    FROM events e
                    LEFT JOIN rejections r USING (event_id)
                    LEFT JOIN state_events se USING (event_id)
                    WHERE ({where_clause}) AND (
                        r.reason IS NOT NULL OR se.state_key IS NOT NULL
                    )
                )
                UPDATE events
                SET rejection_reason=t.reason, state_key=t.state_key
                FROM t WHERE events.event_id = t.event_id
                """,
                args,
            )

            logger.info(
                "populated new `events` columns up to %s/%i: updated %i/%i rows",
                endpoint,
                max_stream_ordering_inclusive,
                txn.rowcount,
                batch_size,
            )

            if endpoint is None:
                # we're done
                return True

            progress["min_stream_ordering_exclusive"] = endpoint
            self.db_pool.updates._background_update_progress_txn(
                txn,
                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
                progress,
            )
            return False

        done = await self.db_pool.runInteraction(
            desc="events_populate_state_key_rejections", func=_populate_txn
        )

        if done:
            await self.db_pool.updates._end_background_update(
                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
            )

        return batch_size
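Reviewer note: the deleted background update walks the `events` table in stream-ordering batches: `LIMIT 1 OFFSET batch_size - 1` finds the upper bound of the next batch, then a single `UPDATE ... FROM` statement back-populates just that slice. A self-contained sketch of the endpoint-finding step (the table and data here are made up for illustration; note the real update also relies on `UPDATE ... FROM`, which SQLite only supports from 3.33):

```python
import sqlite3

def next_batch_endpoint(conn, lo_exclusive, hi_inclusive, batch_size):
    """Return the stream_ordering closing the next batch, or None when
    fewer than batch_size rows remain (i.e. this is the final batch)."""
    row = conn.execute(
        """
        SELECT stream_ordering FROM events
        WHERE stream_ordering > ? AND stream_ordering <= ?
        ORDER BY stream_ordering
        LIMIT 1 OFFSET ?
        """,
        (lo_exclusive, hi_inclusive, batch_size - 1),
    ).fetchone()
    return row[0] if row else None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (stream_ordering INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(1, 11)])
assert next_batch_endpoint(conn, 0, 10, 4) == 4      # first batch covers 1..4
assert next_batch_endpoint(conn, 8, 10, 4) is None   # last partial batch: done
```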
@@ -1482,35 +1482,20 @@ class EventsWorkerStore(SQLBaseStore):
        def get_all_new_forward_event_rows(
            txn: LoggingTransaction,
        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " WHERE ? < stream_ordering AND stream_ordering <= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            else:
                sql = (
                    "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " LEFT JOIN rejections USING (event_id)"
                    " WHERE ? < stream_ordering AND stream_ordering <= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            sql = (
                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                " FROM events AS e"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " LEFT JOIN room_memberships USING (event_id)"
                " LEFT JOIN rejections USING (event_id)"
                " WHERE ? < stream_ordering AND stream_ordering <= ?"
                " AND instance_name = ?"
                " ORDER BY stream_ordering ASC"
                " LIMIT ?"
            )
            txn.execute(sql, (last_id, current_id, instance_name, limit))
            return cast(
                List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
@@ -1538,36 +1523,21 @@ class EventsWorkerStore(SQLBaseStore):
        def get_ex_outlier_stream_rows_txn(
            txn: LoggingTransaction,
        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " WHERE ? < event_stream_ordering"
                    " AND event_stream_ordering <= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering ASC"
                )
            else:
                sql = (
                    "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " LEFT JOIN rejections USING (event_id)"
                    " WHERE ? < event_stream_ordering"
                    " AND event_stream_ordering <= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering ASC"
                )
            sql = (
                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                " FROM events AS e"
                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " LEFT JOIN room_memberships USING (event_id)"
                " LEFT JOIN rejections USING (event_id)"
                " WHERE ? < event_stream_ordering"
                " AND event_stream_ordering <= ?"
                " AND out.instance_name = ?"
                " ORDER BY event_stream_ordering ASC"
            )

            txn.execute(sql, (last_id, current_id, instance_name))
            return cast(
@@ -1611,32 +1581,18 @@ class EventsWorkerStore(SQLBaseStore):
        def get_all_new_backfill_event_rows(
            txn: LoggingTransaction,
        ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]:
            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > stream_ordering AND stream_ordering >= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            else:
                sql = (
                    "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > stream_ordering AND stream_ordering >= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )

            sql = (
                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id"
                " FROM events AS e"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " WHERE ? > stream_ordering AND stream_ordering >= ?"
                " AND instance_name = ?"
                " ORDER BY stream_ordering ASC"
                " LIMIT ?"
            )
            txn.execute(sql, (-last_id, -current_id, instance_name, limit))
            new_event_updates: List[
                Tuple[int, Tuple[str, str, str, str, str, str]]
@@ -1655,34 +1611,19 @@ class EventsWorkerStore(SQLBaseStore):
            else:
                upper_bound = current_id

            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > event_stream_ordering"
                    " AND event_stream_ordering >= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering DESC"
                )
            else:
                sql = (
                    "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > event_stream_ordering"
                    " AND event_stream_ordering >= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering DESC"
                )

            sql = (
                "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id"
                " FROM events AS e"
                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " WHERE ? > event_stream_ordering"
                " AND event_stream_ordering >= ?"
                " AND out.instance_name = ?"
                " ORDER BY event_stream_ordering DESC"
            )
            txn.execute(sql, (-last_id, -upper_bound, instance_name))
            # Type safety: iterating over `txn` yields `Tuple`, i.e.
            # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a
@@ -122,11 +122,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):

        logger.info("[purge] looking for events to delete")

        should_delete_expr = (
            "e.state_key IS NULL"
            if self.STATE_KEY_IN_EVENTS
            else "state_events.state_key IS NULL"
        )
        should_delete_expr = "state_events.state_key IS NULL"
        should_delete_params: Tuple[Any, ...] = ()
        if not delete_local_events:
            should_delete_expr += " AND event_id NOT LIKE ?"
@@ -138,23 +134,12 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):

        # Note that we insert events that are outliers and aren't going to be
        # deleted, as nothing will happen to them.
        if self.STATE_KEY_IN_EVENTS:
            sqlf = """
                INSERT INTO events_to_purge
                SELECT event_id, %s
                FROM events AS e
                WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
            """
        else:
            sqlf = """
                INSERT INTO events_to_purge
                SELECT event_id, %s
                FROM events AS e LEFT JOIN state_events USING (event_id)
                WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
            """

        txn.execute(
            sqlf % (should_delete_expr, should_delete_expr),
            "INSERT INTO events_to_purge"
            " SELECT event_id, %s"
            " FROM events AS e LEFT JOIN state_events USING (event_id)"
            " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
            % (should_delete_expr, should_delete_expr),
            should_delete_params,
        )
@@ -22,19 +22,10 @@ logger = logging.getLogger(__name__)

class RejectionsStore(SQLBaseStore):
    async def get_rejection_reason(self, event_id: str) -> Optional[str]:
        if self.STATE_KEY_IN_EVENTS:
            return await self.db_pool.simple_select_one_onecol(
                table="events",
                retcol="rejection_reason",
                keyvalues={"event_id": event_id},
                allow_none=True,
                desc="get_rejection_reason",
            )
        else:
            return await self.db_pool.simple_select_one_onecol(
                table="rejections",
                retcol="reason",
                keyvalues={"event_id": event_id},
                allow_none=True,
                desc="get_rejection_reason",
            )
        return await self.db_pool.simple_select_one_onecol(
            table="rejections",
            retcol="reason",
            keyvalues={"event_id": event_id},
            allow_none=True,
            desc="get_rejection_reason",
        )
@@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):

        return results_dict.get("membership"), results_dict.get("event_id")

    @cached(max_entries=500000, iterable=True, prune_unread_entries=False)
    @cached(max_entries=500000, iterable=True, prune_unread_entries=False, debug_invalidations=True)
    async def get_rooms_for_user_with_stream_ordering(
        self, user_id: str
    ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
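Reviewer note: the `debug_invalidations=True` argument added here threads down through the cache descriptor into `DeferredCache` (see the hunks below) and simply switches on per-key logging when entries are invalidated. A stripped-down sketch of that flag-threading, with all the Synapse machinery removed (names are illustrative):

```python
import logging

class TinyCache:
    """Toy stand-in for DeferredCache: a dict plus an invalidation-logging flag."""

    def __init__(self, debug_invalidations: bool = False):
        self.data = {}
        self.debug_invalidations = debug_invalidations

    def invalidate(self, key):
        if self.debug_invalidations:
            logging.debug("Invalidating key %r in cache", key)
        self.data.pop(key, None)

def cached(debug_invalidations: bool = False):
    """Decorator factory forwarding the flag to the cache it creates (sketch)."""
    def wrap(fn):
        cache = TinyCache(debug_invalidations=debug_invalidations)
        def wrapper(key):
            if key not in cache.data:
                cache.data[key] = fn(key)
            return cache.data[key]
        wrapper.cache = cache  # expose for invalidation, like _CachedFunction
        return wrapper
    return wrap
```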
@@ -1,47 +0,0 @@
# Copyright 2022 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from synapse.storage.types import Cursor


def run_create(cur: Cursor, database_engine, *args, **kwargs):
    """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""

    # we know that any new events will have the columns populated (and that has been
    # the case since schema_version 68, so there is no chance of rolling back now).
    #
    # So, we only need to make sure that existing rows are updated. We read the
    # current min and max stream orderings, since that is guaranteed to include all
    # the events that were stored before the new columns were added.
    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
    (min_stream_ordering, max_stream_ordering) = cur.fetchone()

    if min_stream_ordering is None:
        # no rows, nothing to do.
        return

    cur.execute(
        "INSERT into background_updates (ordering, update_name, progress_json)"
        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
        (
            json.dumps(
                {
                    "min_stream_ordering_exclusive": min_stream_ordering - 1,
                    "max_stream_ordering_inclusive": max_stream_ordering,
                }
            ),
        ),
    )
@@ -15,6 +15,7 @@
# limitations under the License.

import enum
import logging
import threading
from typing import (
    Callable,
@@ -66,6 +67,7 @@ class DeferredCache(Generic[KT, VT]):
        "cache",
        "thread",
        "_pending_deferred_cache",
        "debug_invalidations",
    )

    def __init__(
@@ -76,6 +78,7 @@ class DeferredCache(Generic[KT, VT]):
        iterable: bool = False,
        apply_cache_factor_from_config: bool = True,
        prune_unread_entries: bool = True,
        debug_invalidations: bool = False,
    ):
        """
        Args:
@@ -119,6 +122,7 @@ class DeferredCache(Generic[KT, VT]):
        )

        self.thread: Optional[threading.Thread] = None
        self.debug_invalidations = debug_invalidations

    @property
    def max_entries(self) -> int:
@@ -310,6 +314,9 @@ class DeferredCache(Generic[KT, VT]):
        self.check_thread()
        self.cache.del_multi(key)

        if self.debug_invalidations:
            logging.debug("Invalidating key %r in cache", key)

        # if we have a pending lookup for this key, remove it from the
        # _pending_deferred_cache, which will (a) stop it being returned
        # for future queries and (b) stop it being persisted as a proper entry
@@ -326,6 +333,8 @@ class DeferredCache(Generic[KT, VT]):
            entry.invalidate()

    def invalidate_all(self) -> None:
        if self.debug_invalidations:
            logging.debug("Invalidating ALL keys")
        self.check_thread()
        self.cache.clear()
        for entry in self._pending_deferred_cache.values():

@@ -301,6 +301,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
        cache_context: bool = False,
        iterable: bool = False,
        prune_unread_entries: bool = True,
        debug_invalidations: bool = False,
    ):
        super().__init__(
            orig,
@@ -318,6 +319,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
        self.tree = tree
        self.iterable = iterable
        self.prune_unread_entries = prune_unread_entries
        self.debug_invalidations = debug_invalidations

    def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
        cache: DeferredCache[CacheKey, Any] = DeferredCache(
@@ -326,6 +328,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
            tree=self.tree,
            iterable=self.iterable,
            prune_unread_entries=self.prune_unread_entries,
            debug_invalidations=self.debug_invalidations,
        )

        get_cache_key = self.cache_key_builder
@@ -577,6 +580,7 @@ def cached(
    cache_context: bool = False,
    iterable: bool = False,
    prune_unread_entries: bool = True,
    debug_invalidations: bool = False,
) -> Callable[[F], _CachedFunction[F]]:
    func = lambda orig: DeferredCacheDescriptor(
        orig,
@@ -587,6 +591,7 @@ def cached(
        cache_context=cache_context,
        iterable=iterable,
        prune_unread_entries=prune_unread_entries,
        debug_invalidations=debug_invalidations,
    )

    return cast(Callable[[F], _CachedFunction[F]], func)
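Reviewer note: the new debug lines call `logging.debug(...)` on the root logger (the hunk above adds `import logging` for exactly this), which bypasses per-module log filtering. If a module-level logger exists or is added, routing through it would be more conventional for Synapse; a possible tweak, assuming such a `logger`:

```python
import logging

logger = logging.getLogger(__name__)

# ... inside DeferredCache.invalidate() / invalidate_all() ...
# if self.debug_invalidations:
#     logger.debug("Invalidating key %r in cache", key)
```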