From d3d228ea2d0a93a404d44b1ce375a36e14a8ab48 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 19 May 2025 16:59:17 -0500 Subject: [PATCH] More cache updates (`LruCache` not finished) --- synapse/handlers/typing.py | 4 +- .../federation/matrix_federation_agent.py | 6 +++ .../http/federation/well_known_resolver.py | 21 ++++---- synapse/http/matrixfederationclient.py | 11 ++-- .../storage/databases/main/account_data.py | 5 +- synapse/storage/databases/main/deviceinbox.py | 10 ++-- synapse/storage/databases/main/devices.py | 20 +++++--- .../storage/databases/main/events_worker.py | 5 +- synapse/storage/databases/main/presence.py | 5 +- synapse/storage/databases/main/push_rule.py | 5 +- synapse/storage/databases/main/receipts.py | 5 +- synapse/storage/databases/main/stream.py | 9 ++-- synapse/util/caches/descriptors.py | 5 +- synapse/util/caches/lrucache.py | 7 ++- synapse/util/caches/stream_change_cache.py | 6 ++- synapse/util/caches/ttlcache.py | 13 +++-- tests/handlers/test_typing.py | 5 +- .../test_matrix_federation_agent.py | 17 +++++-- tests/replication/tcp/streams/test_typing.py | 4 +- .../test_federation_sender_shard.py | 2 + tests/util/caches/test_ttlcache.py | 7 ++- tests/util/test_stream_change_cache.py | 51 ++++++++++++++++--- 22 files changed, 160 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8d693fee30..c083b105ff 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -280,7 +280,9 @@ class TypingWriterHandler(FollowerTypingHandler): # caches which room_ids changed at which serials self._typing_stream_change_cache = StreamChangeCache( - "TypingStreamChangeCache", self._latest_room_serial + name="TypingStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=self._latest_room_serial, ) def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None: diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index a7742fcea8..4ac9d3728a 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -50,6 +50,7 @@ from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.types import ISynapseReactor from synapse.util import Clock +from synapse.util.caches import CacheManager logger = logging.getLogger(__name__) @@ -81,6 +82,8 @@ class MatrixFederationAgent: reactor might have some blocking applied (i.e. for DNS queries), but we need unblocked access to the proxy. + cache_manager: The cache manager to handle metrics + _srv_resolver: SrvResolver implementation to use for looking up SRV records. None to use a default implementation. @@ -92,11 +95,13 @@ class MatrixFederationAgent: def __init__( self, + *, reactor: ISynapseReactor, tls_client_options_factory: Optional[FederationPolicyForHTTPS], user_agent: bytes, ip_allowlist: Optional[IPSet], ip_blocklist: IPSet, + cache_manager: CacheManager, _srv_resolver: Optional[SrvResolver] = None, _well_known_resolver: Optional[WellKnownResolver] = None, ): @@ -139,6 +144,7 @@ class MatrixFederationAgent: ip_blocklist=ip_blocklist, ), user_agent=self.user_agent, + cache_manager=cache_manager, ) self._well_known_resolver = _well_known_resolver diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 9a6bac7281..18dfd2cf09 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -36,6 +36,7 @@ from twisted.web.iweb import IAgent, IResponse from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock, json_decoder +from synapse.util.caches import CacheManager from synapse.util.caches.ttlcache import TTLCache from synapse.util.metrics import Measure @@ -77,10 +78,6 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3 logger = logging.getLogger(__name__) -_well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache("well-known") -_had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache("had-valid-well-known") - - @attr.s(slots=True, frozen=True, auto_attribs=True) class WellKnownLookupResult: delegated_server: Optional[bytes] @@ -94,20 +91,22 @@ class WellKnownResolver: reactor: IReactorTime, agent: IAgent, user_agent: bytes, + cache_manager: CacheManager, well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None, had_well_known_cache: Optional[TTLCache[bytes, bool]] = None, ): self._reactor = reactor self._clock = Clock(reactor) - if well_known_cache is None: - well_known_cache = _well_known_cache + self._well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache( + "well-known", + cache_manager=cache_manager, + ) + self._had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache( + "had-valid-well-known", + cache_manager=cache_manager, + ) - if had_well_known_cache is None: - had_well_known_cache = _had_valid_well_known_cache - - self._well_known_cache = well_known_cache - self._had_valid_well_known_cache = had_well_known_cache self._well_known_agent = RedirectAgent(agent) self.user_agent = user_agent diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index f6d2536957..5e6bdddaa4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -417,11 +417,12 @@ class MatrixFederationHttpClient: if hs.get_instance_name() in outbound_federation_restricted_to: # Talk to federation directly federation_agent: IAgent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_allowlist, - hs.config.server.federation_ip_range_blocklist, + reactor=self.reactor, + tls_client_options_factory=tls_client_options_factory, + user_agent=user_agent.encode("ascii"), + ip_allowlist=hs.config.server.federation_ip_range_allowlist, + ip_blocklist=hs.config.server.federation_ip_range_blocklist, + cache_manager=hs.get_cache_manager(), ) else: proxy_authorization_secret = hs.config.worker.worker_replication_secret diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index e583c182ba..2f571219de 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -63,6 +63,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.cache_manager = hs.get_cache_manager() self._can_write_to_account_data = ( self._instance_name in hs.config.worker.writers.account_data @@ -87,7 +88,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) account_max = self.get_max_account_data_stream_id() self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", account_max + name="AccountDataAndTagsChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=account_max, ) self.db_pool.updates.register_background_index_update( diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 1d4af8d3d4..38267f5e3a 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -114,8 +114,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit=1000, ) self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", - min_device_inbox_id, + name="DeviceInboxStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_device_inbox_id, prefilled_cache=device_inbox_prefill, ) @@ -130,8 +131,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit=1000, ) self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", - min_device_outbox_id, + name="DeviceFederationOutboxStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_device_outbox_id, prefilled_cache=device_outbox_prefill, ) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 84c1d6d374..7680853b67 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -128,8 +128,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): limit=10000, ) self._device_list_stream_cache = StreamChangeCache( - "DeviceListStreamChangeCache", - min_device_list_id, + name="DeviceListStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_device_list_id, prefilled_cache=device_list_prefill, ) @@ -142,8 +143,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): limit=10000, ) self._device_list_room_stream_cache = StreamChangeCache( - "DeviceListRoomStreamChangeCache", - min_device_list_room_id, + name="DeviceListRoomStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_device_list_room_id, prefilled_cache=device_list_room_prefill, ) @@ -159,8 +161,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): limit=1000, ) self._user_signature_stream_cache = StreamChangeCache( - "UserSignatureStreamChangeCache", - user_signature_stream_list_id, + name="UserSignatureStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=user_signature_stream_list_id, prefilled_cache=user_signature_stream_prefill, ) @@ -178,8 +181,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): limit=10000, ) self._device_list_federation_stream_cache = StreamChangeCache( - "DeviceListFederationStreamChangeCache", - device_list_federation_list_id, + name="DeviceListFederationStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=device_list_federation_list_id, prefilled_cache=device_list_federation_prefill, ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 8757dd34d3..38e2640be3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -269,8 +269,9 @@ class EventsWorkerStore(SQLBaseStore): limit=1000, ) self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache( - "_curr_state_delta_stream_cache", - min_curr_state_delta_id, + name="_curr_state_delta_stream_cache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_curr_state_delta_id, prefilled_cache=curr_state_delta_prefill, ) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 065c885603..6e7dbd901a 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -108,8 +108,9 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) max_value=self._presence_id_gen.get_current_token(), ) self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", - min_presence_val, + name="PresenceStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_presence_val, prefilled_cache=presence_cache_prefill, ) diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 86c87f78bf..1dab330d52 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -163,8 +163,9 @@ class PushRulesWorkerStore( ) self.push_rules_stream_cache = StreamChangeCache( - "PushRulesStreamChangeCache", - push_rules_id, + name="PushRulesStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=push_rules_id, prefilled_cache=push_rules_prefill, ) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 9964331510..6174559546 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -158,8 +158,9 @@ class ReceiptsWorkerStore(SQLBaseStore): limit=10000, ) self._receipts_stream_cache = StreamChangeCache( - "ReceiptsRoomChangeCache", - min_receipts_stream_id, + name="ReceiptsRoomChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_receipts_stream_id, prefilled_cache=receipts_stream_prefill, ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 3fda49f31f..1995e6b13b 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -617,12 +617,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): max_value=events_max, ) self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", - min_event_val, + name="EventsRoomStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=min_event_val, prefilled_cache=event_cache_prefill, ) self._membership_stream_cache = StreamChangeCache( - "MembershipStreamChangeCache", events_max + name="MembershipStreamChangeCache", + cache_manager=hs.get_cache_manager(), + current_stream_pos=events_max, ) self._stream_order_on_start = self.get_room_max_stream_ordering() diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 4fc75fc5df..29a9586710 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -233,7 +233,6 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): ) -> Callable[..., "defer.Deferred[Any]"]: cache: DeferredCache[CacheKey, Any] = DeferredCache( name=self.name, - cache_manager= max_entries=self.max_entries, tree=self.tree, iterable=self.iterable, @@ -487,6 +486,7 @@ class _CachedFunctionDescriptor: cache_context: bool iterable: bool prune_unread_entries: bool + name: Optional[str] def __call__(self, orig: F) -> CachedFunction[F]: d = DeferredCacheDescriptor( @@ -498,6 +498,7 @@ class _CachedFunctionDescriptor: cache_context=self.cache_context, iterable=self.iterable, prune_unread_entries=self.prune_unread_entries, + name=self.name, ) return cast(CachedFunction[F], d) @@ -511,6 +512,7 @@ def cached( cache_context: bool = False, iterable: bool = False, prune_unread_entries: bool = True, + name: Optional[str] = None, ) -> _CachedFunctionDescriptor: return _CachedFunctionDescriptor( max_entries=max_entries, @@ -520,6 +522,7 @@ def cached( cache_context=cache_context, iterable=iterable, prune_unread_entries=prune_unread_entries, + name=name, ) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 15b067fe2c..0a87a79228 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -51,7 +51,7 @@ from twisted.internet.interfaces import IReactorTime from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.util import Clock, caches -from synapse.util.caches import CacheMetric, EvictionReason, CacheManager +from synapse.util.caches import CacheManager, CacheMetric, EvictionReason from synapse.util.caches.treecache import ( TreeCache, iterate_tree_cache_entry, @@ -119,7 +119,10 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node() @wrap_as_background_process("LruCache._expire_old_entries") async def _expire_old_entries( - hs: HomeServer, clock: Clock, expiry_seconds: float, autotune_config: Optional[dict] + hs: "HomeServer", + clock: Clock, + expiry_seconds: float, + autotune_config: Optional[dict], ) -> None: """Walks the global cache list to find cache entries that haven't been accessed in the given number of seconds, or if a given memory threshold has been breached. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 5ac8643eef..f081972309 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -26,7 +26,7 @@ from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Un import attr from sortedcontainers import SortedDict -from synapse.util import caches +from synapse.util.caches import CacheManager logger = logging.getLogger(__name__) @@ -73,7 +73,9 @@ class StreamChangeCache: def __init__( self, + *, name: str, + cache_manager: CacheManager, current_stream_pos: int, max_size: int = 10000, prefilled_cache: Optional[Mapping[EntityType, int]] = None, @@ -95,7 +97,7 @@ class StreamChangeCache: self._earliest_known_stream_pos = current_stream_pos self.name = name - self.metrics = caches.register_cache( + self.metrics = cache_manager.register_cache( "cache", self.name, self._cache, resize_callback=self.set_cache_factor ) diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index 26a088603a..d3ff9a3bc9 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -26,7 +26,7 @@ from typing import Any, Callable, Dict, Generic, Tuple, TypeVar, Union import attr from sortedcontainers import SortedList -from synapse.util.caches import register_cache +from synapse.util.caches import CacheManager logger = logging.getLogger(__name__) @@ -40,7 +40,12 @@ VT = TypeVar("VT") class TTLCache(Generic[KT, VT]): """A key/value cache implementation where each entry has its own TTL""" - def __init__(self, cache_name: str, timer: Callable[[], float] = time.time): + def __init__( + self, + cache_name: str, + cache_manager: CacheManager, + timer: Callable[[], float] = time.time, + ): # map from key to _CacheEntry self._data: Dict[KT, _CacheEntry[KT, VT]] = {} @@ -49,7 +54,9 @@ class TTLCache(Generic[KT, VT]): self._timer = timer - self._metrics = register_cache("ttl", cache_name, self, resizable=False) + self._metrics = cache_manager.register_cache( + "ttl", cache_name, self, resizable=False + ) def set(self, key: KT, value: VT, ttl: float) -> None: """Add/update an entry in the cache diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 9d8960315f..a7c0125f80 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -37,6 +37,7 @@ from synapse.http.federation.matrix_federation_agent import MatrixFederationAgen from synapse.server import HomeServer from synapse.types import JsonDict, Requester, StreamKeyType, UserID, create_requester from synapse.util import Clock +from synapse.util.caches import CacheManager from tests import unittest from tests.server import ThreadedMemoryReactorClock @@ -86,11 +87,12 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.mock_federation_client = AsyncMock(spec=["put_json"]) self.mock_federation_client.put_json.return_value = (200, "OK") self.mock_federation_client.agent = MatrixFederationAgent( - reactor, + reactor=reactor, tls_client_options_factory=None, user_agent=b"SynapseInTrialTest/0.0.0", ip_allowlist=None, ip_blocklist=IPSet(), + cache_manager=Mock(spec=CacheManager), ) # the tests assume that we are starting at unix time 1000 @@ -103,6 +105,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): keyring=mock_keyring, replication_streams={}, ) + hs._federation_http_client = self.mock_federation_client return hs diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 0fbb4db2f7..d9c568801e 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -21,7 +21,7 @@ import base64 import logging import os from typing import Generator, List, Optional, cast -from unittest.mock import AsyncMock, call, patch +from unittest.mock import AsyncMock, Mock, call, patch import treq from netaddr import IPSet @@ -60,6 +60,7 @@ from synapse.logging.context import ( current_context, ) from synapse.types import ISynapseReactor +from synapse.util.caches import CacheManager from synapse.util.caches.ttlcache import TTLCache from tests import unittest @@ -84,16 +85,23 @@ class MatrixFederationAgentTests(unittest.TestCase): self.tls_factory = FederationPolicyForHTTPS(config) + cache_manager = Mock(spec=CacheManager) + self.well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache( - "test_cache", timer=self.reactor.seconds + "test_cache", + timer=self.reactor.seconds, + cache_manager=cache_manager, ) self.had_well_known_cache: TTLCache[bytes, bool] = TTLCache( - "test_cache", timer=self.reactor.seconds + "test_cache", + timer=self.reactor.seconds, + cache_manager=cache_manager, ) self.well_known_resolver = WellKnownResolver( self.reactor, Agent(self.reactor, contextFactory=self.tls_factory), b"test-agent", + cache_manager=cache_manager, well_known_cache=self.well_known_cache, had_well_known_cache=self.had_well_known_cache, ) @@ -274,6 +282,7 @@ class MatrixFederationAgentTests(unittest.TestCase): user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided. ip_allowlist=IPSet(), ip_blocklist=IPSet(), + cache_manager=Mock(spec=CacheManager), _srv_resolver=self.mock_resolver, _well_known_resolver=self.well_known_resolver, ) @@ -1016,11 +1025,13 @@ class MatrixFederationAgentTests(unittest.TestCase): user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below. ip_allowlist=IPSet(), ip_blocklist=IPSet(), + cache_manager=Mock(spec=CacheManager), _srv_resolver=self.mock_resolver, _well_known_resolver=WellKnownResolver( cast(ISynapseReactor, self.reactor), Agent(self.reactor, contextFactory=tls_factory), b"test-agent", + cache_manager=Mock(spec=CacheManager), well_known_cache=self.well_known_cache, had_well_known_cache=self.had_well_known_cache, ), diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py index b1c2f5b03b..296da49523 100644 --- a/tests/replication/tcp/streams/test_typing.py +++ b/tests/replication/tcp/streams/test_typing.py @@ -139,7 +139,9 @@ class TypingStreamTestCase(BaseStreamTestCase): self.hs.get_replication_command_handler()._streams["typing"].last_token = 0 typing._latest_room_serial = 0 typing._typing_stream_change_cache = StreamChangeCache( - "TypingStreamChangeCache", typing._latest_room_serial + name="TypingStreamChangeCache", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=typing._latest_room_serial, ) typing._reset() diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 58a7a9dc72..2cee35b725 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -42,6 +42,7 @@ from synapse.server import HomeServer from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock +from synapse.util.caches import CacheManager from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.server import get_clock @@ -73,6 +74,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): user_agent=b"SynapseInTrialTest/0.0.0", ip_allowlist=None, ip_blocklist=IPSet(), + cache_manager=Mock(spec=CacheManager), ) def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: diff --git a/tests/util/caches/test_ttlcache.py b/tests/util/caches/test_ttlcache.py index ae73df1841..459c13e6ce 100644 --- a/tests/util/caches/test_ttlcache.py +++ b/tests/util/caches/test_ttlcache.py @@ -20,6 +20,7 @@ from unittest.mock import Mock +from synapse.util.caches import CacheManager from synapse.util.caches.ttlcache import TTLCache from tests import unittest @@ -28,7 +29,11 @@ from tests import unittest class CacheTestCase(unittest.TestCase): def setUp(self) -> None: self.mock_timer = Mock(side_effect=lambda: 100.0) - self.cache: TTLCache[str, str] = TTLCache("test_cache", self.mock_timer) + self.cache: TTLCache[str, str] = TTLCache( + "test_cache", + cache_manager=Mock(spec=CacheManager), + timer=self.mock_timer, + ) def test_get(self) -> None: """simple set/get tests""" diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index 9254bff79b..53451a4399 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -15,7 +15,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): Providing a prefilled cache to StreamChangeCache will result in a cache with the prefilled-cache entered in. """ - cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2}) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + prefilled_cache={"user@foo.com": 2}, + ) self.assertTrue(cache.has_entity_changed("user@foo.com", 1)) def test_has_entity_changed(self) -> None: @@ -23,7 +28,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): StreamChangeCache.entity_has_changed will mark entities as changed, and has_entity_changed will observe the changed entities. """ - cache = StreamChangeCache("#test", 3) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=3, + ) cache.entity_has_changed("user@foo.com", 6) cache.entity_has_changed("bar@baz.net", 7) @@ -61,7 +70,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): StreamChangeCache.entity_has_changed will respect the max size and purge the oldest items upon reaching that max size. """ - cache = StreamChangeCache("#test", 1, max_size=2) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + max_size=2, + ) cache.entity_has_changed("user@foo.com", 2) cache.entity_has_changed("bar@baz.net", 3) @@ -100,7 +114,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): entities since the given position. If the position is before the start of the known stream, it returns None instead. """ - cache = StreamChangeCache("#test", 1) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + ) cache.entity_has_changed("user@foo.com", 2) cache.entity_has_changed("bar@baz.net", 3) @@ -148,7 +166,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): stream position is before it, it will return True, otherwise False if the cache has no entries. """ - cache = StreamChangeCache("#test", 1) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + ) # With no entities, it returns True for the past, present, and False for # the future. @@ -175,7 +197,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): stream position is earlier than the earliest known position, it will return all of the entities queried for. """ - cache = StreamChangeCache("#test", 1) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + ) cache.entity_has_changed("user@foo.com", 2) cache.entity_has_changed("bar@baz.net", 3) @@ -242,7 +268,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): recent point where the entity could have changed. If the entity is not known, the stream start is provided instead. """ - cache = StreamChangeCache("#test", 1) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + ) cache.entity_has_changed("user@foo.com", 2) cache.entity_has_changed("bar@baz.net", 3) @@ -260,7 +290,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase): """ `StreamChangeCache.all_entities_changed(...)` will mark all entites as changed. """ - cache = StreamChangeCache("#test", 1, max_size=10) + cache = StreamChangeCache( + name="#test", + cache_manager=self.hs.get_cache_manager(), + current_stream_pos=1, + max_size=10, + ) cache.entity_has_changed("user@foo.com", 2) cache.entity_has_changed("bar@baz.net", 3)