
Compare commits

..

10 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
00b5bba66e CCCC 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
01747a28ca BBBB 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
c347978164 AAAA 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
415ba59e8b DBG cache all 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
2b0ebf1b4d DBG cache 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
0b9f7b123e DBG retry until fail 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
4332493df3 DBG set debug 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
b8c6fd979a DBG limit 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
c14dcea4b6 DBG 2022-07-07 16:05:22 +01:00
Olivier Wilkinson (reivilibre)
22a7b762bc TMP flake debug code 2022-07-07 16:05:22 +01:00
12 changed files with 116 additions and 704 deletions

View File

@@ -10,324 +10,15 @@ concurrency:
  cancel-in-progress: true

jobs:
  check-sampleconfig:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - run: pip install .
      - run: scripts-dev/generate_sample_config.sh --check
      - run: scripts-dev/config-lint.sh

  check-schema-delta:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
      - run: scripts-dev/check_schema_delta.py --force-colors

  lint:
    uses: "matrix-org/backend-meta/.github/workflows/python-poetry-ci.yml@v1"
    with:
      typechecking-extras: "all"

  lint-crlf:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Check line endings
        run: scripts-dev/check_line_terminators.sh

  lint-newsfile:
    if: ${{ github.base_ref == 'develop' || contains(github.base_ref, 'release-') }}
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          fetch-depth: 0
      - uses: actions/setup-python@v2
      - run: "pip install 'towncrier>=18.6.0rc1'"
      - run: scripts-dev/check-newsfragment.sh
        env:
          PULL_REQUEST_NUMBER: ${{ github.event.number }}

  # Dummy step to gate other tests on without repeating the whole list
  linting-done:
    if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
    needs: [lint, lint-crlf, lint-newsfile, check-sampleconfig, check-schema-delta]
    runs-on: ubuntu-latest
    steps:
      - run: "true"

  trial:
    if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
    needs: linting-done
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.7", "3.8", "3.9", "3.10"]
        database: ["sqlite"]
        extras: ["all"]
        include:
          # Newest Python without optional deps
          - python-version: "3.10"
            extras: ""

          # Oldest Python with PostgreSQL
          - python-version: "3.7"
            database: "postgres"
            postgres-version: "10"
            extras: "all"

          # Newest Python with newest PostgreSQL
          - python-version: "3.10"
            database: "postgres"
            postgres-version: "14"
            extras: "all"

    steps:
      - uses: actions/checkout@v2
      - run: sudo apt-get -qq install xmlsec1
      - name: Set up PostgreSQL ${{ matrix.postgres-version }}
        if: ${{ matrix.postgres-version }}
        run: |
          docker run -d -p 5432:5432 \
            -e POSTGRES_PASSWORD=postgres \
            -e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
            postgres:${{ matrix.postgres-version }}
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: ${{ matrix.extras }}
      - name: Await PostgreSQL
        if: ${{ matrix.postgres-version }}
        timeout-minutes: 2
        run: until pg_isready -h localhost; do sleep 1; done
      - run: poetry run trial --jobs=2 tests
        env:
          SYNAPSE_POSTGRES: ${{ matrix.database == 'postgres' || '' }}
          SYNAPSE_POSTGRES_HOST: localhost
          SYNAPSE_POSTGRES_USER: postgres
          SYNAPSE_POSTGRES_PASSWORD: postgres
      - name: Dump logs
        # Logs are most useful when the command fails, always include them.
        if: ${{ always() }}
        # Note: Dumps to workflow logs instead of using actions/upload-artifact
        # This keeps logs colocated with failing jobs
        # It also ignores find's exit code; this is a best effort affair
        run: >-
          find _trial_temp -name '*.log'
          -exec echo "::group::{}" \;
          -exec cat {} \;
          -exec echo "::endgroup::" \;
          || true
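
The `find ... -exec` step above wraps each trial log in GitHub Actions `::group::`/`::endgroup::` markers, so every file collapses to a single expandable section in the workflow log. A rough Python equivalent of that step, for illustration only (a hypothetical standalone script, not part of this branch):

    from pathlib import Path

    # Print each trial log wrapped in GitHub Actions group markers so each
    # file renders as a collapsible section in the workflow log.
    for log in sorted(Path("_trial_temp").rglob("*.log")):
        print(f"::group::{log}")
        print(log.read_text(errors="replace"))
        print("::endgroup::")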
  trial-olddeps:
    # Note: sqlite only; no postgres
    if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
    needs: linting-done
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Test with old deps
        uses: docker://ubuntu:focal # For old python and sqlite
        # Note: focal seems to be using 3.8, but the oldest is 3.7?
        # See https://github.com/matrix-org/synapse/issues/12343
        with:
          workdir: /github/workspace
          entrypoint: .ci/scripts/test_old_deps.sh
      - name: Dump logs
        # Logs are most useful when the command fails, always include them.
        if: ${{ always() }}
        # Note: Dumps to workflow logs instead of using actions/upload-artifact
        # This keeps logs colocated with failing jobs
        # It also ignores find's exit code; this is a best effort affair
        run: >-
          find _trial_temp -name '*.log'
          -exec echo "::group::{}" \;
          -exec cat {} \;
          -exec echo "::endgroup::" \;
          || true

  trial-pypy:
    # Very slow; only run if the branch name includes 'pypy'
    # Note: sqlite only; no postgres. Completely untested since poetry move.
    if: ${{ contains(github.ref, 'pypy') && !failure() && !cancelled() }}
    needs: linting-done
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["pypy-3.7"]
        extras: ["all"]
    steps:
      - uses: actions/checkout@v2
      # Install libs necessary for PyPy to build binary wheels for dependencies
      - run: sudo apt-get -qq install xmlsec1 libxml2-dev libxslt-dev
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: ${{ matrix.extras }}
      - run: poetry run trial --jobs=2 tests
      - name: Dump logs
        # Logs are most useful when the command fails, always include them.
        if: ${{ always() }}
        # Note: Dumps to workflow logs instead of using actions/upload-artifact
        # This keeps logs colocated with failing jobs
        # It also ignores find's exit code; this is a best effort affair
        run: >-
          find _trial_temp -name '*.log'
          -exec echo "::group::{}" \;
          -exec cat {} \;
          -exec echo "::endgroup::" \;
          || true

  sytest:
    if: ${{ !failure() && !cancelled() }}
    needs: linting-done
    runs-on: ubuntu-latest
    container:
      image: matrixdotorg/sytest-synapse:${{ matrix.sytest-tag }}
      volumes:
        - ${{ github.workspace }}:/src
      env:
        SYTEST_BRANCH: ${{ github.head_ref }}
        POSTGRES: ${{ matrix.postgres && 1 }}
        MULTI_POSTGRES: ${{ (matrix.postgres == 'multi-postgres') && 1 }}
        WORKERS: ${{ matrix.workers && 1 }}
        REDIS: ${{ matrix.redis && 1 }}
        BLACKLIST: ${{ matrix.workers && 'synapse-blacklist-with-workers' }}
        TOP: ${{ github.workspace }}
    strategy:
      fail-fast: false
      matrix:
        include:
          - sytest-tag: focal

          - sytest-tag: focal
            postgres: postgres

          - sytest-tag: testing
            postgres: postgres

          - sytest-tag: focal
            postgres: multi-postgres
            workers: workers

          - sytest-tag: buster
            postgres: multi-postgres
            workers: workers

          - sytest-tag: buster
            postgres: postgres
            workers: workers
            redis: redis

    steps:
      - uses: actions/checkout@v2
      - name: Prepare test blacklist
        run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
      - name: Run SyTest
        run: /bootstrap.sh synapse
        working-directory: /src
      - name: Summarise results.tap
        if: ${{ always() }}
        run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
      - name: Upload SyTest logs
        uses: actions/upload-artifact@v2
        if: ${{ always() }}
        with:
          name: Sytest Logs - ${{ job.status }} - (${{ join(matrix.*, ', ') }})
          path: |
            /logs/results.tap
            /logs/**/*.log*

  export-data:
    if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
    needs: [linting-done, portdb]
    runs-on: ubuntu-latest
    env:
      TOP: ${{ github.workspace }}
    services:
      postgres:
        image: postgres
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: "postgres"
          POSTGRES_INITDB_ARGS: "--lc-collate C --lc-ctype C --encoding UTF8"
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v2
      - run: sudo apt-get -qq install xmlsec1
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: "postgres"
      - run: .ci/scripts/test_export_data_command.sh

  portdb:
    if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
    needs: linting-done
    runs-on: ubuntu-latest
    env:
      TOP: ${{ github.workspace }}
    strategy:
      matrix:
        include:
          - python-version: "3.7"
            postgres-version: "10"

          - python-version: "3.10"
            postgres-version: "14"
    services:
      postgres:
        image: postgres:${{ matrix.postgres-version }}
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: "postgres"
          POSTGRES_INITDB_ARGS: "--lc-collate C --lc-ctype C --encoding UTF8"
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v2
      - run: sudo apt-get -qq install xmlsec1
      - uses: matrix-org/setup-python-poetry@v1
        with:
          python-version: ${{ matrix.python-version }}
          extras: "postgres"
      - run: .ci/scripts/test_synapse_port_db.sh

  complement:
    if: "${{ !failure() && !cancelled() }}"
    needs: linting-done
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        include:
          - arrangement: monolith
            database: SQLite

          - arrangement: monolith
            database: Postgres

          - arrangement: workers
            database: Postgres
@@ -342,7 +33,14 @@ jobs:
      - run: |
          set -o pipefail
          POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
          synapse/scripts-dev/complement.sh --build-only
          while :; do
            POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} SYNAPSE_TEST_LOG_LEVEL=DEBUG COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -f -run TestSendJoinPartialStateResponse -json 2>&1 | gotestfmt | tee /tmp/xxx
            if grep ❌ /tmp/xxx; then
              break
            fi
          done
        shell: bash
        name: Run Complement Tests
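
This replacement step is the core of the flake hunt: build the Complement images once with `--build-only`, then re-run a single test (`TestSendJoinPartialStateResponse`) at DEBUG log level in an endless loop, stopping only when gotestfmt's output contains a ❌ failure marker, so the logs of the failing run are the last thing in the job output. The same retry-until-failure pattern as a Python sketch (a hypothetical harness; paths and flags taken from the step above):

    import subprocess

    # Re-run one flaky test until it fails; the output of the failing run is
    # what we keep. Assumes complement.sh and gotestfmt are on PATH.
    while True:
        result = subprocess.run(
            "synapse/scripts-dev/complement.sh -f"
            " -run TestSendJoinPartialStateResponse -json 2>&1 | gotestfmt",
            shell=True,
            capture_output=True,
            text=True,
        )
        print(result.stdout)
        if "❌" in result.stdout:
            break  # first failing run found; stop and inspect its debug logs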
@@ -350,15 +48,6 @@ jobs:
  tests-done:
    if: ${{ always() }}
    needs:
      - check-sampleconfig
      - lint
      - lint-crlf
      - lint-newsfile
      - trial
      - trial-olddeps
      - sytest
      - export-data
      - portdb
      - complement
    runs-on: ubuntu-latest
    steps:

View File

@@ -1 +0,0 @@
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.

View File

@@ -36,10 +36,6 @@ class SQLBaseStore(metaclass=ABCMeta):
    per data store (and not one per physical database).
    """

    # if set to False, we will query the `state_events` and `rejections` tables when
    # fetching event data. When True, we rely on it all being in the `events` table.
    STATE_KEY_IN_EVENTS = False

    def __init__(
        self,
        database: DatabasePool,

View File

@@ -257,30 +257,17 @@ class PersistEventsStore:
        def _get_events_which_are_prevs_txn(
            txn: LoggingTransaction, batch: Collection[str]
        ) -> None:
            if self.store.STATE_KEY_IN_EVENTS:
                sql = """
                    SELECT prev_event_id, internal_metadata
                    FROM event_edges
                        INNER JOIN events USING (event_id)
                        LEFT JOIN event_json USING (event_id)
                    WHERE
                        NOT events.outlier
                        AND events.rejection_reason IS NULL
                        AND
                """
            else:
                sql = """
                    SELECT prev_event_id, internal_metadata
                    FROM event_edges
                        INNER JOIN events USING (event_id)
                        LEFT JOIN rejections USING (event_id)
                        LEFT JOIN event_json USING (event_id)
                    WHERE
                        NOT events.outlier
                        AND rejections.event_id IS NULL
                        AND
                """
            sql = """
                SELECT prev_event_id, internal_metadata
                FROM event_edges
                    INNER JOIN events USING (event_id)
                    LEFT JOIN rejections USING (event_id)
                    LEFT JOIN event_json USING (event_id)
                WHERE
                    NOT events.outlier
                    AND rejections.event_id IS NULL
                    AND
            """

            clause, args = make_in_list_sql_clause(
                self.database_engine, "prev_event_id", batch
@@ -324,19 +311,7 @@ class PersistEventsStore:
        ) -> None:
            to_recursively_check = batch

            if self.store.STATE_KEY_IN_EVENTS:
                sql = """
                    SELECT
                        event_id, prev_event_id, internal_metadata,
                        events.rejection_reason IS NOT NULL
                    FROM event_edges
                        INNER JOIN events USING (event_id)
                        LEFT JOIN event_json USING (event_id)
                    WHERE
                        NOT events.outlier
                        AND
                """
            else:
            while to_recursively_check:
                sql = """
                    SELECT
                        event_id, prev_event_id, internal_metadata,
@@ -350,7 +325,6 @@ class PersistEventsStore:
                        AND
                """

            while to_recursively_check:
                clause, args = make_in_list_sql_clause(
                    self.database_engine, "event_id", to_recursively_check
                )
@@ -556,7 +530,6 @@ class PersistEventsStore:
        event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}

        self._add_chain_cover_index(
            self.store.STATE_KEY_IN_EVENTS,
            txn,
            self.db_pool,
            self.store.event_chain_id_gen,
@@ -568,7 +541,6 @@ class PersistEventsStore:
    @classmethod
    def _add_chain_cover_index(
        cls,
        state_key_in_events: bool,
        txn: LoggingTransaction,
        db_pool: DatabasePool,
        event_chain_id_gen: SequenceGenerator,
@@ -579,8 +551,6 @@ class PersistEventsStore:
        """Calculate the chain cover index for the given events.

        Args:
            state_key_in_events: whether to use the `state_key` column in the `events`
                table in preference to the `state_events` table
            event_to_room_id: Event ID to the room ID of the event
            event_to_types: Event ID to type and state_key of the event
            event_to_auth_chain: Event ID to list of auth event IDs of the
@@ -640,15 +610,7 @@ class PersistEventsStore:
        # We loop here in case we find an out of band membership and need to
        # fetch their auth event info.
        if state_key_in_events:
            sql = """
                SELECT event_id, events.type, events.state_key, chain_id, sequence_number
                FROM events
                    LEFT JOIN event_auth_chains USING (event_id)
                WHERE
                    events.state_key IS NOT NULL AND
            """
        else:
        while missing_auth_chains:
            sql = """
                SELECT event_id, events.type, se.state_key, chain_id, sequence_number
                FROM events
@@ -656,8 +618,6 @@ class PersistEventsStore:
                    LEFT JOIN event_auth_chains USING (event_id)
                WHERE
            """

            while missing_auth_chains:
            clause, args = make_in_list_sql_clause(
                txn.database_engine,
                "event_id",
@@ -1681,31 +1641,22 @@ class PersistEventsStore:
    ) -> None:
        to_prefill = []

        rows = []

        ev_map = {e.event_id: e for e, _ in events_and_contexts}
        if not ev_map:
            return

        if self.store.STATE_KEY_IN_EVENTS:
            sql = (
                "SELECT "
                " e.event_id as event_id, "
                " r.redacts as redacts,"
                " e.rejection_reason as rejects "
                " FROM events as e"
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                " WHERE "
            )
        else:
            sql = (
                "SELECT "
                " e.event_id as event_id, "
                " r.redacts as redacts,"
                " rej.event_id as rejects "
                " FROM events as e"
                " LEFT JOIN rejections as rej USING (event_id)"
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                " WHERE "
            )
        sql = (
            "SELECT "
            " e.event_id as event_id, "
            " r.redacts as redacts,"
            " rej.event_id as rejects "
            " FROM events as e"
            " LEFT JOIN rejections as rej USING (event_id)"
            " LEFT JOIN redactions as r ON e.event_id = r.redacts"
            " WHERE "
        )

        clause, args = make_in_list_sql_clause(
            self.database_engine, "e.event_id", list(ev_map)
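
For context on a helper that recurs throughout these hunks: `make_in_list_sql_clause` turns a "column is in this collection" test into an SQL fragment plus bind parameters, so the same query text works on both database engines. Roughly (illustrative values; on PostgreSQL the helper emits an `= ANY(?)` comparison with a single array parameter instead of `IN`):

    from synapse.storage.database import make_in_list_sql_clause

    clause, args = make_in_list_sql_clause(
        database_engine, "e.event_id", ["$event_a", "$event_b"]
    )
    # On SQLite-style engines this yields roughly:
    #   clause == "e.event_id IN (?,?)"
    #   args == ["$event_a", "$event_b"]
    txn.execute(sql + clause, args)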

View File

@@ -67,8 +67,6 @@ class _BackgroundUpdates:
    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"

    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -255,11 +253,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
            replaces_index="ev_edges_id",
        )

        self.db_pool.updates.register_background_update_handler(
            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
            self._background_events_populate_state_key_rejections,
        )

    async def _background_reindex_fields_sender(
        self, progress: JsonDict, batch_size: int
    ) -> int:
@@ -448,10 +441,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
            # First, we get `batch_size` events from the table, pulling out
            # their successor events, if any, and the successor events'
            # rejection status.

            # this should happen before the bg update which drops 'rejections'
            assert not self.STATE_KEY_IN_EVENTS

            txn.execute(
                """SELECT prev_event_id, event_id, internal_metadata,
                    rejections.event_id IS NOT NULL, events.outlier
@@ -977,9 +966,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
            extra_clause = "AND events.room_id = ?"
            tuple_args.append(last_room_id)

        # this should happen before the bg update which drops 'state_events'
        assert not self.STATE_KEY_IN_EVENTS

        sql = """
            SELECT
                event_id, state_events.type, state_events.state_key,
@@ -1048,10 +1034,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
        # Calculate and persist the chain cover index for this set of events.
        #
        # Annoyingly we need to gut wrench into the persist event store so that
        # Annoyingly we need to gut wrench into the persit event store so that
        # we can reuse the function to calculate the chain cover for rooms.
        PersistEventsStore._add_chain_cover_index(
            False,
            txn,
            self.db_pool,
            self.event_chain_id_gen,  # type: ignore[attr-defined]
@@ -1414,95 +1399,3 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
        )
        return batch_size

    async def _background_events_populate_state_key_rejections(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        """Back-populate `events.state_key` and `events.rejection_reason`"""

        min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
        max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]

        def _populate_txn(txn: LoggingTransaction) -> bool:
            """Returns True if we're done."""

            # first we need to find an endpoint.
            # we need to find the final row in the batch of batch_size, which means
            # we need to skip over (batch_size-1) rows and get the next row.
            txn.execute(
                """
                SELECT stream_ordering FROM events
                WHERE stream_ordering > ? AND stream_ordering <= ?
                ORDER BY stream_ordering
                LIMIT 1 OFFSET ?
                """,
                (
                    min_stream_ordering_exclusive,
                    max_stream_ordering_inclusive,
                    batch_size - 1,
                ),
            )

            endpoint = None
            row = txn.fetchone()
            if row:
                endpoint = row[0]

            where_clause = "e.stream_ordering > ?"
            args = [min_stream_ordering_exclusive]
            if endpoint:
                where_clause += " AND e.stream_ordering <= ?"
                args.append(endpoint)

            # now do the updates. We consider rows within our range of stream orderings,
            # but only those with a non-null rejection reason or state_key (since there
            # is nothing to update for rows where rejection reason and state_key are
            # both null).
            txn.execute(
                f"""
                WITH t AS (
                    SELECT e.event_id, r.reason, se.state_key
                    FROM events e
                    LEFT JOIN rejections r USING (event_id)
                    LEFT JOIN state_events se USING (event_id)
                    WHERE ({where_clause}) AND (
                        r.reason IS NOT NULL OR se.state_key IS NOT NULL
                    )
                )
                UPDATE events
                SET rejection_reason=t.reason, state_key=t.state_key
                FROM t WHERE events.event_id = t.event_id
                """,
                args,
            )

            logger.info(
                "populated new `events` columns up to %s/%i: updated %i/%i rows",
                endpoint,
                max_stream_ordering_inclusive,
                txn.rowcount,
                batch_size,
            )

            if endpoint is None:
                # we're done
                return True

            progress["min_stream_ordering_exclusive"] = endpoint
            self.db_pool.updates._background_update_progress_txn(
                txn,
                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
                progress,
            )
            return False

        done = await self.db_pool.runInteraction(
            desc="events_populate_state_key_rejections", func=_populate_txn
        )
        if done:
            await self.db_pool.updates._end_background_update(
                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
            )
        return batch_size
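
The deleted `_populate_txn` is a compact example of Synapse's batched-backfill pattern: locate the batch endpoint with `LIMIT 1 OFFSET batch_size - 1`, update everything in the half-open range `(progress, endpoint]` with one `UPDATE ... FROM` over a CTE, then persist `endpoint` as the new progress marker. A runnable toy model of how the marker advances (an in-memory stand-in, not the real store):

    def simulate(rows, lo, hi, batch_size):
        """Mimic _populate_txn's batching over in-memory stream orderings."""
        while True:
            in_range = [so for so in rows if lo < so <= hi]
            endpoint = in_range[batch_size - 1] if len(in_range) >= batch_size else None
            updated = in_range if endpoint is None else in_range[:batch_size]
            print(f"updated {updated}, new progress marker: {endpoint}")
            if endpoint is None:
                return  # fewer than batch_size rows remained: done
            lo = endpoint

    simulate(rows=range(1, 11), lo=0, hi=10, batch_size=4)
    # updated [1, 2, 3, 4], new progress marker: 4
    # updated [5, 6, 7, 8], new progress marker: 8
    # updated [9, 10], new progress marker: None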

View File

@@ -1482,35 +1482,20 @@ class EventsWorkerStore(SQLBaseStore):
        def get_all_new_forward_event_rows(
            txn: LoggingTransaction,
        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " WHERE ? < stream_ordering AND stream_ordering <= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            else:
                sql = (
                    "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " LEFT JOIN rejections USING (event_id)"
                    " WHERE ? < stream_ordering AND stream_ordering <= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            sql = (
                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                " FROM events AS e"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " LEFT JOIN room_memberships USING (event_id)"
                " LEFT JOIN rejections USING (event_id)"
                " WHERE ? < stream_ordering AND stream_ordering <= ?"
                " AND instance_name = ?"
                " ORDER BY stream_ordering ASC"
                " LIMIT ?"
            )
            txn.execute(sql, (last_id, current_id, instance_name, limit))
            return cast(
                List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
@@ -1538,36 +1523,21 @@ class EventsWorkerStore(SQLBaseStore):
        def get_ex_outlier_stream_rows_txn(
            txn: LoggingTransaction,
        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " WHERE ? < event_stream_ordering"
                    " AND event_stream_ordering <= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering ASC"
                )
            else:
                sql = (
                    "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " LEFT JOIN room_memberships USING (event_id)"
                    " LEFT JOIN rejections USING (event_id)"
                    " WHERE ? < event_stream_ordering"
                    " AND event_stream_ordering <= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering ASC"
                )
            sql = (
                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                " FROM events AS e"
                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " LEFT JOIN room_memberships USING (event_id)"
                " LEFT JOIN rejections USING (event_id)"
                " WHERE ? < event_stream_ordering"
                " AND event_stream_ordering <= ?"
                " AND out.instance_name = ?"
                " ORDER BY event_stream_ordering ASC"
            )

            txn.execute(sql, (last_id, current_id, instance_name))
            return cast(
@@ -1611,32 +1581,18 @@ class EventsWorkerStore(SQLBaseStore):
        def get_all_new_backfill_event_rows(
            txn: LoggingTransaction,
        ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]:
            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > stream_ordering AND stream_ordering >= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            else:
                sql = (
                    "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > stream_ordering AND stream_ordering >= ?"
                    " AND instance_name = ?"
                    " ORDER BY stream_ordering ASC"
                    " LIMIT ?"
                )
            sql = (
                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id"
                " FROM events AS e"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " WHERE ? > stream_ordering AND stream_ordering >= ?"
                " AND instance_name = ?"
                " ORDER BY stream_ordering ASC"
                " LIMIT ?"
            )
            txn.execute(sql, (-last_id, -current_id, instance_name, limit))

            new_event_updates: List[
                Tuple[int, Tuple[str, str, str, str, str, str]]
@@ -1655,34 +1611,19 @@ class EventsWorkerStore(SQLBaseStore):
            else:
                upper_bound = current_id

            if self.STATE_KEY_IN_EVENTS:
                sql = (
                    "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " e.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > event_stream_ordering"
                    " AND event_stream_ordering >= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering DESC"
                )
            else:
                sql = (
                    "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                    " se.state_key, redacts, relates_to_id"
                    " FROM events AS e"
                    " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                    " LEFT JOIN redactions USING (event_id)"
                    " LEFT JOIN state_events AS se USING (event_id)"
                    " LEFT JOIN event_relations USING (event_id)"
                    " WHERE ? > event_stream_ordering"
                    " AND event_stream_ordering >= ?"
                    " AND out.instance_name = ?"
                    " ORDER BY event_stream_ordering DESC"
                )
            sql = (
                "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                " se.state_key, redacts, relates_to_id"
                " FROM events AS e"
                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                " LEFT JOIN redactions USING (event_id)"
                " LEFT JOIN state_events AS se USING (event_id)"
                " LEFT JOIN event_relations USING (event_id)"
                " WHERE ? > event_stream_ordering"
                " AND event_stream_ordering >= ?"
                " AND out.instance_name = ?"
                " ORDER BY event_stream_ordering DESC"
            )
            txn.execute(sql, (-last_id, -upper_bound, instance_name))

            # Type safety: iterating over `txn` yields `Tuple`, i.e.
            # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a

View File

@@ -122,11 +122,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
        logger.info("[purge] looking for events to delete")

        should_delete_expr = (
            "e.state_key IS NULL"
            if self.STATE_KEY_IN_EVENTS
            else "state_events.state_key IS NULL"
        )
        should_delete_expr = "state_events.state_key IS NULL"
        should_delete_params: Tuple[Any, ...] = ()
        if not delete_local_events:
            should_delete_expr += " AND event_id NOT LIKE ?"
@@ -138,23 +134,12 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
        # Note that we insert events that are outliers and aren't going to be
        # deleted, as nothing will happen to them.
        if self.STATE_KEY_IN_EVENTS:
            sqlf = """
                INSERT INTO events_to_purge
                SELECT event_id, %s
                FROM events AS e
                WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
            """
        else:
            sqlf = """
                INSERT INTO events_to_purge
                SELECT event_id, %s
                FROM events AS e LEFT JOIN state_events USING (event_id)
                WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
            """
        txn.execute(
            sqlf % (should_delete_expr, should_delete_expr),
            "INSERT INTO events_to_purge"
            " SELECT event_id, %s"
            " FROM events AS e LEFT JOIN state_events USING (event_id)"
            " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
            % (should_delete_expr, should_delete_expr),
            should_delete_params,
        )

View File

@@ -22,19 +22,10 @@ logger = logging.getLogger(__name__)
class RejectionsStore(SQLBaseStore):
    async def get_rejection_reason(self, event_id: str) -> Optional[str]:
        if self.STATE_KEY_IN_EVENTS:
            return await self.db_pool.simple_select_one_onecol(
                table="events",
                retcol="rejection_reason",
                keyvalues={"event_id": event_id},
                allow_none=True,
                desc="get_rejection_reason",
            )
        else:
            return await self.db_pool.simple_select_one_onecol(
                table="rejections",
                retcol="reason",
                keyvalues={"event_id": event_id},
                allow_none=True,
                desc="get_rejection_reason",
            )
        return await self.db_pool.simple_select_one_onecol(
            table="rejections",
            retcol="reason",
            keyvalues={"event_id": event_id},
            allow_none=True,
            desc="get_rejection_reason",
        )

View File

@@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
        return results_dict.get("membership"), results_dict.get("event_id")

    @cached(max_entries=500000, iterable=True, prune_unread_entries=False)
    @cached(max_entries=500000, iterable=True, prune_unread_entries=False, debug_invalidations=True)
    async def get_rooms_for_user_with_stream_ordering(
        self, user_id: str
    ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:

View File

@@ -1,47 +0,0 @@
# Copyright 2022 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

from synapse.storage.types import Cursor


def run_create(cur: Cursor, database_engine, *args, **kwargs):
    """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""

    # we know that any new events will have the columns populated (and that has been
    # the case since schema_version 68, so there is no chance of rolling back now).
    #
    # So, we only need to make sure that existing rows are updated. We read the
    # current min and max stream orderings, since that is guaranteed to include all
    # the events that were stored before the new columns were added.
    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
    (min_stream_ordering, max_stream_ordering) = cur.fetchone()

    if min_stream_ordering is None:
        # no rows, nothing to do.
        return

    cur.execute(
        "INSERT into background_updates (ordering, update_name, progress_json)"
        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
        (
            json.dumps(
                {
                    "min_stream_ordering_exclusive": min_stream_ordering - 1,
                    "max_stream_ordering_inclusive": max_stream_ordering,
                }
            ),
        ),
    )
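
The `- 1` on the minimum is what makes the lower bound exclusive: if the oldest event has `stream_ordering = 1`, progress starts at 0, so the background update's `stream_ordering > ?` comparison still picks that event up. With illustrative values, the seeded row's progress JSON looks like this (a sketch, not output from a real deployment):

    import json

    # MIN(stream_ordering) = 1 and MAX(stream_ordering) = 52000 would seed:
    progress = {
        "min_stream_ordering_exclusive": 0,      # 1 - 1: lower bound is exclusive
        "max_stream_ordering_inclusive": 52000,  # newer rows already have the columns
    }
    print(json.dumps(progress))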

View File

@@ -15,6 +15,7 @@
# limitations under the License.
import enum
import logging
import threading
from typing import (
    Callable,
@@ -66,6 +67,7 @@ class DeferredCache(Generic[KT, VT]):
        "cache",
        "thread",
        "_pending_deferred_cache",
        "debug_invalidations",
    )

    def __init__(
@@ -76,6 +78,7 @@ class DeferredCache(Generic[KT, VT]):
        iterable: bool = False,
        apply_cache_factor_from_config: bool = True,
        prune_unread_entries: bool = True,
        debug_invalidations: bool = False,
    ):
        """
        Args:
@@ -119,6 +122,7 @@ class DeferredCache(Generic[KT, VT]):
        )

        self.thread: Optional[threading.Thread] = None
        self.debug_invalidations = debug_invalidations

    @property
    def max_entries(self) -> int:
@@ -310,6 +314,9 @@ class DeferredCache(Generic[KT, VT]):
        self.check_thread()
        self.cache.del_multi(key)

        if self.debug_invalidations:
            logging.debug("Invalidating key %r in cache", key)

        # if we have a pending lookup for this key, remove it from the
        # _pending_deferred_cache, which will (a) stop it being returned
        # for future queries and (b) stop it being persisted as a proper entry
@@ -326,6 +333,8 @@ class DeferredCache(Generic[KT, VT]):
            entry.invalidate()

    def invalidate_all(self) -> None:
        if self.debug_invalidations:
            logging.debug("Invalidating ALL keys")
        self.check_thread()
        self.cache.clear()
        for entry in self._pending_deferred_cache.values():

View File

@@ -301,6 +301,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
        cache_context: bool = False,
        iterable: bool = False,
        prune_unread_entries: bool = True,
        debug_invalidations: bool = False,
    ):
        super().__init__(
            orig,
@@ -318,6 +319,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
        self.tree = tree
        self.iterable = iterable
        self.prune_unread_entries = prune_unread_entries
        self.debug_invalidations = debug_invalidations

    def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
        cache: DeferredCache[CacheKey, Any] = DeferredCache(
@@ -326,6 +328,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
            tree=self.tree,
            iterable=self.iterable,
            prune_unread_entries=self.prune_unread_entries,
            debug_invalidations=self.debug_invalidations,
        )

        get_cache_key = self.cache_key_builder
@@ -577,6 +580,7 @@ def cached(
    cache_context: bool = False,
    iterable: bool = False,
    prune_unread_entries: bool = True,
    debug_invalidations: bool = False,
) -> Callable[[F], _CachedFunction[F]]:
    func = lambda orig: DeferredCacheDescriptor(
        orig,
@@ -587,6 +591,7 @@ def cached(
        cache_context=cache_context,
        iterable=iterable,
        prune_unread_entries=prune_unread_entries,
        debug_invalidations=debug_invalidations,
    )

    return cast(Callable[[F], _CachedFunction[F]], func)
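
Taken together, the last three files thread a single `debug_invalidations` flag from the `@cached` decorator through `DeferredCacheDescriptor` into `DeferredCache`, so one hot cache (`get_rooms_for_user_with_stream_ordering`) can log every invalidation without enabling debug logging for all caches. The shape of that plumbing, reduced to a standalone sketch (hypothetical classes, not Synapse's real ones):

    import logging
    from functools import wraps

    logging.basicConfig(level=logging.DEBUG)  # needed: the hooks call logging.debug

    class Cache:
        def __init__(self, debug_invalidations: bool = False):
            self.data: dict = {}
            self.debug_invalidations = debug_invalidations

        def invalidate(self, key) -> None:
            if self.debug_invalidations:
                logging.debug("Invalidating key %r in cache", key)
            self.data.pop(key, None)

    def cached(debug_invalidations: bool = False):
        """Decorator factory mirroring the flag plumbing in the hunks above."""
        def decorator(func):
            cache = Cache(debug_invalidations=debug_invalidations)

            @wraps(func)
            def wrapper(*args):
                if args not in cache.data:
                    cache.data[args] = func(*args)
                return cache.data[args]

            wrapper.cache = cache  # expose the cache for explicit invalidation
            return wrapper
        return decorator

    @cached(debug_invalidations=True)
    def get_rooms_for_user(user_id: str) -> frozenset:
        return frozenset()  # stand-in for the real database query

    get_rooms_for_user("@alice:example.org")
    get_rooms_for_user.cache.invalidate(("@alice:example.org",))  # emits the debug line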