Compare commits
3 Commits
v1.140.0rc
...
erikj/fewe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d97a894e1 | ||
|
|
c7d897def7 | ||
|
|
74dd17dad4 |
1
changelog.d/17128.misc
Normal file
1
changelog.d/17128.misc
Normal file
@@ -0,0 +1 @@
|
||||
Improve DB usage when fetching related events.
|
||||
@@ -148,6 +148,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
||||
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user_with_stream_ordering", None
|
||||
)
|
||||
|
||||
@@ -43,6 +43,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import StrCollection
|
||||
from synapse.util.caches.descriptors import CachedFunction
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
@@ -233,8 +234,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
room_id = row.keys[0]
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
self._invalidate_caches_for_room(room_id)
|
||||
server_joined = bool(row.keys.get(1, "true"))
|
||||
self._invalidate_caches_for_room(room_id, server_joined)
|
||||
else:
|
||||
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
|
||||
|
||||
@@ -388,7 +389,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._invalidate_local_get_event_cache_room_id(room_id) # type: ignore[attr-defined]
|
||||
|
||||
self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
@@ -398,11 +398,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_id", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user_with_stream_ordering", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", None)
|
||||
@@ -417,17 +413,28 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
|
||||
|
||||
def _invalidate_caches_for_room_and_stream(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
server_in_room: bool,
|
||||
) -> None:
|
||||
"""Invalidate caches associated with rooms, and stream to replication.
|
||||
|
||||
Used when we delete rooms.
|
||||
|
||||
Args:
|
||||
txn
|
||||
room_id
|
||||
server_in_room: Whether the server was joined or invited to the
|
||||
room when we deleted it.
|
||||
"""
|
||||
|
||||
self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
|
||||
txn.call_after(self._invalidate_caches_for_room, room_id)
|
||||
self._send_invalidation_to_replication(
|
||||
txn, DELETE_ROOM_CACHE_NAME, [room_id, "true" if server_in_room else ""]
|
||||
)
|
||||
txn.call_after(self._invalidate_caches_for_room, room_id, server_in_room)
|
||||
|
||||
def _invalidate_caches_for_room(self, room_id: str) -> None:
|
||||
def _invalidate_caches_for_room(self, room_id: str, server_in_room: bool) -> None:
|
||||
"""Invalidate caches associated with rooms.
|
||||
|
||||
Used when we delete rooms.
|
||||
@@ -439,8 +446,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
|
||||
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
|
||||
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
|
||||
|
||||
if server_in_room:
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_latest_event_ids_in_room", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
|
||||
|
||||
# And delete state caches.
|
||||
self._invalidate_state_caches_all(room_id)
|
||||
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
@@ -453,19 +468,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
"_get_partial_state_servers_at_join", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_current_hosts_in_room_ordered", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
|
||||
|
||||
# And delete state caches.
|
||||
|
||||
self._invalidate_state_caches_all(room_id)
|
||||
|
||||
async def invalidate_cache_and_stream(
|
||||
self, cache_name: str, keys: Tuple[Any, ...]
|
||||
) -> None:
|
||||
@@ -560,7 +569,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
for chunk in batch_iter(members_changed, 50):
|
||||
keys = itertools.chain([room_id], chunk)
|
||||
self._send_invalidation_to_replication(
|
||||
txn, CURRENT_STATE_CACHE_NAME, keys
|
||||
txn, CURRENT_STATE_CACHE_NAME, list(keys)
|
||||
)
|
||||
else:
|
||||
# if no members changed, we still need to invalidate the other caches.
|
||||
@@ -579,7 +588,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
def _send_invalidation_to_replication(
|
||||
self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]]
|
||||
self, txn: LoggingTransaction, cache_name: str, keys: Optional[StrCollection]
|
||||
) -> None:
|
||||
"""Notifies replication that given cache has been invalidated.
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
import logging
|
||||
from typing import Any, List, Set, Tuple, cast
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.databases.main import CacheInvalidationWorkerStore
|
||||
@@ -376,6 +377,20 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
|
||||
txn,
|
||||
room_id,
|
||||
host=self.hs.hostname,
|
||||
membership=Membership.JOIN,
|
||||
)
|
||||
if not server_in_room:
|
||||
server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
|
||||
txn,
|
||||
room_id,
|
||||
host=self.hs.hostname,
|
||||
membership=Membership.INVITE,
|
||||
)
|
||||
|
||||
# First, fetch all the state groups that should be deleted, before
|
||||
# we delete that information.
|
||||
txn.execute(
|
||||
@@ -503,6 +518,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
# index on them. In any case we should be clearing out 'stream' tables
|
||||
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
|
||||
|
||||
self._invalidate_caches_for_room_and_stream(txn, room_id)
|
||||
self._invalidate_caches_for_room_and_stream(txn, room_id, server_in_room)
|
||||
|
||||
return state_groups
|
||||
|
||||
@@ -962,6 +962,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
|
||||
async def _check_host_room_membership(
|
||||
self, room_id: str, host: str, membership: str
|
||||
) -> bool:
|
||||
return await self.db_pool.runInteraction(
|
||||
"is_host_joined",
|
||||
self._check_host_room_membership_txn,
|
||||
room_id,
|
||||
host,
|
||||
membership,
|
||||
)
|
||||
|
||||
def _check_host_room_membership_txn(
|
||||
self, txn: LoggingTransaction, room_id: str, host: str, membership: str
|
||||
) -> bool:
|
||||
if "%" in host or "_" in host:
|
||||
raise Exception("Invalid host name")
|
||||
@@ -980,14 +991,14 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
# the returned user actually has the correct domain.
|
||||
like_clause = "%:" + host
|
||||
|
||||
rows = await self.db_pool.execute(
|
||||
"is_host_joined", sql, membership, room_id, like_clause
|
||||
)
|
||||
txn.execute(sql, (membership, room_id, like_clause))
|
||||
|
||||
if not rows:
|
||||
row = txn.fetchone()
|
||||
|
||||
if not row:
|
||||
return False
|
||||
|
||||
user_id = rows[0][0]
|
||||
user_id = row[0]
|
||||
if get_domain_from_id(user_id) != host:
|
||||
# This can only happen if the host name has something funky in it
|
||||
raise Exception("Invalid host name")
|
||||
|
||||
Reference in New Issue
Block a user