Compare commits

11 commits — comparing mv/complem… with anoa/new_c…

| SHA1 |
|---|
| 8acd2c01bc |
| f1d98d3b70 |
| 6ff8ba5fc6 |
| 918c74bfb5 |
| 957e3d74fc |
| 666ae87729 |
| f2d12ccabe |
| 6302753012 |
| cf65433de2 |
| eaed4e6113 |
| 51a77e990b |
changelog.d/13162.misc (new file, 1 line)
@@ -0,0 +1 @@
Bump the minimum dependency of `matrix_common` to 1.3.0 to make use of the `MXCUri` class. Use `MXCUri` to simplify media retention test code.

changelog.d/13589.feature (new file, 1 line)
@@ -0,0 +1 @@
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.

changelog.d/13753.misc (new file, 1 line)
@@ -0,0 +1 @@
Preparatory work for storing thread IDs for notifications and receipts.

changelog.d/13780.misc (new file, 1 line)
@@ -0,0 +1 @@
Deduplicate `is_server_notices_room`.

changelog.d/13788.misc (new file, 1 line)
@@ -0,0 +1 @@
Remove an old, incorrect migration file.

changelog.d/13795.misc (new file, 1 line)
@@ -0,0 +1 @@
Remove unused method in `synapse.api.auth.Auth`.

changelog.d/13798.misc (new file, 1 line)
@@ -0,0 +1 @@
Fix a memory leak when running the unit tests.

changelog.d/13802.misc (new file, 1 line)
@@ -0,0 +1 @@
Use partial indices on SQLite.

(removed changelog entry)
@@ -1 +0,0 @@
complement tests: put the postgres data folder on a host path under /tmp that we bind-mount, outside of the container storage, which can be quite slow.
@@ -17,16 +17,25 @@ ARG SYNAPSE_VERSION=latest
# the same debian version as Synapse's docker image (so the versions of the
# shared libraries match).

FROM postgres:13-bullseye AS postgres_base
# initialise the database cluster in /var/lib/postgresql
RUN gosu postgres initdb --locale=C --encoding=UTF-8 --auth-host password

# Configure a password and create a database for Synapse
RUN echo "ALTER USER postgres PASSWORD 'somesecret'" | gosu postgres postgres --single
RUN echo "CREATE DATABASE synapse" | gosu postgres postgres --single

# now build the final image, based on the Synapse image.

FROM matrixdotorg/synapse-workers:$SYNAPSE_VERSION
# copy the postgres installation over from the image we built above
RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
COPY --from=postgres:13-bullseye /usr/lib/postgresql /usr/lib/postgresql
COPY --from=postgres:13-bullseye /usr/share/postgresql /usr/share/postgresql
COPY --from=postgres_base /var/lib/postgresql /var/lib/postgresql
COPY --from=postgres_base /usr/lib/postgresql /usr/lib/postgresql
COPY --from=postgres_base /usr/share/postgresql /usr/share/postgresql
RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
ENV PGDATA=/var/lib/postgresql/data/main
ENV PGDATA=/var/lib/postgresql/data

# Extend the shared homeserver config to disable rate-limiting,
# set Complement's static shared secret, enable registration, amongst other
@@ -25,16 +25,8 @@ case "$SYNAPSE_COMPLEMENT_DATABASE" in
# Set postgres authentication details which will be placed in the homeserver config file
export POSTGRES_PASSWORD=somesecret
export POSTGRES_USER=postgres

export POSTGRES_HOST=localhost

if [ ! -f "$PGDATA/PG_VERSION" ]; then
    gosu postgres initdb --locale=C --encoding=UTF-8 --auth-host password

    echo "ALTER USER postgres PASSWORD 'somesecret'" | gosu postgres postgres --single
    echo "CREATE DATABASE synapse" | gosu postgres postgres --single
fi

# configure supervisord to start postgres
export START_POSTGRES=true
;;
poetry.lock (generated, 10 lines changed)
@@ -524,11 +524,11 @@ python-versions = ">=3.7"

[[package]]
name = "matrix-common"
version = "1.2.1"
version = "1.3.0"
description = "Common utilities for Synapse, Sydent and Sygnal"
category = "main"
optional = false
python-versions = ">=3.6"
python-versions = ">=3.7"

[package.dependencies]
attrs = "*"

@@ -1625,7 +1625,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "79cfa09d59f9f8b5ef24318fb860df1915f54328692aa56d04331ecbdd92a8cb"
content-hash = "1b14fc274d9e2a495a7f864150f3ffcf4d9f585e09a67e53301ae4ef3c2f3e48"

[metadata.files]
attrs = [

@@ -2113,8 +2113,8 @@ markupsafe = [
    {file = "MarkupSafe-2.1.0.tar.gz", hash = "sha256:80beaf63ddfbc64a0452b841d8036ca0611e049650e20afcb882f5d3c266d65f"},
]
matrix-common = [
    {file = "matrix_common-1.2.1-py3-none-any.whl", hash = "sha256:946709c405944a0d4b1d73207b77eb064b6dbfc5d70a69471320b06d8ce98b20"},
    {file = "matrix_common-1.2.1.tar.gz", hash = "sha256:a99dcf02a6bd95b24a5a61b354888a2ac92bf2b4b839c727b8dd9da2cdfa3853"},
    {file = "matrix_common-1.3.0-py3-none-any.whl", hash = "sha256:524e2785b9b03be4d15f3a8a6b857c5b6af68791ffb1b9918f0ad299abc4db20"},
    {file = "matrix_common-1.3.0.tar.gz", hash = "sha256:62e121cccd9f243417b57ec37a76dc44aeb198a7a5c67afd6b8275992ff2abd1"},
]
matrix-synapse-ldap3 = [
    {file = "matrix-synapse-ldap3-0.2.2.tar.gz", hash = "sha256:b388d95693486eef69adaefd0fd9e84463d52fe17b0214a00efcaa669b73cb74"},
@@ -164,7 +164,7 @@ typing-extensions = ">=3.10.0.1"
cryptography = ">=3.4.7"
# ijson 3.1.4 fixes a bug with "." in property names
ijson = ">=3.1.4"
matrix-common = "^1.2.1"
matrix-common = "^1.3.0"
# We need packaging.requirements.Requirement, added in 16.1.
packaging = ">=16.1"
# At the time of writing, we only use functions from the version `importlib.metadata`
@@ -122,14 +122,7 @@ if [ -n "$skip_complement_run" ]; then
    exit
fi

PG_DATA_FOLDER=/tmp/postgres-data

rm -rf $PG_DATA_FOLDER
mkdir -p $PG_DATA_FOLDER
chmod 777 $PG_DATA_FOLDER

export COMPLEMENT_BASE_IMAGE=complement-synapse
export COMPLEMENT_HOST_MOUNTS=$PG_DATA_FOLDER:/var/lib/postgresql/data

extra_test_args=()

@@ -185,5 +178,3 @@ echo "Images built; running complement"
cd "$COMPLEMENT_DIR"

go test -v -tags $test_tags -count=1 "${extra_test_args[@]}" "$@" ./tests/...

rm -rf $PG_DATA_FOLDER
@@ -459,15 +459,6 @@ class Auth:
            )
        raise InvalidClientTokenError("Invalid access token passed.")

    def get_appservice_by_req(self, request: SynapseRequest) -> ApplicationService:
        token = self.get_access_token_from_request(request)
        service = self.store.get_app_service_by_token(token)
        if not service:
            logger.warning("Unrecognised appservice access token.")
            raise InvalidClientTokenError()
        request.requester = create_requester(service.sender, app_service=service)
        return service

    async def is_server_admin(self, requester: Requester) -> bool:
        """Check if the given user is a local server admin.
@@ -862,6 +862,9 @@ class FederationEventHandler:
            self._sanity_check_event(event)
        except SynapseError as err:
            logger.warning("Event %s failed sanity check: %s", event_id, err)
            await self._store.record_event_failed_pull_attempt(
                event.room_id, event_id, str(err)
            )
            return

        try:
@@ -897,6 +900,10 @@ class FederationEventHandler:
                backfilled=backfilled,
            )
        except FederationError as e:
            await self._store.record_event_failed_pull_attempt(
                event.room_id, event_id, str(e)
            )

            if e.code == 403:
                logger.warning("Pulled event %s failed history check.", event_id)
            else:
@@ -752,20 +752,12 @@ class EventCreationHandler:
        if builder.type == EventTypes.Member:
            membership = builder.content.get("membership", None)
            if membership == Membership.JOIN:
                return await self._is_server_notices_room(builder.room_id)
                return await self.store.is_server_notice_room(builder.room_id)
            elif membership == Membership.LEAVE:
                # the user is always allowed to leave (but not kick people)
                return builder.state_key == requester.user.to_string()
        return False

    async def _is_server_notices_room(self, room_id: str) -> bool:
        if self.config.servernotices.server_notices_mxid is None:
            return False
        is_server_notices_room = await self.store.check_local_user_in_room(
            user_id=self.config.servernotices.server_notices_mxid, room_id=room_id
        )
        return is_server_notices_room

    async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
        """Check if a user has accepted the privacy policy
@@ -837,7 +837,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
            old_membership == Membership.INVITE
            and effective_membership_state == Membership.LEAVE
        ):
            is_blocked = await self._is_server_notice_room(room_id)
            is_blocked = await self.store.is_server_notice_room(room_id)
            if is_blocked:
                raise SynapseError(
                    HTTPStatus.FORBIDDEN,
@@ -1617,14 +1617,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):

        return False

    async def _is_server_notice_room(self, room_id: str) -> bool:
        if self._server_notices_mxid is None:
            return False
        is_server_notices_room = await self.store.check_local_user_in_room(
            user_id=self._server_notices_mxid, room_id=room_id
        )
        return is_server_notices_room


class RoomMemberMasterHandler(RoomMemberHandler):
    def __init__(self, hs: "HomeServer"):
@@ -198,7 +198,7 @@ class BulkPushRuleEvaluator:
        return pl_event.content if pl_event else {}, sender_level

    async def _get_mutual_relations(
        self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
        self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
    ) -> Dict[str, Set[Tuple[str, str]]]:
        """
        Fetch event metadata for events which related to the same event as the given event.
@@ -206,7 +206,7 @@ class BulkPushRuleEvaluator:
        If the given event has no relation information, returns an empty dictionary.

        Args:
            event_id: The event ID which is targeted by relations.
            parent_id: The event ID which is targeted by relations.
            rules: The push rules which will be processed for this event.

        Returns:
@@ -220,12 +220,6 @@ class BulkPushRuleEvaluator:
        if not self._relations_match_enabled:
            return {}

        # If the event does not have a relation, then cannot have any mutual
        # relations.
        relation = relation_from_event(event)
        if not relation:
            return {}

        # Pre-filter to figure out which relation types are interesting.
        rel_types = set()
        for rule, enabled in rules:
@@ -246,9 +240,7 @@ class BulkPushRuleEvaluator:
            return {}

        # If any valid rules were found, fetch the mutual relations.
        return await self.store.get_mutual_event_relations(
            relation.parent_id, rel_types
        )
        return await self.store.get_mutual_event_relations(parent_id, rel_types)

    @measure_func("action_for_event_by_user")
    async def action_for_event_by_user(
@@ -281,9 +273,17 @@ class BulkPushRuleEvaluator:
            sender_power_level,
        ) = await self._get_power_levels_and_sender_level(event, context)

        relations = await self._get_mutual_relations(
            event, itertools.chain(*rules_by_user.values())
        )
        relation = relation_from_event(event)
        # If the event does not have a relation, then cannot have any mutual
        # relations or thread ID.
        relations = {}
        thread_id = "main"
        if relation:
            relations = await self._get_mutual_relations(
                relation.parent_id, itertools.chain(*rules_by_user.values())
            )
            if relation.rel_type == RelationTypes.THREAD:
                thread_id = relation.parent_id

        evaluator = PushRuleEvaluatorForEvent(
            event,
@@ -352,6 +352,7 @@ class BulkPushRuleEvaluator:
            event.event_id,
            actions_by_user,
            count_as_unread,
            thread_id,
        )
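
The net effect of the hunk above is that the push evaluator now derives a thread ID from the event's relation before handing actions to the staging table. A condensed, self-contained sketch of that selection logic (editor's illustration, not Synapse code; the literal string "m.thread" stands in for `RelationTypes.THREAD`, and the dict mirrors an event's `m.relates_to` content):

```python
from typing import Any, Mapping, Optional


def thread_id_for(relates_to: Optional[Mapping[str, Any]]) -> str:
    """Pick the push-action thread ID from an event's m.relates_to content."""
    if relates_to and relates_to.get("rel_type") == "m.thread":
        # Threaded events are bucketed under their thread root.
        return relates_to["event_id"]
    # Events with no relation (or a non-thread relation) fall back to "main".
    return "main"


assert thread_id_for(None) == "main"
assert thread_id_for({"rel_type": "m.annotation", "event_id": "$other"}) == "main"
assert thread_id_for({"rel_type": "m.thread", "event_id": "$thread_root"}) == "$thread_root"
```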
@@ -19,6 +19,8 @@ import shutil
from io import BytesIO
from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple

from matrix_common.types.mxc_uri import MXCUri

import twisted.internet.error
import twisted.web.http
from twisted.internet.defer import Deferred
@@ -186,7 +188,7 @@ class MediaRepository:
        content: IO,
        content_length: int,
        auth_user: UserID,
    ) -> str:
    ) -> MXCUri:
        """Store uploaded content for a local user and return the mxc URL

        Args:
@@ -219,7 +221,7 @@ class MediaRepository:

        await self._generate_thumbnails(None, media_id, media_id, media_type)

        return "mxc://%s/%s" % (self.server_name, media_id)
        return MXCUri(self.server_name, media_id)

    async def get_local_media(
        self, request: SynapseRequest, media_id: str, name: Optional[str]
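
For context, a minimal sketch of the new return type (assuming `matrix-common` 1.3.0 is installed; the server name and media ID below are made up). Callers now receive a structured `MXCUri` and only stringify it at the edges, as the `UploadResource` hunk below shows:

```python
from matrix_common.types.mxc_uri import MXCUri

# Hypothetical values, for illustration only.
content_uri = MXCUri("example.com", "AbCdEfGhIjKlMnOpQrStUvWx")

# str() is expected to yield the client-facing form previously built by hand
# with "mxc://%s/%s" % (server_name, media_id).
print(str(content_uri))  # mxc://example.com/AbCdEfGhIjKlMnOpQrStUvWx
```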
@@ -101,6 +101,8 @@ class UploadResource(DirectServeJsonResource):
            # the default 404, as that would just be confusing.
            raise SynapseError(400, "Bad content")

        logger.info("Uploaded content with URI %r", content_uri)
        logger.info("Uploaded content with URI '%s'", content_uri)

        respond_with_json(request, 200, {"content_uri": content_uri}, send_cors=True)
        respond_with_json(
            request, 200, {"content_uri": str(content_uri)}, send_cors=True
        )
@@ -581,9 +581,6 @@ class BackgroundUpdater:
        def create_index_sqlite(conn: Connection) -> None:
            # Sqlite doesn't support concurrent creation of indexes.
            #
            # We don't use partial indices on SQLite as it wasn't introduced
            # until 3.8, and wheezy and CentOS 7 have 3.7
            #
            # We assume that sqlite doesn't give us invalid indices; however
            # we may still end up with the index existing but the
            # background_updates not having been recorded if synapse got shut
@@ -591,12 +588,13 @@
            # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
            sql = (
                "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
                " (%(columns)s)"
                " (%(columns)s) %(where_clause)s"
            ) % {
                "unique": "UNIQUE" if unique else "",
                "name": index_name,
                "table": table,
                "columns": ", ".join(columns),
                "where_clause": "WHERE " + where_clause if where_clause else "",
            }

            c = conn.cursor()
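
To make the effect of the new `%(where_clause)s` placeholder concrete, here is a standalone sketch (editor's illustration, not Synapse code) of a statement the SQLite path can now emit. The table, columns and `WHERE` clause come from the `event_push_actions` index whose `psql_only=True` flag is dropped later in this diff; the index name is taken from the `event_push_actions_stream_highlight_index` entry in the new delta file further down.

```python
# Mirrors the string-building in create_index_sqlite above.
sql = (
    "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
    " (%(columns)s) %(where_clause)s"
) % {
    "unique": "",  # not a UNIQUE index
    "name": "event_push_actions_stream_highlight_index",
    "table": "event_push_actions",
    "columns": ", ".join(["highlight", "stream_ordering"]),
    "where_clause": "WHERE highlight=0",
}
print(sql)
# -> CREATE  INDEX IF NOT EXISTS event_push_actions_stream_highlight_index
#    ON event_push_actions (highlight, stream_ordering) WHERE highlight=0
#    (emitted as a single line; wrapped here for readability)
```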
@@ -194,7 +194,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
        # changed its content in the database. We can't call
        # self._invalidate_cache_and_stream because self.get_event_cache isn't of the
        # right type.
        self.invalidate_get_event_cache_after_txn(txn, event.event_id)
        self.invalidate_get_event_cache_by_event_id_after_txn(txn, event.event_id)
        # Send that invalidation to replication so that other workers also invalidate
        # the event cache.
        self._send_invalidation_to_replication(
@@ -1294,6 +1294,51 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas

        return event_id_results

    @trace
    async def record_event_failed_pull_attempt(
        self, room_id: str, event_id: str, cause: str
    ) -> None:
        """
        Record when we fail to pull an event over federation.

        This information allows us to be more intelligent when we decide to
        retry (we don't need to fail over and over) and we can process that
        event in the background so we don't block on it each time.

        Args:
            room_id: The room where the event failed to pull from
            event_id: The event that failed to be fetched or processed
            cause: The error message or reason that we failed to pull the event
        """
        await self.db_pool.runInteraction(
            "record_event_failed_pull_attempt",
            self._record_event_failed_pull_attempt_upsert_txn,
            room_id,
            event_id,
            cause,
            db_autocommit=True,  # Safe as it's a single upsert
        )

    def _record_event_failed_pull_attempt_upsert_txn(
        self,
        txn: LoggingTransaction,
        room_id: str,
        event_id: str,
        cause: str,
    ) -> None:
        sql = """
            INSERT INTO event_failed_pull_attempts (
                room_id, event_id, num_attempts, last_attempt_ts, last_cause
            )
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (room_id, event_id) DO UPDATE SET
                num_attempts=event_failed_pull_attempts.num_attempts + 1,
                last_attempt_ts=EXCLUDED.last_attempt_ts,
                last_cause=EXCLUDED.last_cause;
        """

        txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

    async def get_missing_events(
        self,
        room_id: str,
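
The upsert above is what turns repeated failures into an incrementing counter rather than duplicate rows. A standalone sketch of that behaviour, running the same statement against an in-memory SQLite database (editor's illustration, not Synapse code; `ON CONFLICT ... DO UPDATE` requires SQLite ≥ 3.24):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE event_failed_pull_attempts(
        room_id TEXT NOT NULL,
        event_id TEXT NOT NULL,
        num_attempts INT NOT NULL,
        last_attempt_ts BIGINT NOT NULL,
        last_cause TEXT NOT NULL,
        PRIMARY KEY (room_id, event_id)
    )
    """
)

upsert = """
    INSERT INTO event_failed_pull_attempts (
        room_id, event_id, num_attempts, last_attempt_ts, last_cause
    )
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (room_id, event_id) DO UPDATE SET
        num_attempts=event_failed_pull_attempts.num_attempts + 1,
        last_attempt_ts=EXCLUDED.last_attempt_ts,
        last_cause=EXCLUDED.last_cause;
"""

# Two failures for the same (room_id, event_id): the second one bumps the counter.
for cause in ("Event failed sanity check", "Pulled event failed history check"):
    conn.execute(
        upsert, ("!room:example.org", "$failed_event", 1, int(time.time() * 1000), cause)
    )

print(
    conn.execute(
        "SELECT num_attempts, last_cause FROM event_failed_pull_attempts"
    ).fetchone()
)
# -> (2, 'Pulled event failed history check')
```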
@@ -98,6 +98,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
@@ -232,6 +233,104 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
replaces_index="event_push_summary_user_rm",
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
"event_push_summary_unique_index2",
|
||||
index_name="event_push_summary_unique_index2",
|
||||
table="event_push_summary",
|
||||
columns=["user_id", "room_id", "thread_id"],
|
||||
unique=True,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"event_push_backfill_thread_id",
|
||||
self._background_backfill_thread_id,
|
||||
)
|
||||
|
||||
async def _background_backfill_thread_id(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
Fill in the thread_id field for event_push_actions and event_push_summary.
|
||||
|
||||
This is preparatory so that it can be made non-nullable in the future.
|
||||
|
||||
Because all current (null) data is done in an unthreaded manner this
|
||||
simply assumes it is on the "main" timeline. Since event_push_actions
|
||||
are periodically cleared it is not possible to correctly re-calculate
|
||||
the thread_id.
|
||||
"""
|
||||
event_push_actions_done = progress.get("event_push_actions_done", False)
|
||||
|
||||
def add_thread_id_txn(
|
||||
txn: LoggingTransaction, table_name: str, start_stream_ordering: int
|
||||
) -> int:
|
||||
sql = f"""
|
||||
SELECT stream_ordering
|
||||
FROM {table_name}
|
||||
WHERE
|
||||
thread_id IS NULL
|
||||
AND stream_ordering > ?
|
||||
ORDER BY stream_ordering
|
||||
LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (start_stream_ordering, batch_size))
|
||||
|
||||
# No more rows to process.
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
progress[f"{table_name}_done"] = True
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, "event_push_backfill_thread_id", progress
|
||||
)
|
||||
return 0
|
||||
|
||||
# Update the thread ID for any of those rows.
|
||||
max_stream_ordering = rows[-1][0]
|
||||
|
||||
sql = f"""
|
||||
UPDATE {table_name}
|
||||
SET thread_id = 'main'
|
||||
WHERE stream_ordering <= ? AND thread_id IS NULL
|
||||
"""
|
||||
txn.execute(sql, (max_stream_ordering,))
|
||||
|
||||
# Update progress.
|
||||
processed_rows = txn.rowcount
|
||||
progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, "event_push_backfill_thread_id", progress
|
||||
)
|
||||
|
||||
return processed_rows
|
||||
|
||||
# First update the event_push_actions table, then the event_push_summary table.
|
||||
#
|
||||
# Note that the event_push_actions_staging table is ignored since it is
|
||||
# assumed that items in that table will only exist for a short period of
|
||||
# time.
|
||||
if not event_push_actions_done:
|
||||
result = await self.db_pool.runInteraction(
|
||||
"event_push_backfill_thread_id",
|
||||
add_thread_id_txn,
|
||||
"event_push_actions",
|
||||
progress.get("max_event_push_actions_stream_ordering", 0),
|
||||
)
|
||||
else:
|
||||
result = await self.db_pool.runInteraction(
|
||||
"event_push_backfill_thread_id",
|
||||
add_thread_id_txn,
|
||||
"event_push_summary",
|
||||
progress.get("max_event_push_summary_stream_ordering", 0),
|
||||
)
|
||||
|
||||
# Only done after the event_push_summary table is done.
|
||||
if not result:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
"event_push_backfill_thread_id"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
@cached(tree=True, max_entries=5000)
|
||||
async def get_unread_event_push_actions_by_room_for_user(
|
||||
self,
|
||||
@@ -670,6 +769,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
event_id: str,
|
||||
user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
|
||||
count_as_unread: bool,
|
||||
thread_id: str,
|
||||
) -> None:
|
||||
"""Add the push actions for the event to the push action staging area.
|
||||
|
||||
@@ -678,6 +778,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
user_id_actions: A mapping of user_id to list of push actions, where
|
||||
an action can either be a string or dict.
|
||||
count_as_unread: Whether this event should increment unread counts.
|
||||
thread_id: The thread this event is parent of, if applicable.
|
||||
"""
|
||||
if not user_id_actions:
|
||||
return
|
||||
@@ -686,7 +787,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
# can be used to insert into the `event_push_actions_staging` table.
|
||||
def _gen_entry(
|
||||
user_id: str, actions: Collection[Union[Mapping, str]]
|
||||
) -> Tuple[str, str, str, int, int, int]:
|
||||
) -> Tuple[str, str, str, int, int, int, str]:
|
||||
is_highlight = 1 if _action_has_highlight(actions) else 0
|
||||
notif = 1 if "notify" in actions else 0
|
||||
return (
|
||||
@@ -696,11 +797,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
notif, # notif column
|
||||
is_highlight, # highlight column
|
||||
int(count_as_unread), # unread column
|
||||
thread_id, # thread_id column
|
||||
)
|
||||
|
||||
await self.db_pool.simple_insert_many(
|
||||
"event_push_actions_staging",
|
||||
keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
|
||||
keys=(
|
||||
"event_id",
|
||||
"user_id",
|
||||
"actions",
|
||||
"notif",
|
||||
"highlight",
|
||||
"unread",
|
||||
"thread_id",
|
||||
),
|
||||
values=[
|
||||
_gen_entry(user_id, actions)
|
||||
for user_id, actions in user_id_actions.items()
|
||||
@@ -981,6 +1091,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
)
|
||||
|
||||
# Replace the previous summary with the new counts.
|
||||
#
|
||||
# TODO(threads): Upsert per-thread instead of setting them all to main.
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="event_push_summary",
|
||||
@@ -990,6 +1102,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
"unread_count": unread_count,
|
||||
"stream_ordering": old_rotate_stream_ordering,
|
||||
"last_receipt_stream_ordering": stream_ordering,
|
||||
"thread_id": "main",
|
||||
},
|
||||
)
|
||||
|
||||
@@ -1138,17 +1251,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
|
||||
logger.info("Rotating notifications, handling %d rows", len(summaries))
|
||||
|
||||
# TODO(threads): Update on a per-thread basis.
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="event_push_summary",
|
||||
key_names=("user_id", "room_id"),
|
||||
key_values=[(user_id, room_id) for user_id, room_id in summaries],
|
||||
value_names=("notif_count", "unread_count", "stream_ordering"),
|
||||
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
|
||||
value_values=[
|
||||
(
|
||||
summary.notif_count,
|
||||
summary.unread_count,
|
||||
summary.stream_ordering,
|
||||
"main",
|
||||
)
|
||||
for summary in summaries.values()
|
||||
],
|
||||
@@ -1255,7 +1370,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
table="event_push_actions",
|
||||
columns=["highlight", "stream_ordering"],
|
||||
where_clause="highlight=0",
|
||||
psql_only=True,
|
||||
)
|
||||
|
||||
async def get_push_actions_for_user(
|
||||
|
||||
@@ -1294,8 +1294,10 @@ class PersistEventsStore:
|
||||
"""
|
||||
depth_updates: Dict[str, int] = {}
|
||||
for event, context in events_and_contexts:
|
||||
# Remove the any existing cache entries for the event_ids
|
||||
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
|
||||
# Remove any existing cache entries for the event_ids
|
||||
self.store.invalidate_get_event_cache_by_event_id_after_txn(
|
||||
txn, event.event_id
|
||||
)
|
||||
# Then update the `stream_ordering` position to mark the latest
|
||||
# event as the front of the room. This should not be done for
|
||||
# backfilled events because backfilled events have negative
|
||||
@@ -1703,7 +1705,7 @@ class PersistEventsStore:
|
||||
_invalidate_caches_for_event.
|
||||
"""
|
||||
assert event.redacts is not None
|
||||
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
|
||||
self.store.invalidate_get_event_cache_by_event_id_after_txn(txn, event.redacts)
|
||||
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
|
||||
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
|
||||
|
||||
@@ -2192,9 +2194,9 @@ class PersistEventsStore:
|
||||
sql = """
|
||||
INSERT INTO event_push_actions (
|
||||
room_id, event_id, user_id, actions, stream_ordering,
|
||||
topological_ordering, notif, highlight, unread
|
||||
topological_ordering, notif, highlight, unread, thread_id
|
||||
)
|
||||
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
|
||||
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
|
||||
FROM event_push_actions_staging
|
||||
WHERE event_id = ?
|
||||
"""
|
||||
@@ -2435,17 +2437,31 @@ class PersistEventsStore:
|
||||
"DELETE FROM event_backward_extremities"
|
||||
" WHERE event_id = ? AND room_id = ?"
|
||||
)
|
||||
backward_extremity_tuples_to_remove = [
|
||||
(ev.event_id, ev.room_id)
|
||||
for ev in events
|
||||
if not ev.internal_metadata.is_outlier()
|
||||
# If we encountered an event with no prev_events, then we might
|
||||
# as well remove it now because it won't ever have anything else
|
||||
# to backfill from.
|
||||
or len(ev.prev_event_ids()) == 0
|
||||
]
|
||||
txn.execute_batch(
|
||||
query,
|
||||
[
|
||||
(ev.event_id, ev.room_id)
|
||||
for ev in events
|
||||
if not ev.internal_metadata.is_outlier()
|
||||
# If we encountered an event with no prev_events, then we might
|
||||
# as well remove it now because it won't ever have anything else
|
||||
# to backfill from.
|
||||
or len(ev.prev_event_ids()) == 0
|
||||
],
|
||||
backward_extremity_tuples_to_remove,
|
||||
)
|
||||
|
||||
# Clear out the failed backfill attempts after we successfully pulled
|
||||
# the event. Since we no longer need these events as backward
|
||||
# extremities, it also means that they won't be backfilled from again so
|
||||
# we no longer need to store the backfill attempts around it.
|
||||
query = """
|
||||
DELETE FROM event_failed_pull_attempts
|
||||
WHERE event_id = ? and room_id = ?
|
||||
"""
|
||||
txn.execute_batch(
|
||||
query,
|
||||
backward_extremity_tuples_to_remove,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -80,6 +80,7 @@ from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.dual_lookup_cache import DualLookupCache
|
||||
from synapse.util.caches.lrucache import AsyncLruCache
|
||||
from synapse.util.cancellation import cancellable
|
||||
from synapse.util.iterutils import batch_iter
|
||||
@@ -245,6 +246,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
] = AsyncLruCache(
|
||||
cache_name="*getEvent*",
|
||||
max_size=hs.config.caches.event_cache_size,
|
||||
cache_type=DualLookupCache,
|
||||
dual_lookup_secondary_key_function=lambda v: (v.event.room_id,),
|
||||
)
|
||||
|
||||
# Map from event ID to a deferred that will result in a map from event
|
||||
@@ -733,7 +736,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
return event_entry_map
|
||||
|
||||
def invalidate_get_event_cache_after_txn(
|
||||
def invalidate_get_event_cache_by_event_id_after_txn(
|
||||
self, txn: LoggingTransaction, event_id: str
|
||||
) -> None:
|
||||
"""
|
||||
@@ -747,10 +750,31 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
event_id: the event ID to be invalidated from caches
|
||||
"""
|
||||
|
||||
txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
|
||||
txn.call_after(self._invalidate_local_get_event_cache, event_id)
|
||||
txn.async_call_after(
|
||||
self._invalidate_async_get_event_cache_by_event_id, event_id
|
||||
)
|
||||
txn.call_after(self._invalidate_local_get_event_cache_by_event_id, event_id)
|
||||
|
||||
async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
|
||||
def invalidate_get_event_cache_by_room_id_after_txn(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Prepares a database transaction to invalidate the get event cache for a given
|
||||
room ID when executed successfully. This is achieved by attaching two callbacks
|
||||
to the transaction, one to invalidate the async cache and one for the in memory
|
||||
sync cache (importantly called in that order).
|
||||
|
||||
Arguments:
|
||||
txn: the database transaction to attach the callbacks to.
|
||||
room_id: the room ID to invalidate all associated event caches for.
|
||||
"""
|
||||
|
||||
txn.async_call_after(self._invalidate_async_get_event_cache_by_room_id, room_id)
|
||||
txn.call_after(self._invalidate_local_get_event_cache_by_room_id, room_id)
|
||||
|
||||
async def _invalidate_async_get_event_cache_by_event_id(
|
||||
self, event_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Invalidates an event in the asynchronous get event cache, which may be remote.
|
||||
|
||||
@@ -760,7 +784,18 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
await self._get_event_cache.invalidate((event_id,))
|
||||
|
||||
def _invalidate_local_get_event_cache(self, event_id: str) -> None:
|
||||
async def _invalidate_async_get_event_cache_by_room_id(self, room_id: str) -> None:
|
||||
"""
|
||||
Invalidates all events associated with a given room in the asynchronous get event
|
||||
cache, which may be remote.
|
||||
|
||||
Arguments:
|
||||
room_id: the room ID to invalidate associated events of.
|
||||
"""
|
||||
|
||||
await self._get_event_cache.invalidate((room_id,))
|
||||
|
||||
def _invalidate_local_get_event_cache_by_event_id(self, event_id: str) -> None:
|
||||
"""
|
||||
Invalidates an event in local in-memory get event caches.
|
||||
|
||||
@@ -772,6 +807,18 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
self._event_ref.pop(event_id, None)
|
||||
self._current_event_fetches.pop(event_id, None)
|
||||
|
||||
def _invalidate_local_get_event_cache_by_room_id(self, room_id: str) -> None:
|
||||
"""
|
||||
Invalidates all events associated with a given room ID in local in-memory
|
||||
get event caches.
|
||||
|
||||
Arguments:
|
||||
room_id: the room ID to invalidate events of.
|
||||
"""
|
||||
self._get_event_cache.invalidate_local((room_id,))
|
||||
|
||||
# TODO: invalidate _event_ref and _current_event_fetches. How?
|
||||
|
||||
async def _get_events_from_cache(
|
||||
self, events: Iterable[str], update_metrics: bool = True
|
||||
) -> Dict[str, EventCacheEntry]:
|
||||
@@ -2284,7 +2331,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
updatevalues={"rejection_reason": rejection_reason},
|
||||
)
|
||||
|
||||
self.invalidate_get_event_cache_after_txn(txn, event_id)
|
||||
self.invalidate_get_event_cache_by_event_id_after_txn(txn, event_id)
|
||||
|
||||
# TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
|
||||
# call '_send_invalidation_to_replication', but we actually need the other
|
||||
|
||||
@@ -304,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.have_seen_event, (room_id, event_id)
|
||||
)
|
||||
self.invalidate_get_event_cache_after_txn(txn, event_id)
|
||||
self.invalidate_get_event_cache_by_event_id_after_txn(txn, event_id)
|
||||
|
||||
logger.info("[purge] done")
|
||||
|
||||
@@ -478,6 +478,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
# XXX: as with purge_history, this is racy, but no worse than other races
|
||||
# that already exist.
|
||||
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
|
||||
self._invalidate_local_get_event_cache_by_room_id(room_id)
|
||||
|
||||
logger.info("[purge] done")
|
||||
|
||||
|
||||
@@ -113,6 +113,24 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
prefilled_cache=receipts_stream_prefill,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
"receipts_linearized_unique_index",
|
||||
index_name="receipts_linearized_unique_index",
|
||||
table="receipts_linearized",
|
||||
columns=["room_id", "receipt_type", "user_id"],
|
||||
where_clause="thread_id IS NULL",
|
||||
unique=True,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
"receipts_graph_unique_index",
|
||||
index_name="receipts_graph_unique_index",
|
||||
table="receipts_graph",
|
||||
columns=["room_id", "receipt_type", "user_id"],
|
||||
where_clause="thread_id IS NULL",
|
||||
unique=True,
|
||||
)
|
||||
|
||||
def get_max_receipt_stream_id(self) -> int:
|
||||
"""Get the current max stream ID for receipts stream"""
|
||||
return self._receipts_id_gen.get_current_token()
|
||||
@@ -677,6 +695,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
"event_id": event_id,
|
||||
"event_stream_ordering": stream_ordering,
|
||||
"data": json_encoder.encode(data),
|
||||
"thread_id": None,
|
||||
},
|
||||
# receipts_linearized has a unique constraint on
|
||||
# (user_id, room_id, receipt_type), so no need to lock
|
||||
@@ -824,6 +843,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
values={
|
||||
"event_ids": json_encoder.encode(event_ids),
|
||||
"data": json_encoder.encode(data),
|
||||
"thread_id": None,
|
||||
},
|
||||
# receipts_graph has a unique constraint on
|
||||
# (user_id, room_id, receipt_type), so no need to lock
|
||||
|
||||
@@ -88,6 +88,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
# at a time. Keyed by room_id.
|
||||
self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
|
||||
|
||||
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
|
||||
|
||||
if (
|
||||
self.hs.config.worker.run_background_tasks
|
||||
and self.hs.config.metrics.metrics_flags.known_servers
|
||||
@@ -504,6 +506,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
return membership == Membership.JOIN
|
||||
|
||||
async def is_server_notice_room(self, room_id: str) -> bool:
|
||||
"""
|
||||
Determines whether the given room is a 'Server Notices' room, used for
|
||||
sending server notices to a user.
|
||||
|
||||
This is determined by seeing whether the server notices user is present
|
||||
in the room.
|
||||
"""
|
||||
if self._server_notices_mxid is None:
|
||||
return False
|
||||
is_server_notices_room = await self.check_local_user_in_room(
|
||||
user_id=self._server_notices_mxid, room_id=room_id
|
||||
)
|
||||
return is_server_notices_room
|
||||
|
||||
async def get_local_current_membership_for_user_in_room(
|
||||
self, user_id: str, room_id: str
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
|
||||
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 72  # remember to update the list below when updating
SCHEMA_VERSION = 73  # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
@@ -77,6 +77,12 @@ Changes in SCHEMA_VERSION = 72:
    - Tables related to groups are dropped.
    - Unused column application_services_state.last_txn is dropped
    - Cache invalidation stream id sequence now begins at 2 to match code expectation.

Changes in SCHEMA_VERSION = 73;
    - thread_id column is added to event_push_actions, event_push_actions_staging
      event_push_summary, receipts_linearized, and receipts_graph.
    - Add table `event_failed_pull_attempts` to keep track when we fail to pull
      events over federation.
"""
@@ -0,0 +1,30 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Add a nullable column for thread ID to the event push actions tables; this
|
||||
-- will be filled in with a default value for any previously existing rows.
|
||||
--
|
||||
-- After migration this can be made non-nullable.
|
||||
|
||||
ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT;
|
||||
ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT;
|
||||
ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;
|
||||
|
||||
-- Update the unique index for `event_push_summary`.
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(7006, 'event_push_summary_unique_index2', '{}');
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');
|
||||
@@ -0,0 +1,30 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Add a nullable column for thread ID to the receipts table; this allows a
|
||||
-- receipt per user, per room, as well as an unthreaded receipt (corresponding
|
||||
-- to a null thread ID).
|
||||
|
||||
ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT;
|
||||
ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT;
|
||||
|
||||
-- Rebuild the unique constraint with the thread_id.
|
||||
ALTER TABLE receipts_linearized
|
||||
ADD CONSTRAINT receipts_linearized_uniqueness_thread
|
||||
UNIQUE (room_id, receipt_type, user_id, thread_id);
|
||||
|
||||
ALTER TABLE receipts_graph
|
||||
ADD CONSTRAINT receipts_graph_uniqueness_thread
|
||||
UNIQUE (room_id, receipt_type, user_id, thread_id);
|
||||
@@ -0,0 +1,70 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Allow multiple receipts per user per room via a nullable thread_id column.
|
||||
--
|
||||
-- SQLite doesn't support modifying constraints to an existing table, so it must
|
||||
-- be recreated.
|
||||
|
||||
-- Create the new tables.
|
||||
CREATE TABLE receipts_linearized_new (
|
||||
stream_id BIGINT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
receipt_type TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
event_id TEXT NOT NULL,
|
||||
thread_id TEXT,
|
||||
event_stream_ordering BIGINT,
|
||||
data TEXT NOT NULL,
|
||||
CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id),
|
||||
CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
|
||||
);
|
||||
|
||||
CREATE TABLE receipts_graph_new (
|
||||
room_id TEXT NOT NULL,
|
||||
receipt_type TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
event_ids TEXT NOT NULL,
|
||||
thread_id TEXT,
|
||||
data TEXT NOT NULL,
|
||||
CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id),
|
||||
CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
|
||||
);
|
||||
|
||||
-- Drop the old indexes.
|
||||
DROP INDEX IF EXISTS receipts_linearized_id;
|
||||
DROP INDEX IF EXISTS receipts_linearized_room_stream;
|
||||
DROP INDEX IF EXISTS receipts_linearized_user;
|
||||
|
||||
-- Copy the data.
|
||||
INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data)
|
||||
SELECT stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data
|
||||
FROM receipts_linearized;
|
||||
INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
|
||||
SELECT room_id, receipt_type, user_id, event_ids, data
|
||||
FROM receipts_graph;
|
||||
|
||||
-- Drop the old tables.
|
||||
DROP TABLE receipts_linearized;
|
||||
DROP TABLE receipts_graph;
|
||||
|
||||
-- Rename the tables.
|
||||
ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
|
||||
ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
|
||||
|
||||
-- Create the indices.
|
||||
CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
|
||||
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
|
||||
CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
|
||||
synapse/storage/schema/main/delta/72/08thread_receipts.sql (new file, 20 lines)
@@ -0,0 +1,20 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(7007, 'receipts_linearized_unique_index', '{}');
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(7007, 'receipts_graph_unique_index', '{}');
|
||||
@@ -0,0 +1,56 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- SQLite needs to rebuild indices which use partial indices on Postgres, but
|
||||
-- previously did not use them on SQLite.
|
||||
|
||||
-- Drop each index that was added with register_background_index_update AND specified
|
||||
-- a where_clause (that existed before this delta).
|
||||
|
||||
-- From events_bg_updates.py
|
||||
DROP INDEX IF EXISTS event_contains_url_index;
|
||||
-- There is also a redactions_censored_redacts index, but that gets dropped.
|
||||
DROP INDEX IF EXISTS redactions_have_censored_ts;
|
||||
-- There is also a PostgreSQL only index (event_contains_url_index2)
|
||||
-- which gets renamed to event_contains_url_index.
|
||||
|
||||
-- From roommember.py
|
||||
DROP INDEX IF EXISTS room_memberships_user_room_forgotten;
|
||||
|
||||
-- From presence.py
|
||||
DROP INDEX IF EXISTS presence_stream_state_not_offline_idx;
|
||||
|
||||
-- From media_repository.py
|
||||
DROP INDEX IF EXISTS local_media_repository_url_idx;
|
||||
|
||||
-- From event_push_actions.py
|
||||
DROP INDEX IF EXISTS event_push_actions_highlights_index;
|
||||
-- There's also a event_push_actions_stream_highlight_index which was previously
|
||||
-- PostgreSQL-only.
|
||||
|
||||
-- From state.py
|
||||
DROP INDEX IF EXISTS current_state_events_member_index;
|
||||
|
||||
-- Re-insert the background jobs to re-create the indices.
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(7209, 'event_contains_url_index', '{}', NULL),
|
||||
(7209, 'redactions_have_censored_ts_idx', '{}', NULL),
|
||||
(7209, 'room_membership_forgotten_idx', '{}', NULL),
|
||||
(7209, 'presence_stream_not_offline_index', '{}', NULL),
|
||||
(7209, 'local_media_repository_url_idx', '{}', NULL),
|
||||
(7209, 'event_push_actions_highlights_index', '{}', NULL),
|
||||
(7209, 'event_push_actions_stream_highlight_index', '{}', NULL),
|
||||
(7209, 'current_state_members_idx', '{}', NULL)
|
||||
ON CONFLICT (update_name) DO NOTHING;
|
||||
@@ -0,0 +1,29 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
-- Add a table that keeps track of when we failed to pull an event over
|
||||
-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
|
||||
-- us to be more intelligent when we decide to retry (we don't need to fail over
|
||||
-- and over) and we can process that event in the background so we don't block
|
||||
-- on it each time.
|
||||
CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
|
||||
room_id TEXT NOT NULL REFERENCES rooms (room_id),
|
||||
event_id TEXT NOT NULL,
|
||||
num_attempts INT NOT NULL,
|
||||
last_attempt_ts BIGINT NOT NULL,
|
||||
last_cause TEXT NOT NULL,
|
||||
PRIMARY KEY (room_id, event_id)
|
||||
);
|
||||
@@ -1,37 +0,0 @@
|
||||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
/* We used to create a table called current_state_resets, but this is no
|
||||
* longer used and is removed in delta 54.
|
||||
*/
|
||||
|
||||
/* The outlier events that have aquired a state group typically through
|
||||
* backfill. This is tracked separately to the events table, as assigning a
|
||||
* state group change the position of the existing event in the stream
|
||||
* ordering.
|
||||
* However since a stream_ordering is assigned in persist_event for the
|
||||
* (event, state) pair, we can use that stream_ordering to identify when
|
||||
* the new state was assigned for the event.
|
||||
*/
|
||||
|
||||
/* NB: This table belongs to the `main` logical database; it should not be present
|
||||
* in `state`.
|
||||
*/
|
||||
CREATE TABLE IF NOT EXISTS ex_outlier_stream(
|
||||
event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
|
||||
event_id TEXT NOT NULL,
|
||||
state_group BIGINT NOT NULL
|
||||
);
|
||||
@@ -205,8 +205,9 @@ def register_cache(
        add_resizable_cache(cache_name, resize_callback)

    metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
    metric_name = "cache_%s_%s" % (cache_type, cache_name)
    caches_by_name[cache_name] = cache
    CACHE_METRIC_REGISTRY.register_hook(metric.collect)
    CACHE_METRIC_REGISTRY.register_hook(metric_name, metric.collect)
    return metric
synapse/util/caches/dual_lookup_cache.py (new file, 238 lines)
@@ -0,0 +1,238 @@
|
||||
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import (
|
||||
Callable,
|
||||
Dict,
|
||||
Generic,
|
||||
ItemsView,
|
||||
List,
|
||||
Optional,
|
||||
TypeVar,
|
||||
Union,
|
||||
ValuesView,
|
||||
)
|
||||
|
||||
# Used to discern between a value not existing in a map, or the value being 'None'.
|
||||
SENTINEL = object()
|
||||
|
||||
# The type of the primary dict's keys.
|
||||
PKT = TypeVar("PKT")
|
||||
# The type of the primary dict's values.
|
||||
PVT = TypeVar("PVT")
|
||||
# The type of the secondary dict's keys.
|
||||
SKT = TypeVar("SKT")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SecondarySet(set):
|
||||
"""
|
||||
Used to differentiate between an entry in the secondary_dict, and a set stored
|
||||
in the primary_dict. This is necessary as pop() can return either.
|
||||
"""
|
||||
|
||||
|
||||
class DualLookupCache(Generic[PKT, PVT, SKT]):
|
||||
"""
|
||||
A backing store for LruCache that supports multiple entry points.
|
||||
Allows subsets of data to be deleted efficiently without requiring extra
|
||||
information to query.
|
||||
|
||||
The data structure is two dictionaries:
|
||||
* primary_dict containing a mapping of primary_key -> value.
|
||||
* secondary_dict containing a mapping of secondary_key -> set of primary_key.
|
||||
|
||||
On insert, a mapping in the primary_dict must be created. A mapping in the
|
||||
secondary_dict from a secondary_key to (a set containing) the same
|
||||
primary_key will be made. The secondary_key
|
||||
must be derived from the inserted value via a lambda function provided at cache
|
||||
initialisation. This is so invalidated entries in the primary_dict may automatically
|
||||
invalidate those in the secondary_dict. The secondary_key may be associated with one
|
||||
or more primary_key's.
|
||||
|
||||
This creates an interface which allows for efficient lookups of a value given
|
||||
a primary_key, as well as efficient invalidation of a subset of mapping in the
|
||||
primary_dict given a secondary_key. A primary_key may not be associated with more
|
||||
than one secondary_key.
|
||||
|
||||
As a worked example, consider storing a cache of room events. We could configure
|
||||
the cache to store mappings between EventIDs and EventBase in the primary_dict,
|
||||
while storing a mapping between room IDs and event IDs as the secondary_dict:
|
||||
|
||||
primary_dict: EventID -> EventBase
|
||||
secondary_dict: RoomID -> {EventID, EventID, ...}
|
||||
|
||||
This would be efficient for the following operations:
|
||||
* Given an EventID, look up the associated EventBase, and thus the roomID.
|
||||
* Given a RoomID, invalidate all primary_dict entries for events in that room.
|
||||
|
||||
Since this is intended as a backing store for LRUCache, when it came time to evict
|
||||
an entry from the primary_dict (EventID -> EventBase), the secondary_key could be
|
||||
derived from a provided lambda function:
|
||||
secondary_key = lambda event_base: event_base.room_id
|
||||
|
||||
The EventID set under room_id would then have the appropriate EventID entry evicted.
|
||||
"""
|
||||
|
||||
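# Hypothetical usage sketch (editor's illustration, not part of the diff).
# With the secondary key derived from an entry's room ID -- as configured in
# EventsWorkerStore earlier in this diff via dual_lookup_secondary_key_function --
# entries can be dropped either one event at a time or a whole room at a time:
#
#     cache = DualLookupCache(secondary_key_function=lambda entry: (entry.event.room_id,))
#     cache[("$event_a",)] = entry_a
#     cache[("$event_b",)] = entry_b
#     cache.pop(("$event_a",))           # drops a single event
#     cache.pop(("!room:example.org",))  # drops every event cached for that room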
def __init__(self, secondary_key_function: Callable[[PVT], SKT]) -> None:
|
||||
self._primary_dict: Dict[PKT, PVT] = {}
|
||||
self._secondary_dict: Dict[SKT, SecondarySet] = {}
|
||||
self._secondary_key_function = secondary_key_function
|
||||
|
||||
def __setitem__(self, key: PKT, value: PVT) -> None:
|
||||
self.set(key, value)
|
||||
|
||||
def __contains__(self, key: PKT) -> bool:
|
||||
return key in self._primary_dict
|
||||
|
||||
def set(self, key: PKT, value: PVT) -> None:
|
||||
"""Add an entry to the cache.
|
||||
|
||||
Will add an entry to the primary_dict consisting of key->value, as well as append
|
||||
to the set referred to by secondary_key_function(value) in the secondary_dict.
|
||||
|
||||
Args:
|
||||
key: The key for a new mapping in primary_dict.
|
||||
value: The value for a new mapping in primary_dict.
|
||||
"""
|
||||
# Create an entry in the primary_dict.
|
||||
self._primary_dict[key] = value
|
||||
|
||||
# Derive the secondary_key to use from the given primary_value.
|
||||
secondary_key = self._secondary_key_function(value)
|
||||
|
||||
# TODO: If the lambda function resolves to None, don't insert an entry?
|
||||
|
||||
# And create a mapping in the secondary_dict to a set containing the
|
||||
# primary_key, creating the set if necessary.
|
||||
secondary_key_set = self._secondary_dict.setdefault(
|
||||
secondary_key, SecondarySet()
|
||||
)
|
||||
secondary_key_set.add(key)
|
||||
|
||||
logger.info("*** Insert into primary_dict: %s: %s", key, value)
|
||||
logger.info("*** Insert into secondary_dict: %s: %s", secondary_key, key)
|
||||
|
||||
def get(self, key: PKT, default: Optional[PVT] = None) -> Optional[PVT]:
|
||||
"""Retrieve a value from the cache if it exists. If not, return the default
|
||||
value.
|
||||
|
||||
This method simply pulls entries from the primary_dict.
|
||||
|
||||
# TODO: Any use cases for externally getting entries from the secondary_dict?
|
||||
|
||||
Args:
|
||||
key: The key to search the cache for.
|
||||
default: The default value to return if the given key is not found.
|
||||
|
||||
Returns:
|
||||
The value referenced by the given key, if it exists in the cache. If not,
|
||||
the value of `default` will be returned.
|
||||
"""
|
||||
logger.info("*** Retrieving key from primary_dict: %s", key)
|
||||
return self._primary_dict.get(key, default)
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Evicts all entries from the cache."""
|
||||
self._primary_dict.clear()
|
||||
self._secondary_dict.clear()
|
||||
|
||||
def pop(
|
||||
self, key: Union[PKT, SKT], default: Optional[Union[SecondarySet, PVT]] = None
|
||||
) -> Optional[Union[SecondarySet, PVT]]:
|
||||
"""Remove an entry from either the primary_dict or secondary_dict.
|
||||
|
||||
The primary_dict is checked first for the key. If an entry is found, it is
|
||||
removed from the primary_dict and returned.
|
||||
|
||||
If no entry in the primary_dict exists, then the secondary_dict is checked.
|
||||
If an entry exists, all associated entries in the primary_dict will be
deleted, and their values returned from this function in a SecondarySet.
|
||||
|
||||
Args:
|
||||
key: A key to drop from either the primary_dict or secondary_dict.
|
||||
default: The default value if the key does not exist in either dict.
|
||||
|
||||
Returns:
|
||||
Either the matched value from the primary_dict, a SecondarySet of the values
removed via the secondary_dict, or the value of `default` if the key is not
found in either dict.
|
||||
"""
|
||||
# Attempt to remove from the primary_dict first.
|
||||
primary_value = self._primary_dict.pop(key, SENTINEL)
|
||||
if primary_value is not SENTINEL:
|
||||
# We found a value in the primary_dict. Remove it from the corresponding
|
||||
# entry in the secondary_dict, and then return it.
|
||||
logger.info(
|
||||
"*** Popped entry from primary_dict: %s: %s", key, primary_value
|
||||
)
|
||||
|
||||
# Derive the secondary_key from the primary_value
|
||||
secondary_key = self._secondary_key_function(primary_value)
|
||||
|
||||
# Pop the entry from the secondary_dict
|
||||
secondary_key_set = self._secondary_dict[secondary_key]
|
||||
if len(secondary_key_set) > 1:
|
||||
# Delete just the set entry for the given key.
|
||||
secondary_key_set.remove(key)
|
||||
logger.info(
|
||||
"*** Popping from secondary_dict: %s: %s", secondary_key, key
|
||||
)
|
||||
else:
|
||||
# Delete the entire set referenced by the secondary_key, as it only
|
||||
# has one entry.
|
||||
del self._secondary_dict[secondary_key]
|
||||
logger.info("*** Popping from secondary_dict: %s", secondary_key)
|
||||
|
||||
return primary_value
|
||||
|
||||
# There was no matching value in the primary_dict. Attempt the secondary_dict.
|
||||
primary_key_set = self._secondary_dict.pop(key, SENTINEL)
|
||||
if primary_key_set is not SENTINEL:
|
||||
# We found a set in the secondary_dict.
|
||||
logger.info(
|
||||
"*** Found '%s' in secondary_dict: %s: ",
|
||||
key,
|
||||
primary_key_set,
|
||||
)
|
||||
|
||||
popped_primary_dict_values = SecondarySet()
|
||||
|
||||
# We found an entry in the secondary_dict. Delete all related entries in the
|
||||
# primary_dict.
|
||||
logger.info(
|
||||
"*** Found key in secondary_dict to pop: %s. "
|
||||
"Popping primary_dict entries",
|
||||
key,
|
||||
)
|
||||
for primary_key in primary_key_set:
|
||||
primary_value = self._primary_dict.pop(primary_key)
|
||||
logger.info("*** Popping entry from primary_dict: %s - %s", primary_key, primary_value)
|
||||
logger.info("*** primary_dict: %s", self._primary_dict)
|
||||
popped_primary_dict_values.add(primary_value)
|
||||
|
||||
# Return the set of values that were removed from the primary_dict.
|
||||
return popped_primary_dict_values
|
||||
|
||||
# No match in either dict.
|
||||
return default
|
||||
|
||||
def values(self) -> ValuesView:
|
||||
return self._primary_dict.values()
|
||||
|
||||
def items(self) -> ItemsView:
|
||||
return self._primary_dict.items()
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._primary_dict)
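
To make the primary/secondary interplay concrete, here is a minimal usage sketch based on the room-events example from the class docstring; it assumes the class as written above, and the event and room IDs are invented for illustration:

```python
# Minimal sketch of DualLookupCache usage, assuming the class above.
# The event/room IDs below are invented for illustration.
cache: DualLookupCache[str, dict, str] = DualLookupCache(
    secondary_key_function=lambda event: event["room_id"]
)

# Insert two events from room A and one from room B.
cache["$event1"] = {"room_id": "!roomA", "body": "hello"}
cache["$event2"] = {"room_id": "!roomA", "body": "world"}
cache["$event3"] = {"room_id": "!roomB", "body": "hi"}

# Lookups by primary key go straight to primary_dict.
assert cache.get("$event1") == {"room_id": "!roomA", "body": "hello"}

# Popping by the secondary key (the room ID) evicts every event from
# that room in one call, returning the removed values.
cache.pop("!roomA")
assert "$event1" not in cache and "$event2" not in cache
assert "$event3" in cache
```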
|
||||
@@ -46,8 +46,10 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
|
||||
from synapse.metrics.jemalloc import get_jemalloc_stats
|
||||
from synapse.util import Clock, caches
|
||||
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
|
||||
from synapse.util.caches.dual_lookup_cache import DualLookupCache, SecondarySet
|
||||
from synapse.util.caches.treecache import (
|
||||
TreeCache,
|
||||
TreeCacheNode,
|
||||
iterate_tree_cache_entry,
|
||||
iterate_tree_cache_items,
|
||||
)
|
||||
@@ -375,12 +377,13 @@ class LruCache(Generic[KT, VT]):
|
||||
self,
|
||||
max_size: int,
|
||||
cache_name: Optional[str] = None,
|
||||
cache_type: Type[Union[dict, TreeCache]] = dict,
|
||||
cache_type: Type[Union[dict, TreeCache, DualLookupCache]] = dict,
|
||||
size_callback: Optional[Callable[[VT], int]] = None,
|
||||
metrics_collection_callback: Optional[Callable[[], None]] = None,
|
||||
apply_cache_factor_from_config: bool = True,
|
||||
clock: Optional[Clock] = None,
|
||||
prune_unread_entries: bool = True,
|
||||
dual_lookup_secondary_key_function: Optional[Callable[[Any], Any]] = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
@@ -411,6 +414,10 @@ class LruCache(Generic[KT, VT]):
|
||||
prune_unread_entries: If True, cache entries that haven't been read recently
|
||||
will be evicted from the cache in the background. Set to False to
|
||||
opt-out of this behaviour.
|
||||
|
||||
# TODO: At this point we should probably just pass an initialised cache type
|
||||
# to LruCache, no?
|
||||
dual_lookup_secondary_key_function: Only used when `cache_type` is
`DualLookupCache`. A function which, given a cached value, returns the
secondary key that the value should be indexed under.
|
||||
"""
|
||||
# Default `clock` to something sensible. Note that we rename it to
|
||||
# `real_clock` so that mypy doesn't think its still `Optional`.
|
||||
@@ -419,7 +426,30 @@ class LruCache(Generic[KT, VT]):
|
||||
else:
|
||||
real_clock = clock
|
||||
|
||||
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
|
||||
# TODO: I've had to make this ugly to appease mypy :(
|
||||
# Perhaps initialise the backing cache and then pass to LruCache?
|
||||
cache: Union[Dict[KT, _Node[KT, VT]], TreeCache, DualLookupCache]
|
||||
if cache_type is DualLookupCache:
|
||||
# The dual_lookup_secondary_key_function is a function that's intended to
|
||||
# extract a key from the value in the cache. Since we wrap values given to
|
||||
# us in a _Node object, this function will actually operate on a _Node,
|
||||
# instead of directly on the object type callers are expecting.
|
||||
#
|
||||
# Thus, we wrap the function given by the caller in another one that
|
||||
# extracts the value from the _Node, before then handing it off to the
|
||||
# given function for processing.
|
||||
def key_function_wrapper(node: Any) -> Any:
|
||||
assert dual_lookup_secondary_key_function is not None
|
||||
return dual_lookup_secondary_key_function(node.value)
|
||||
|
||||
cache = DualLookupCache(
|
||||
secondary_key_function=key_function_wrapper,
|
||||
)
|
||||
elif cache_type is TreeCache:
|
||||
cache = TreeCache()
|
||||
else:
|
||||
cache = {}
|
||||
|
||||
self.cache = cache # Used for introspection.
|
||||
self.apply_cache_factor_from_config = apply_cache_factor_from_config
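
A hypothetical construction call, to illustrate how the new arguments fit together; the cache name and EventBase-style value are illustrative only, with `cache_type` and `dual_lookup_secondary_key_function` being the new pieces:

```python
# Hypothetical example of wiring up an LruCache backed by DualLookupCache;
# the cache name and EventBase-style value are illustrative only.
event_cache: LruCache[str, EventBase] = LruCache(
    max_size=10_000,
    cache_name="getEvent",
    cache_type=DualLookupCache,
    # Receives the *cached value* (the wrapper above unpacks the _Node
    # first) and returns the secondary key, here the event's room ID.
    dual_lookup_secondary_key_function=lambda event: event.room_id,
)
```

Popping by a room ID would then be handled by the `SecondarySet` branch added to `cache_pop` in the next hunk.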
|
||||
|
||||
@@ -722,13 +752,25 @@ class LruCache(Generic[KT, VT]):
|
||||
may be of lower cardinality than the TreeCache - in which case the whole
|
||||
subtree is deleted.
|
||||
"""
|
||||
popped = cache.pop(key, None)
|
||||
if popped is None:
|
||||
# Remove an entry from the cache.
|
||||
# In the case of a 'dict' cache type, we're just removing an entry from the
|
||||
# dict. For a TreeCache, we're removing a subtree which has children.
|
||||
popped_entry: Optional[Union[_Node[KT, VT], TreeCacheNode, SecondarySet]] = cache.pop(key, None)
|
||||
if popped_entry is None:
|
||||
return
|
||||
# for each deleted node, we now need to remove it from the linked list
|
||||
# and run its callbacks.
|
||||
for leaf in iterate_tree_cache_entry(popped):
|
||||
delete_node(leaf)
|
||||
|
||||
if isinstance(popped_entry, TreeCacheNode):
|
||||
# We've popped a subtree from a TreeCache - now we need to clean up
|
||||
# each child node.
|
||||
for leaf in iterate_tree_cache_entry(popped_entry):
|
||||
# For each deleted child node, we remove it from the linked list and
|
||||
# run its callbacks.
|
||||
delete_node(leaf)
|
||||
elif isinstance(popped_entry, SecondarySet):
|
||||
for leaf in popped_entry:
|
||||
delete_node(leaf)
|
||||
else:
|
||||
delete_node(popped_entry)
|
||||
|
||||
@synchronized
|
||||
def cache_clear() -> None:
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from functools import wraps
|
||||
from types import TracebackType
|
||||
from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
|
||||
from typing import Awaitable, Callable, Dict, Generator, Optional, Type, TypeVar
|
||||
|
||||
from prometheus_client import CollectorRegistry, Counter, Metric
|
||||
from typing_extensions import Concatenate, ParamSpec, Protocol
|
||||
@@ -220,21 +220,21 @@ class DynamicCollectorRegistry(CollectorRegistry):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._pre_update_hooks: List[Callable[[], None]] = []
|
||||
self._pre_update_hooks: Dict[str, Callable[[], None]] = {}
|
||||
|
||||
def collect(self) -> Generator[Metric, None, None]:
|
||||
"""
|
||||
Collects metrics, calling pre-update hooks first.
|
||||
"""
|
||||
|
||||
for pre_update_hook in self._pre_update_hooks:
|
||||
for pre_update_hook in self._pre_update_hooks.values():
|
||||
pre_update_hook()
|
||||
|
||||
yield from super().collect()
|
||||
|
||||
def register_hook(self, hook: Callable[[], None]) -> None:
|
||||
def register_hook(self, metric_name: str, hook: Callable[[], None]) -> None:
|
||||
"""
|
||||
Registers a hook that is called before metric collection.
|
||||
"""
|
||||
|
||||
self._pre_update_hooks.append(hook)
|
||||
self._pre_update_hooks[metric_name] = hook
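
A small sketch of what the keyed registration enables; the registry instance and metric name here are invented for illustration:

```python
# Hedged sketch: registering a pre-collection hook under a metric name.
registry = DynamicCollectorRegistry()

def update_cache_metrics() -> None:
    ...  # refresh the relevant gauges before collection

# Keying hooks by metric name means re-registering under the same name
# replaces the previous hook rather than appending a duplicate.
registry.register_hook("synapse_example_cache", update_cache_metrics)
registry.register_hook("synapse_example_cache", update_cache_metrics)
```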
|
||||
|
||||
@@ -227,3 +227,225 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
|
||||
|
||||
if prev_exists_as_outlier:
|
||||
self.mock_federation_transport_client.get_event.assert_not_called()
|
||||
|
||||
def test_process_pulled_event_records_failed_backfill_attempts(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
Test to make sure that failed backfill attempts for an event are
|
||||
recorded in the `event_failed_pull_attempts` table.
|
||||
|
||||
In this test, we pretend we are processing a "pulled" event via
|
||||
backfill. The pulled event has a fake `prev_event` which our server has
|
||||
obviously never seen before so it attempts to request the state at that
|
||||
`prev_event` which expectedly fails because it's a fake event. Because
|
||||
the server can't fetch the state at the missing `prev_event`, the
"pulled" event fails the history check and fails to process.
|
||||
|
||||
We check that we correctly record the number of failed pull attempts
|
||||
of the pulled event and as a sanity check, that the "pulled" event isn't
|
||||
persisted.
|
||||
"""
|
||||
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
|
||||
main_store = self.hs.get_datastores().main
|
||||
|
||||
# Create the room
|
||||
user_id = self.register_user("kermit", "test")
|
||||
tok = self.login("kermit", "test")
|
||||
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||
room_version = self.get_success(main_store.get_room_version(room_id))
|
||||
|
||||
# We expect an outbound request to /state_ids, so stub that out
|
||||
self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
|
||||
{
|
||||
# Mimic the other server not knowing about the state at all.
|
||||
# We want to cause Synapse to throw an error (`Unable to get
|
||||
# missing prev_event $fake_prev_event`) and fail to backfill
|
||||
# the pulled event.
|
||||
"pdu_ids": [],
|
||||
"auth_chain_ids": [],
|
||||
}
|
||||
)
|
||||
# We also expect an outbound request to /state
|
||||
self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
|
||||
StateRequestResponse(
|
||||
# Mimic the other server not knowing about the state at all.
|
||||
# We want to cause Synapse to throw an error (`Unable to get
|
||||
# missing prev_event $fake_prev_event`) and fail to backfill
|
||||
# the pulled event.
|
||||
auth_events=[],
|
||||
state=[],
|
||||
)
|
||||
)
|
||||
|
||||
pulled_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"type": "test_regular_type",
|
||||
"room_id": room_id,
|
||||
"sender": OTHER_USER,
|
||||
"prev_events": [
|
||||
# The fake prev event will make the pulled event fail
|
||||
# the history check (`Unable to get missing prev_event
|
||||
# $fake_prev_event`)
|
||||
"$fake_prev_event"
|
||||
],
|
||||
"auth_events": [],
|
||||
"origin_server_ts": 1,
|
||||
"depth": 12,
|
||||
"content": {"body": "pulled"},
|
||||
}
|
||||
),
|
||||
room_version,
|
||||
)
|
||||
|
||||
# The function under test: try to process the pulled event
|
||||
with LoggingContext("test"):
|
||||
self.get_success(
|
||||
self.hs.get_federation_event_handler()._process_pulled_event(
|
||||
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
|
||||
)
|
||||
)
|
||||
|
||||
# Make sure our failed pull attempt was recorded
|
||||
backfill_num_attempts = self.get_success(
|
||||
main_store.db_pool.simple_select_one_onecol(
|
||||
table="event_failed_pull_attempts",
|
||||
keyvalues={"event_id": pulled_event.event_id},
|
||||
retcol="num_attempts",
|
||||
)
|
||||
)
|
||||
self.assertEqual(backfill_num_attempts, 1)
|
||||
|
||||
# The function under test: try to process the pulled event again
|
||||
with LoggingContext("test"):
|
||||
self.get_success(
|
||||
self.hs.get_federation_event_handler()._process_pulled_event(
|
||||
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
|
||||
)
|
||||
)
|
||||
|
||||
# Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
|
||||
backfill_num_attempts = self.get_success(
|
||||
main_store.db_pool.simple_select_one_onecol(
|
||||
table="event_failed_pull_attempts",
|
||||
keyvalues={"event_id": pulled_event.event_id},
|
||||
retcol="num_attempts",
|
||||
)
|
||||
)
|
||||
self.assertEqual(backfill_num_attempts, 2)
|
||||
|
||||
# And as a sanity check, make sure the event was not persisted through all of this.
|
||||
persisted = self.get_success(
|
||||
main_store.get_event(pulled_event.event_id, allow_none=True)
|
||||
)
|
||||
self.assertIsNone(
|
||||
persisted,
|
||||
"pulled event that fails the history check should not be persisted at all",
|
||||
)
|
||||
|
||||
def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
Test to make sure that failed pull attempts
|
||||
(`event_failed_pull_attempts` table) for an event are cleared after the
|
||||
event is successfully persisted.
|
||||
|
||||
In this test, we pretend we are processing a "pulled" event via
|
||||
backfill. The pulled event successfully processes and the backward
|
||||
extremities are updated along with clearing out any failed pull attempts
|
||||
for those old extremities.
|
||||
|
||||
We check that we correctly cleared failed pull attempts of the
|
||||
pulled event.
|
||||
"""
|
||||
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
|
||||
main_store = self.hs.get_datastores().main
|
||||
|
||||
# Create the room
|
||||
user_id = self.register_user("kermit", "test")
|
||||
tok = self.login("kermit", "test")
|
||||
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||
room_version = self.get_success(main_store.get_room_version(room_id))
|
||||
|
||||
# allow the remote user to send state events
|
||||
self.helper.send_state(
|
||||
room_id,
|
||||
"m.room.power_levels",
|
||||
{"events_default": 0, "state_default": 0},
|
||||
tok=tok,
|
||||
)
|
||||
|
||||
# add the remote user to the room
|
||||
member_event = self.get_success(
|
||||
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
|
||||
)
|
||||
|
||||
initial_state_map = self.get_success(
|
||||
main_store.get_partial_current_state_ids(room_id)
|
||||
)
|
||||
|
||||
auth_event_ids = [
|
||||
initial_state_map[("m.room.create", "")],
|
||||
initial_state_map[("m.room.power_levels", "")],
|
||||
member_event.event_id,
|
||||
]
|
||||
|
||||
pulled_event = make_event_from_dict(
|
||||
self.add_hashes_and_signatures_from_other_server(
|
||||
{
|
||||
"type": "test_regular_type",
|
||||
"room_id": room_id,
|
||||
"sender": OTHER_USER,
|
||||
"prev_events": [member_event.event_id],
|
||||
"auth_events": auth_event_ids,
|
||||
"origin_server_ts": 1,
|
||||
"depth": 12,
|
||||
"content": {"body": "pulled"},
|
||||
}
|
||||
),
|
||||
room_version,
|
||||
)
|
||||
|
||||
# Fake the "pulled" event failing to backfill once so we can test
|
||||
# if it's cleared out later on.
|
||||
self.get_success(
|
||||
main_store.record_event_failed_pull_attempt(
|
||||
pulled_event.room_id, pulled_event.event_id, "fake cause"
|
||||
)
|
||||
)
|
||||
# Make sure we have a failed pull attempt recorded for the pulled event
|
||||
backfill_num_attempts = self.get_success(
|
||||
main_store.db_pool.simple_select_one_onecol(
|
||||
table="event_failed_pull_attempts",
|
||||
keyvalues={"event_id": pulled_event.event_id},
|
||||
retcol="num_attempts",
|
||||
)
|
||||
)
|
||||
self.assertEqual(backfill_num_attempts, 1)
|
||||
|
||||
# The function under test: try to process the pulled event
|
||||
with LoggingContext("test"):
|
||||
self.get_success(
|
||||
self.hs.get_federation_event_handler()._process_pulled_event(
|
||||
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
|
||||
)
|
||||
)
|
||||
|
||||
# Make sure the failed pull attempts for the pulled event are cleared
|
||||
backfill_num_attempts = self.get_success(
|
||||
main_store.db_pool.simple_select_one_onecol(
|
||||
table="event_failed_pull_attempts",
|
||||
keyvalues={"event_id": pulled_event.event_id},
|
||||
retcol="num_attempts",
|
||||
allow_none=True,
|
||||
)
|
||||
)
|
||||
self.assertIsNone(backfill_num_attempts)
|
||||
|
||||
# And as a sanity check, make sure the "pulled" event was persisted.
|
||||
persisted = self.get_success(
|
||||
main_store.get_event(pulled_event.event_id, allow_none=True)
|
||||
)
|
||||
self.assertIsNotNone(persisted, "pulled event was not persisted at all")
|
||||
|
||||
@@ -404,6 +404,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
|
||||
event.event_id,
|
||||
{user_id: actions for user_id, actions in push_actions},
|
||||
False,
|
||||
"main",
|
||||
)
|
||||
)
|
||||
return event, context
|
||||
|
||||
@@ -13,7 +13,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import io
|
||||
from typing import Iterable, Optional, Tuple
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from matrix_common.types.mxc_uri import MXCUri
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
@@ -63,9 +65,9 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
last_accessed_ms: Optional[int],
|
||||
is_quarantined: Optional[bool] = False,
|
||||
is_protected: Optional[bool] = False,
|
||||
) -> str:
|
||||
) -> MXCUri:
|
||||
# "Upload" some media to the local media store
|
||||
mxc_uri = self.get_success(
|
||||
mxc_uri: MXCUri = self.get_success(
|
||||
media_repository.create_content(
|
||||
media_type="text/plain",
|
||||
upload_name=None,
|
||||
@@ -75,13 +77,11 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
)
|
||||
|
||||
media_id = mxc_uri.split("/")[-1]
|
||||
|
||||
# Set the last recently accessed time for this media
|
||||
if last_accessed_ms is not None:
|
||||
self.get_success(
|
||||
self.store.update_cached_last_access_time(
|
||||
local_media=(media_id,),
|
||||
local_media=(mxc_uri.media_id,),
|
||||
remote_media=(),
|
||||
time_ms=last_accessed_ms,
|
||||
)
|
||||
@@ -92,7 +92,7 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
self.get_success(
|
||||
self.store.quarantine_media_by_id(
|
||||
server_name=self.hs.config.server.server_name,
|
||||
media_id=media_id,
|
||||
media_id=mxc_uri.media_id,
|
||||
quarantined_by="@theadmin:test",
|
||||
)
|
||||
)
|
||||
@@ -101,18 +101,18 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
# Mark this media as protected from quarantine
|
||||
self.get_success(
|
||||
self.store.mark_local_media_as_safe(
|
||||
media_id=media_id,
|
||||
media_id=mxc_uri.media_id,
|
||||
safe=True,
|
||||
)
|
||||
)
|
||||
|
||||
return media_id
|
||||
return mxc_uri
|
||||
|
||||
def _cache_remote_media_and_set_attributes(
|
||||
media_id: str,
|
||||
last_accessed_ms: Optional[int],
|
||||
is_quarantined: Optional[bool] = False,
|
||||
) -> str:
|
||||
) -> MXCUri:
|
||||
# Pretend to cache some remote media
|
||||
self.get_success(
|
||||
self.store.store_cached_remote_media(
|
||||
@@ -146,7 +146,7 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
)
|
||||
|
||||
return media_id
|
||||
return MXCUri(self.remote_server_name, media_id)
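
For reference, the `MXCUri` plumbing these helpers now rely on boils down to a server name plus media ID pair; the values below are invented:

```python
# Hedged illustration of the MXCUri fields used by the helpers above.
uri = MXCUri("other.example.com", "abcdefg12345")
assert uri.server_name == "other.example.com"
assert uri.media_id == "abcdefg12345"
```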
|
||||
|
||||
# Start with the local media store
|
||||
self.local_recently_accessed_media = _create_media_and_set_attributes(
|
||||
@@ -214,28 +214,16 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
# Remote media should be unaffected.
|
||||
self._assert_if_mxc_uris_purged(
|
||||
purged=[
|
||||
(
|
||||
self.hs.config.server.server_name,
|
||||
self.local_not_recently_accessed_media,
|
||||
),
|
||||
(self.hs.config.server.server_name, self.local_never_accessed_media),
|
||||
self.local_not_recently_accessed_media,
|
||||
self.local_never_accessed_media,
|
||||
],
|
||||
not_purged=[
|
||||
(self.hs.config.server.server_name, self.local_recently_accessed_media),
|
||||
(
|
||||
self.hs.config.server.server_name,
|
||||
self.local_not_recently_accessed_quarantined_media,
|
||||
),
|
||||
(
|
||||
self.hs.config.server.server_name,
|
||||
self.local_not_recently_accessed_protected_media,
|
||||
),
|
||||
(self.remote_server_name, self.remote_recently_accessed_media),
|
||||
(self.remote_server_name, self.remote_not_recently_accessed_media),
|
||||
(
|
||||
self.remote_server_name,
|
||||
self.remote_not_recently_accessed_quarantined_media,
|
||||
),
|
||||
self.local_recently_accessed_media,
|
||||
self.local_not_recently_accessed_quarantined_media,
|
||||
self.local_not_recently_accessed_protected_media,
|
||||
self.remote_recently_accessed_media,
|
||||
self.remote_not_recently_accessed_media,
|
||||
self.remote_not_recently_accessed_quarantined_media,
|
||||
],
|
||||
)
|
||||
|
||||
@@ -261,49 +249,35 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
# Remote media accessed <30 days ago should still exist.
|
||||
self._assert_if_mxc_uris_purged(
|
||||
purged=[
|
||||
(self.remote_server_name, self.remote_not_recently_accessed_media),
|
||||
self.remote_not_recently_accessed_media,
|
||||
],
|
||||
not_purged=[
|
||||
(self.remote_server_name, self.remote_recently_accessed_media),
|
||||
(self.hs.config.server.server_name, self.local_recently_accessed_media),
|
||||
(
|
||||
self.hs.config.server.server_name,
|
||||
self.local_not_recently_accessed_media,
|
||||
),
|
||||
(
|
||||
self.hs.config.server.server_name,
|
||||
self.local_not_recently_accessed_quarantined_media,
|
||||
),
|
||||
(
|
||||
self.hs.config.server.server_name,
|
||||
self.local_not_recently_accessed_protected_media,
|
||||
),
|
||||
(
|
||||
self.remote_server_name,
|
||||
self.remote_not_recently_accessed_quarantined_media,
|
||||
),
|
||||
(self.hs.config.server.server_name, self.local_never_accessed_media),
|
||||
self.remote_recently_accessed_media,
|
||||
self.local_recently_accessed_media,
|
||||
self.local_not_recently_accessed_media,
|
||||
self.local_not_recently_accessed_quarantined_media,
|
||||
self.local_not_recently_accessed_protected_media,
|
||||
self.remote_not_recently_accessed_quarantined_media,
|
||||
self.local_never_accessed_media,
|
||||
],
|
||||
)
|
||||
|
||||
def _assert_if_mxc_uris_purged(
|
||||
self, purged: Iterable[Tuple[str, str]], not_purged: Iterable[Tuple[str, str]]
|
||||
self, purged: Iterable[MXCUri], not_purged: Iterable[MXCUri]
|
||||
) -> None:
|
||||
def _assert_mxc_uri_purge_state(
|
||||
server_name: str, media_id: str, expect_purged: bool
|
||||
) -> None:
|
||||
def _assert_mxc_uri_purge_state(mxc_uri: MXCUri, expect_purged: bool) -> None:
|
||||
"""Given an MXC URI, assert whether it has been purged or not."""
|
||||
if server_name == self.hs.config.server.server_name:
|
||||
if mxc_uri.server_name == self.hs.config.server.server_name:
|
||||
found_media_dict = self.get_success(
|
||||
self.store.get_local_media(media_id)
|
||||
self.store.get_local_media(mxc_uri.media_id)
|
||||
)
|
||||
else:
|
||||
found_media_dict = self.get_success(
|
||||
self.store.get_cached_remote_media(server_name, media_id)
|
||||
self.store.get_cached_remote_media(
|
||||
mxc_uri.server_name, mxc_uri.media_id
|
||||
)
|
||||
)
|
||||
|
||||
mxc_uri = f"mxc://{server_name}/{media_id}"
|
||||
|
||||
if expect_purged:
|
||||
self.assertIsNone(
|
||||
found_media_dict, msg=f"{mxc_uri} unexpectedly not purged"
|
||||
@@ -315,7 +289,7 @@ class MediaRetentionTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
# Assert that the given MXC URIs have either been correctly purged or not.
|
||||
for server_name, media_id in purged:
|
||||
_assert_mxc_uri_purge_state(server_name, media_id, expect_purged=True)
|
||||
for server_name, media_id in not_purged:
|
||||
_assert_mxc_uri_purge_state(server_name, media_id, expect_purged=False)
|
||||
for mxc_uri in purged:
|
||||
_assert_mxc_uri_purge_state(mxc_uri, expect_purged=True)
|
||||
for mxc_uri in not_purged:
|
||||
_assert_mxc_uri_purge_state(mxc_uri, expect_purged=False)
|
||||
|
||||
@@ -115,6 +115,5 @@ class PurgeTests(HomeserverTestCase):
|
||||
)
|
||||
|
||||
# The events aren't found.
|
||||
self.store._invalidate_local_get_event_cache(create_event.event_id)
|
||||
self.get_failure(self.store.get_event(create_event.event_id), NotFoundError)
|
||||
self.get_failure(self.store.get_event(first["event_id"]), NotFoundError)
|
||||
|
||||