1
0

More cache updates (LruCache not finished)

This commit is contained in:
Eric Eastwood
2025-05-19 16:59:17 -05:00
parent d3c23f8235
commit d3d228ea2d
22 changed files with 160 additions and 63 deletions
+3 -1
View File
@@ -280,7 +280,9 @@ class TypingWriterHandler(FollowerTypingHandler):
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial
name="TypingStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=self._latest_room_serial,
)
def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
@@ -50,6 +50,7 @@ from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.caches import CacheManager
logger = logging.getLogger(__name__)
@@ -81,6 +82,8 @@ class MatrixFederationAgent:
reactor might have some blocking applied (i.e. for DNS queries),
but we need unblocked access to the proxy.
cache_manager: The cache manager to handle metrics
_srv_resolver:
SrvResolver implementation to use for looking up SRV records. None
to use a default implementation.
@@ -92,11 +95,13 @@ class MatrixFederationAgent:
def __init__(
self,
*,
reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_allowlist: Optional[IPSet],
ip_blocklist: IPSet,
cache_manager: CacheManager,
_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
@@ -139,6 +144,7 @@ class MatrixFederationAgent:
ip_blocklist=ip_blocklist,
),
user_agent=self.user_agent,
cache_manager=cache_manager,
)
self._well_known_resolver = _well_known_resolver
+10 -11
View File
@@ -36,6 +36,7 @@ from twisted.web.iweb import IAgent, IResponse
from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder
from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
@@ -77,10 +78,6 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3
logger = logging.getLogger(__name__)
_well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache("well-known")
_had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache("had-valid-well-known")
@attr.s(slots=True, frozen=True, auto_attribs=True)
class WellKnownLookupResult:
delegated_server: Optional[bytes]
@@ -94,20 +91,22 @@ class WellKnownResolver:
reactor: IReactorTime,
agent: IAgent,
user_agent: bytes,
cache_manager: CacheManager,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
):
self._reactor = reactor
self._clock = Clock(reactor)
if well_known_cache is None:
well_known_cache = _well_known_cache
self._well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
"well-known",
cache_manager=cache_manager,
)
self._had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache(
"had-valid-well-known",
cache_manager=cache_manager,
)
if had_well_known_cache is None:
had_well_known_cache = _had_valid_well_known_cache
self._well_known_cache = well_known_cache
self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent)
self.user_agent = user_agent
+6 -5
View File
@@ -417,11 +417,12 @@ class MatrixFederationHttpClient:
if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
reactor=self.reactor,
tls_client_options_factory=tls_client_options_factory,
user_agent=user_agent.encode("ascii"),
ip_allowlist=hs.config.server.federation_ip_range_allowlist,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
cache_manager=hs.get_cache_manager(),
)
else:
proxy_authorization_secret = hs.config.worker.worker_replication_secret
@@ -63,6 +63,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.cache_manager = hs.get_cache_manager()
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
@@ -87,7 +88,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max
name="AccountDataAndTagsChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=account_max,
)
self.db_pool.updates.register_background_index_update(
@@ -114,8 +114,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
name="DeviceInboxStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)
@@ -130,8 +131,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
name="DeviceFederationOutboxStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)
+12 -8
View File
@@ -128,8 +128,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache",
min_device_list_id,
name="DeviceListStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_list_id,
prefilled_cache=device_list_prefill,
)
@@ -142,8 +143,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_room_stream_cache = StreamChangeCache(
"DeviceListRoomStreamChangeCache",
min_device_list_room_id,
name="DeviceListRoomStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_list_room_id,
prefilled_cache=device_list_room_prefill,
)
@@ -159,8 +161,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=1000,
)
self._user_signature_stream_cache = StreamChangeCache(
"UserSignatureStreamChangeCache",
user_signature_stream_list_id,
name="UserSignatureStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=user_signature_stream_list_id,
prefilled_cache=user_signature_stream_prefill,
)
@@ -178,8 +181,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache",
device_list_federation_list_id,
name="DeviceListFederationStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=device_list_federation_list_id,
prefilled_cache=device_list_federation_prefill,
)
@@ -269,8 +269,9 @@ class EventsWorkerStore(SQLBaseStore):
limit=1000,
)
self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
"_curr_state_delta_stream_cache",
min_curr_state_delta_id,
name="_curr_state_delta_stream_cache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)
+3 -2
View File
@@ -108,8 +108,9 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache",
min_presence_val,
name="PresenceStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_presence_val,
prefilled_cache=presence_cache_prefill,
)
+3 -2
View File
@@ -163,8 +163,9 @@ class PushRulesWorkerStore(
)
self.push_rules_stream_cache = StreamChangeCache(
"PushRulesStreamChangeCache",
push_rules_id,
name="PushRulesStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=push_rules_id,
prefilled_cache=push_rules_prefill,
)
+3 -2
View File
@@ -158,8 +158,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
limit=10000,
)
self._receipts_stream_cache = StreamChangeCache(
"ReceiptsRoomChangeCache",
min_receipts_stream_id,
name="ReceiptsRoomChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_receipts_stream_id,
prefilled_cache=receipts_stream_prefill,
)
+6 -3
View File
@@ -617,12 +617,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache",
min_event_val,
name="EventsRoomStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_event_val,
prefilled_cache=event_cache_prefill,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max
name="MembershipStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=events_max,
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
+4 -1
View File
@@ -233,7 +233,6 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
) -> Callable[..., "defer.Deferred[Any]"]:
cache: DeferredCache[CacheKey, Any] = DeferredCache(
name=self.name,
cache_manager=
max_entries=self.max_entries,
tree=self.tree,
iterable=self.iterable,
@@ -487,6 +486,7 @@ class _CachedFunctionDescriptor:
cache_context: bool
iterable: bool
prune_unread_entries: bool
name: Optional[str]
def __call__(self, orig: F) -> CachedFunction[F]:
d = DeferredCacheDescriptor(
@@ -498,6 +498,7 @@ class _CachedFunctionDescriptor:
cache_context=self.cache_context,
iterable=self.iterable,
prune_unread_entries=self.prune_unread_entries,
name=self.name,
)
return cast(CachedFunction[F], d)
@@ -511,6 +512,7 @@ def cached(
cache_context: bool = False,
iterable: bool = False,
prune_unread_entries: bool = True,
name: Optional[str] = None,
) -> _CachedFunctionDescriptor:
return _CachedFunctionDescriptor(
max_entries=max_entries,
@@ -520,6 +522,7 @@ def cached(
cache_context=cache_context,
iterable=iterable,
prune_unread_entries=prune_unread_entries,
name=name,
)
+5 -2
View File
@@ -51,7 +51,7 @@ from twisted.internet.interfaces import IReactorTime
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, CacheManager
from synapse.util.caches import CacheManager, CacheMetric, EvictionReason
from synapse.util.caches.treecache import (
TreeCache,
iterate_tree_cache_entry,
@@ -119,7 +119,10 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()
@wrap_as_background_process("LruCache._expire_old_entries")
async def _expire_old_entries(
hs: HomeServer, clock: Clock, expiry_seconds: float, autotune_config: Optional[dict]
hs: "HomeServer",
clock: Clock,
expiry_seconds: float,
autotune_config: Optional[dict],
) -> None:
"""Walks the global cache list to find cache entries that haven't been
accessed in the given number of seconds, or if a given memory threshold has been breached.
+4 -2
View File
@@ -26,7 +26,7 @@ from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Un
import attr
from sortedcontainers import SortedDict
from synapse.util import caches
from synapse.util.caches import CacheManager
logger = logging.getLogger(__name__)
@@ -73,7 +73,9 @@ class StreamChangeCache:
def __init__(
self,
*,
name: str,
cache_manager: CacheManager,
current_stream_pos: int,
max_size: int = 10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
@@ -95,7 +97,7 @@ class StreamChangeCache:
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = caches.register_cache(
self.metrics = cache_manager.register_cache(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
)
+10 -3
View File
@@ -26,7 +26,7 @@ from typing import Any, Callable, Dict, Generic, Tuple, TypeVar, Union
import attr
from sortedcontainers import SortedList
from synapse.util.caches import register_cache
from synapse.util.caches import CacheManager
logger = logging.getLogger(__name__)
@@ -40,7 +40,12 @@ VT = TypeVar("VT")
class TTLCache(Generic[KT, VT]):
"""A key/value cache implementation where each entry has its own TTL"""
def __init__(self, cache_name: str, timer: Callable[[], float] = time.time):
def __init__(
self,
cache_name: str,
cache_manager: CacheManager,
timer: Callable[[], float] = time.time,
):
# map from key to _CacheEntry
self._data: Dict[KT, _CacheEntry[KT, VT]] = {}
@@ -49,7 +54,9 @@ class TTLCache(Generic[KT, VT]):
self._timer = timer
self._metrics = register_cache("ttl", cache_name, self, resizable=False)
self._metrics = cache_manager.register_cache(
"ttl", cache_name, self, resizable=False
)
def set(self, key: KT, value: VT, ttl: float) -> None:
"""Add/update an entry in the cache
+4 -1
View File
@@ -37,6 +37,7 @@ from synapse.http.federation.matrix_federation_agent import MatrixFederationAgen
from synapse.server import HomeServer
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, create_requester
from synapse.util import Clock
from synapse.util.caches import CacheManager
from tests import unittest
from tests.server import ThreadedMemoryReactorClock
@@ -86,11 +87,12 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.mock_federation_client = AsyncMock(spec=["put_json"])
self.mock_federation_client.put_json.return_value = (200, "OK")
self.mock_federation_client.agent = MatrixFederationAgent(
reactor,
reactor=reactor,
tls_client_options_factory=None,
user_agent=b"SynapseInTrialTest/0.0.0",
ip_allowlist=None,
ip_blocklist=IPSet(),
cache_manager=Mock(spec=CacheManager),
)
# the tests assume that we are starting at unix time 1000
@@ -103,6 +105,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
keyring=mock_keyring,
replication_streams={},
)
hs._federation_http_client = self.mock_federation_client
return hs
@@ -21,7 +21,7 @@ import base64
import logging
import os
from typing import Generator, List, Optional, cast
from unittest.mock import AsyncMock, call, patch
from unittest.mock import AsyncMock, Mock, call, patch
import treq
from netaddr import IPSet
@@ -60,6 +60,7 @@ from synapse.logging.context import (
current_context,
)
from synapse.types import ISynapseReactor
from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from tests import unittest
@@ -84,16 +85,23 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.tls_factory = FederationPolicyForHTTPS(config)
cache_manager = Mock(spec=CacheManager)
self.well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
"test_cache", timer=self.reactor.seconds
"test_cache",
timer=self.reactor.seconds,
cache_manager=cache_manager,
)
self.had_well_known_cache: TTLCache[bytes, bool] = TTLCache(
"test_cache", timer=self.reactor.seconds
"test_cache",
timer=self.reactor.seconds,
cache_manager=cache_manager,
)
self.well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
b"test-agent",
cache_manager=cache_manager,
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
)
@@ -274,6 +282,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
ip_allowlist=IPSet(),
ip_blocklist=IPSet(),
cache_manager=Mock(spec=CacheManager),
_srv_resolver=self.mock_resolver,
_well_known_resolver=self.well_known_resolver,
)
@@ -1016,11 +1025,13 @@ class MatrixFederationAgentTests(unittest.TestCase):
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
ip_allowlist=IPSet(),
ip_blocklist=IPSet(),
cache_manager=Mock(spec=CacheManager),
_srv_resolver=self.mock_resolver,
_well_known_resolver=WellKnownResolver(
cast(ISynapseReactor, self.reactor),
Agent(self.reactor, contextFactory=tls_factory),
b"test-agent",
cache_manager=Mock(spec=CacheManager),
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
),
+3 -1
View File
@@ -139,7 +139,9 @@ class TypingStreamTestCase(BaseStreamTestCase):
self.hs.get_replication_command_handler()._streams["typing"].last_token = 0
typing._latest_room_serial = 0
typing._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", typing._latest_room_serial
name="TypingStreamChangeCache",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=typing._latest_room_serial,
)
typing._reset()
@@ -42,6 +42,7 @@ from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock
from synapse.util.caches import CacheManager
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import get_clock
@@ -73,6 +74,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
user_agent=b"SynapseInTrialTest/0.0.0",
ip_allowlist=None,
ip_blocklist=IPSet(),
cache_manager=Mock(spec=CacheManager),
)
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+6 -1
View File
@@ -20,6 +20,7 @@
from unittest.mock import Mock
from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from tests import unittest
@@ -28,7 +29,11 @@ from tests import unittest
class CacheTestCase(unittest.TestCase):
def setUp(self) -> None:
self.mock_timer = Mock(side_effect=lambda: 100.0)
self.cache: TTLCache[str, str] = TTLCache("test_cache", self.mock_timer)
self.cache: TTLCache[str, str] = TTLCache(
"test_cache",
cache_manager=Mock(spec=CacheManager),
timer=self.mock_timer,
)
def test_get(self) -> None:
"""simple set/get tests"""
+43 -8
View File
@@ -15,7 +15,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
Providing a prefilled cache to StreamChangeCache will result in a cache
with the prefilled-cache entered in.
"""
cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
prefilled_cache={"user@foo.com": 2},
)
self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
def test_has_entity_changed(self) -> None:
@@ -23,7 +28,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
StreamChangeCache.entity_has_changed will mark entities as changed, and
has_entity_changed will observe the changed entities.
"""
cache = StreamChangeCache("#test", 3)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=3,
)
cache.entity_has_changed("user@foo.com", 6)
cache.entity_has_changed("bar@baz.net", 7)
@@ -61,7 +70,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
StreamChangeCache.entity_has_changed will respect the max size and
purge the oldest items upon reaching that max size.
"""
cache = StreamChangeCache("#test", 1, max_size=2)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
max_size=2,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -100,7 +114,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
entities since the given position. If the position is before the start
of the known stream, it returns None instead.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -148,7 +166,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
stream position is before it, it will return True, otherwise False if
the cache has no entries.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
# With no entities, it returns True for the past, present, and False for
# the future.
@@ -175,7 +197,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
stream position is earlier than the earliest known position, it will
return all of the entities queried for.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -242,7 +268,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
recent point where the entity could have changed. If the entity is not
known, the stream start is provided instead.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -260,7 +290,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
"""
`StreamChangeCache.all_entities_changed(...)` will mark all entites as changed.
"""
cache = StreamChangeCache("#test", 1, max_size=10)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
max_size=10,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)