Compare commits

48 Commits

Author SHA1 Message Date
Andrew Morgan
0ddaae83c3 Merge branch 'release-v1.12.4' of github.com:matrix-org/synapse into anoa/temp_working_cache_config
* 'release-v1.12.4' of github.com:matrix-org/synapse: (123 commits)
  1.12.4
  formatting for the changelog
  1.12.4rc1
  1.12.4rc1
  Do not treat display names as globs for push rules. (#7271)
  Query missing cross-signing keys on local sig upload (#7289)
  Fix changelog file
  Support GET account_data requests on a worker (#7311)
  Revert "Query missing cross-signing keys on local sig upload"
  Always send the user updates to their own device list (#7160)
  Query missing cross-signing keys on local sig upload
  Only register devices edu handler on the master process (#7255)
  tweak changelog
  1.12.3
  Fix the debian build in a better way. (#7212)
  Fix changelog wording
  1.12.2
  Pin Pillow>=4.3.0,<7.1.0 to fix dep issue
  1.12.1
  Note where bugs were introduced
  ...
2020-04-24 13:36:35 +01:00
Patrick Cloke
ce9b62e13f 1.12.4 2020-04-23 10:59:10 -04:00
Richard van der Hoff
ba0aac5e44 formatting for the changelog 2020-04-22 14:50:51 +01:00
Richard van der Hoff
dc8003f921 1.12.4rc1 2020-04-22 14:29:39 +01:00
Richard van der Hoff
83af1079d6 1.12.4rc1 2020-04-22 14:28:23 +01:00
Patrick Cloke
51f358e2fe Do not treat display names as globs for push rules. (#7271) 2020-04-22 13:07:12 +01:00
Andrew Morgan
f89ad3b6df Query missing cross-signing keys on local sig upload (#7289) 2020-04-22 12:29:36 +01:00
Richard van der Hoff
556566f0b8 Fix changelog file
I updated the PR and forgot to update the changelog.
2020-04-21 13:20:16 +01:00
Richard van der Hoff
974c0d726a Support GET account_data requests on a worker (#7311) 2020-04-21 10:46:30 +01:00
Richard van der Hoff
d41c8f6d4d Revert "Query missing cross-signing keys on local sig upload"
This was incorrectly merged to the release branch before it was ready.

This reverts commit 72fe2affb6.
2020-04-20 17:54:35 +01:00
David Baker
40f79f58bf Always send the user updates to their own device list (#7160) 2020-04-20 17:20:38 +01:00
Andrew Morgan
72fe2affb6 Query missing cross-signing keys on local sig upload
Add changelog

Save retrieved keys to the db

lint

Fix and de-brittle remote result dict processing

Use query_user_devices instead, assume only master, self_signing key types

Make changelog more useful

Remove very specific exception handling

Wrap get_verify_key_from_cross_signing_key in a try/except

Note that _get_e2e_cross_signing_verify_key can raise a SynapseError

lint

Add comment explaining why this is useful

Only fetch master and self_signing key types

Fix log statements, docstrings

Remove extraneous items from remote query try/except

lint

Factor key retrieval out into a separate function

Send device updates, modeled after SigningKeyEduUpdater._handle_signing_key_updates

Update method docstring
2020-04-17 15:47:49 +01:00
Andrew Morgan
ac6a84818f Only register devices edu handler on the master process (#7255) 2020-04-14 11:36:24 +01:00
Amber H. Brown
f004ceed71 Merge remote-tracking branch 'origin/develop' into hawkowl/cache-config-without-synctl 2020-02-27 23:18:16 +11:00
Amber H. Brown
965e259426 fixes 2020-02-27 23:18:12 +11:00
Amber H. Brown
0fc0a7deb8 fix 2020-02-27 23:15:45 +11:00
Amber H. Brown
2619891343 fix 2020-02-17 17:50:35 +11:00
Amber H. Brown
5f508e728a add tests for individual cache sizing, and fix up the individual cache sizing logic that got deleted when resizing-on-the-fly did 2020-02-17 17:47:56 +11:00
Amber H. Brown
4aeb6fbe2c Merge remote-tracking branch 'origin/develop' into hawkowl/cache-config-without-synctl 2020-02-17 17:20:10 +11:00
Amber H. Brown
18c1dbfbac don't need this comment 2020-01-22 02:57:15 +11:00
Amber H. Brown
ac020dee7a fix style 2020-01-22 02:38:50 +11:00
Amber H. Brown
2f4dbfa3e1 document as well as refactor so that CacheMetric is not nested 2020-01-22 00:00:10 +11:00
Amber H. Brown
a96b5d9ca7 Load cache factors from the environment, and add a test. 2020-01-21 23:46:08 +11:00
Amber H. Brown
0b069b70a1 make expiring caches resizeable 2020-01-17 02:14:33 +11:00
Amber H. Brown
125c5a01dd Merge branch 'hawkowl/benchmark-lrucache' into hawkowl/cache-config-without-synctl 2020-01-17 01:50:44 +11:00
Amber H. Brown
a21702fe76 cleanup so it can refer to late-config 2020-01-17 01:48:27 +11:00
Amber H. Brown
0a02b2a1c5 cleanup 2020-01-17 01:05:50 +11:00
Amber H. Brown
c76a412cc3 Merge remote-tracking branch 'origin/develop' into hawkowl/cache-config-without-synctl 2020-01-17 01:03:26 +11:00
Amber H. Brown
9735a08f04 newsfile 2019-12-03 20:24:43 +11:00
Amber H. Brown
946650c65a Merge remote-tracking branch 'origin/develop' into hawkowl/benchmark-lrucache 2019-12-03 20:22:40 +11:00
Amber H. Brown
e174b2d19c fix style 2019-12-02 19:49:13 +11:00
Amber H. Brown
f7ec52670b add some LruCache benchmarks 2019-12-02 19:45:20 +11:00
Amber H. Brown
0e368eec58 more fixes 2019-12-02 19:17:19 +11:00
Amber H. Brown
c11c8ad39f more fixes 2019-12-02 18:30:11 +11:00
Amber H. Brown
d70be1871c Merge remote-tracking branch 'origin/develop' into hawkowl/structured-logging-perf 2019-12-02 17:42:26 +11:00
Amber H. Brown
92d4d1342a fixes 2019-12-02 17:42:24 +11:00
Amber H. Brown
8439ce5e53 Merge remote-tracking branch 'origin/develop' into hawkowl/structured-logging-perf 2019-11-26 03:47:24 +11:00
Amber H. Brown
50fcb4a8c5 Re-sync every so often, in case caches appear 2019-11-22 00:05:12 +11:00
Amber H. Brown
a14831d209 Move cache configuration into a homeserver config option, instead of environment variables. 2019-11-21 06:31:37 +11:00
Amber H. Brown
f888515a3c Merge remote-tracking branch 'origin/develop' into hawkowl/structured-logging-perf 2019-11-05 05:06:20 +11:00
Amber H. Brown
c580eb32ba revert this 2019-11-05 00:51:51 +11:00
Amber H. Brown
e55591dc39 Merge remote-tracking branch 'origin/develop' into hawkowl/structured-logging-perf 2019-11-05 00:49:27 +11:00
Amber Brown
babf4eaece Merge branch 'develop' into hawkowl/structured-logging-perf 2019-11-02 01:39:14 +11:00
Amber H. Brown
73cfdebfec fix 2019-10-31 21:15:07 +11:00
Amber H. Brown
f36f3ab990 Merge remote-tracking branch 'origin/develop' into hawkowl/structured-logging-perf 2019-10-31 21:13:52 +11:00
Amber H. Brown
135fdaae0d update linting 2019-10-31 21:01:37 +11:00
Amber Brown
d684ec8a2b benchmarks 2019-10-24 22:09:43 +03:00
Amber Brown
99ab65af2f fix up logging to use rapidjson 2019-10-24 20:59:06 +03:00
48 changed files with 928 additions and 200 deletions


@@ -1,3 +1,27 @@
Synapse 1.12.4 (2020-04-23)
===========================
No significant changes.
Synapse 1.12.4rc1 (2020-04-22)
==============================
Features
--------
- Always send users their own device updates. ([\#7160](https://github.com/matrix-org/synapse/issues/7160))
- Add support for handling GET requests for `account_data` on a worker. ([\#7311](https://github.com/matrix-org/synapse/issues/7311))
Bugfixes
--------
- Fix a bug that prevented cross-signing with users on worker-mode synapses. ([\#7255](https://github.com/matrix-org/synapse/issues/7255))
- Do not treat display names as globs in push rules. ([\#7271](https://github.com/matrix-org/synapse/issues/7271))
- Fix a bug with cross-signing devices belonging to remote users who did not share a room with any user on the local homeserver. ([\#7289](https://github.com/matrix-org/synapse/issues/7289))
Synapse 1.12.3 (2020-04-03)
===========================

changelog.d/6391.feature (new file, 1 line)

@@ -0,0 +1 @@
Synapse's cache factor can now be configured in `homeserver.yaml` by the `caches.global_factor` setting. Additionally, `caches.per_cache_factors` controls the cache factors for individual caches.
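
For context, the new `caches` section is read by `CacheConfig.read_config` (added in `synapse/config/cache.py` below) from the already-parsed YAML, so it arrives as a plain dict. A minimal sketch of such a parsed config, reusing the illustrative cache name from the default config comment:

config = {
    "caches": {
        "global_factor": 0.5,
        "per_cache_factors": {
            # Illustrative entry; any registered cache name can appear here.
            "get_users_who_share_room_with_user": 2.0,
        },
    }
}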

changelog.d/6446.misc (new file, 1 line)

@@ -0,0 +1 @@
Add benchmarks for LruCache.

debian/changelog (vendored, 6 lines changed)

@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.12.4) stable; urgency=medium
* New synapse release 1.12.4.
-- Synapse Packaging team <packages@matrix.org> Thu, 23 Apr 2020 10:58:14 -0400
matrix-synapse-py3 (1.12.3) stable; urgency=medium
[ Richard van der Hoff ]


@@ -268,6 +268,8 @@ Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/account_data/
^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/rooms/[^/]*/account_data/
Additionally, the following REST endpoints can be handled, but all requests must
be routed to the same instance:


@@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.12.3"
__version__ = "1.12.4"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when


@@ -38,7 +38,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.config.server import is_threepid_reserved
from synapse.events import EventBase
from synapse.types import StateMap, UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure
@@ -74,7 +74,7 @@ class Auth(object):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
self.token_cache = LruCache(10000)
register_cache("cache", "token_cache", self.token_cache)
self._account_validity = hs.config.account_validity


@@ -98,6 +98,10 @@ from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
@@ -475,6 +479,8 @@ class GenericWorkerServer(HomeServer):
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)


@@ -69,7 +69,6 @@ from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import IncorrectDatabaseSetup
from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.module_loader import load_module
@@ -488,8 +487,8 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR
stats["event_cache_size"] = hs.config.event_cache_size
stats["cache_factor"] = hs.config.caches.global_factor
stats["event_cache_size"] = hs.config.caches.event_cache_size
#
# Performance statistics

synapse/config/cache.py (new file, 112 lines)

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Dict
from ._base import Config, ConfigError
_CACHES = {}
_CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
DEFAULT_CACHE_SIZE_FACTOR = float(os.environ.get(_CACHE_PREFIX, 0.5))
_DEFAULT_CONFIG = """\
# Cache configuration
#
# 'global_factor' controls the global cache factor. This overrides the
# "SYNAPSE_CACHE_FACTOR" environment variable.
#
# 'per_cache_factors' is a dictionary of cache name to cache factor for that
# individual cache.
#
#caches:
# global_factor: 0.5
# per_cache_factors:
# get_users_who_share_room_with_user: 2
#
"""
# Callback to ensure that all caches are the correct size, registered when the
# configuration has been loaded.
_ENSURE_CORRECT_CACHE_SIZING = None
def add_resizable_cache(cache_name, cache_resize_callback):
_CACHES[cache_name.lower()] = cache_resize_callback
if _ENSURE_CORRECT_CACHE_SIZING:
_ENSURE_CORRECT_CACHE_SIZING()
class CacheConfig(Config):
section = "caches"
_environ = os.environ
@staticmethod
def _reset():
global DEFAULT_CACHE_SIZE_FACTOR
global _ENSURE_CORRECT_CACHE_SIZING
DEFAULT_CACHE_SIZE_FACTOR = float(os.environ.get(_CACHE_PREFIX, 0.5))
_ENSURE_CORRECT_CACHE_SIZING = None
_CACHES.clear()
def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))
global DEFAULT_CACHE_SIZE_FACTOR
global _ENSURE_CORRECT_CACHE_SIZING
cache_config = config.get("caches", {})
self.global_factor = cache_config.get(
"global_factor", DEFAULT_CACHE_SIZE_FACTOR
)
if not isinstance(self.global_factor, (int, float)):
raise ConfigError("caches.global_factor must be a number.")
# Set the global one so that it's reflected in new caches
DEFAULT_CACHE_SIZE_FACTOR = self.global_factor
# Load cache factors from the environment, but override them with the
# ones in the config file if they exist
individual_factors = {
key[len(_CACHE_PREFIX) + 1 :].lower(): float(val)
for key, val in self._environ.items()
if key.startswith(_CACHE_PREFIX + "_")
}
individual_factors_config = cache_config.get("per_cache_factors", {}) or {}
if not isinstance(individual_factors_config, dict):
raise ConfigError("caches.per_cache_factors must be a dictionary")
individual_factors.update(individual_factors_config)
self.cache_factors = dict() # type: Dict[str, float]
for cache, factor in individual_factors.items():
if not isinstance(factor, (int, float)):
raise ConfigError(
"caches.per_cache_factors.%s must be a number" % (cache.lower(),)
)
self.cache_factors[cache.lower()] = factor
# Register the global callback so that the individual cache sizes get set.
def ensure_cache_sizes():
for cache_name, callback in _CACHES.items():
new_factor = self.cache_factors.get(cache_name, self.global_factor)
callback(new_factor)
_ENSURE_CORRECT_CACHE_SIZING = ensure_cache_sizes
_ENSURE_CORRECT_CACHE_SIZING()
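
A condensed, standalone sketch of the mechanism this new file introduces, mirroring `add_resizable_cache` and `ensure_cache_sizes` above in simplified form (this is an illustration, not the real module): caches register a resize callback up front, and once the configuration has been read every callback is invoked with that cache's individual factor, falling back to the global factor.

cache_resize_callbacks = {}

def add_resizable_cache(cache_name, resize_callback):
    # Caches may register before or after the config is loaded.
    cache_resize_callbacks[cache_name.lower()] = resize_callback

def apply_cache_factors(global_factor, per_cache_factors):
    # Mirrors ensure_cache_sizes() above: a per-cache factor wins,
    # otherwise the global factor applies.
    for name, callback in cache_resize_callbacks.items():
        callback(per_cache_factors.get(name, global_factor))

add_resizable_cache("state_cache", lambda factor: print("state_cache:", factor))
add_resizable_cache("get_users_who_share_room_with_user",
                    lambda factor: print("per-user cache:", factor))
apply_cache_factors(0.5, {"get_users_who_share_room_with_user": 2.0})
# prints: state_cache: 0.5
#         per-user cache: 2.0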


@@ -57,8 +57,6 @@ class DatabaseConfig(Config):
section = "database"
def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))
# We *experimentally* support specifying multiple databases via the
# `databases` key. This is a map from a label to database config in the
# same format as the `database` config option, plus an extra


@@ -17,6 +17,7 @@
from ._base import RootConfig
from .api import ApiConfig
from .appservice import AppServiceConfig
from .cache import CacheConfig
from .captcha import CaptchaConfig
from .cas import CasConfig
from .consent_config import ConsentConfig
@@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig,
ThirdPartyRulesConfig,
TracerConfig,
CacheConfig,
]


@@ -399,20 +399,30 @@ class TransportLayerClient(object):
{
"device_keys": {
"<user_id>": ["<device_id>"]
} }
}
}
Response:
{
"device_keys": {
"<user_id>": {
"<device_id>": {...}
} } }
}
},
"master_key": {
"<user_id>": {...}
}
},
"self_signing_key": {
"<user_id>": {...}
}
}
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the device keys.
A dict containing device and cross-signing keys.
"""
path = _create_v1_path("/user/keys/query")
@@ -429,14 +439,30 @@ class TransportLayerClient(object):
Response:
{
"stream_id": "...",
"devices": [ { ... } ]
"devices": [ { ... } ],
"master_key": {
"user_id": "<user_id>",
"usage": [...],
"keys": {...},
"signatures": {
"<user_id>": {...}
}
},
"self_signing_key": {
"user_id": "<user_id>",
"usage": [...],
"keys": {...},
"signatures": {
"<user_id>": {...}
}
}
}
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the device keys.
A dict containing device and cross-signing keys.
"""
path = _create_v1_path("/user/devices/%s", user_id)
@@ -454,8 +480,10 @@ class TransportLayerClient(object):
{
"one_time_keys": {
"<user_id>": {
"<device_id>": "<algorithm>"
} } }
"<device_id>": "<algorithm>"
}
}
}
Response:
{
@@ -463,13 +491,16 @@ class TransportLayerClient(object):
"<user_id>": {
"<device_id>": {
"<algorithm>:<key_id>": "<key_base64>"
} } } }
}
}
}
}
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the one-time keys.
A dict containing the one-time keys.
"""
path = _create_v1_path("/user/keys/claim")


@@ -125,8 +125,14 @@ class DeviceWorkerHandler(BaseHandler):
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
tracked_users = set(users_who_share_room)
# Always tell the user about their own devices
tracked_users.add(user_id)
changed = yield self.store.get_users_whose_devices_changed(
from_token.device_list_key, users_who_share_room
from_token.device_list_key, tracked_users
)
# Then work out if any users have since joined
@@ -456,7 +462,11 @@ class DeviceHandler(DeviceWorkerHandler):
room_ids = yield self.store.get_rooms_for_user(user_id)
yield self.notifier.on_new_event("device_list_key", position, rooms=room_ids)
# specify the user ID too since the user should always get their own device list
# updates, even if they aren't in any rooms.
yield self.notifier.on_new_event(
"device_list_key", position, users=[user_id], rooms=room_ids
)
if hosts:
logger.info(


@@ -54,19 +54,23 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
federation_registry = hs.get_federation_registry()
self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
else:
# Only register this edu handler on master as it requires writing
# device updates to the db
#
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
)
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
)
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
@@ -170,8 +174,8 @@ class E2eKeysHandler(object):
"""This is called when we are querying the device list of a user on
a remote homeserver and their device list is not in the device list
cache. If we share a room with this user and we're not querying for
specific user we will update the cache
with their device list."""
specific user we will update the cache with their device list.
"""
destination_query = remote_queries_not_in_cache[destination]
@@ -957,13 +961,19 @@ class E2eKeysHandler(object):
return signature_list, failures
@defer.inlineCallbacks
def _get_e2e_cross_signing_verify_key(self, user_id, key_type, from_user_id=None):
"""Fetch the cross-signing public key from storage and interpret it.
def _get_e2e_cross_signing_verify_key(
self, user_id: str, key_type: str, from_user_id: str = None
):
"""Fetch locally or remotely query for a cross-signing public key.
First, attempt to fetch the cross-signing public key from storage.
If that fails, query the keys from the homeserver they belong to
and update our local copy.
Args:
user_id (str): the user whose key should be fetched
key_type (str): the type of key to fetch
from_user_id (str): the user that we are fetching the keys for.
user_id: the user whose key should be fetched
key_type: the type of key to fetch
from_user_id: the user that we are fetching the keys for.
This affects what signatures are fetched.
Returns:
@@ -972,16 +982,140 @@ class E2eKeysHandler(object):
Raises:
NotFoundError: if the key is not found
SynapseError: if `user_id` is invalid
"""
user = UserID.from_string(user_id)
key = yield self.store.get_e2e_cross_signing_key(
user_id, key_type, from_user_id
)
if key is None:
logger.debug("no %s key found for %s", key_type, user_id)
if key:
# We found a copy of this key in our database. Decode and return it
key_id, verify_key = get_verify_key_from_cross_signing_key(key)
return key, key_id, verify_key
# If we couldn't find the key locally, and we're looking for keys of
# another user then attempt to fetch the missing key from the remote
# user's server.
#
# We may run into this in possible edge cases where a user tries to
# cross-sign a remote user, but does not share any rooms with them yet.
# Thus, we would not have their key list yet. We instead fetch the key,
# store it and notify clients of new, associated device IDs.
if self.is_mine(user) or key_type not in ["master", "self_signing"]:
# Note that master and self_signing keys are the only cross-signing keys we
# can request over federation
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
key_id, verify_key = get_verify_key_from_cross_signing_key(key)
(
key,
key_id,
verify_key,
) = yield self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
if key is None:
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
return key, key_id, verify_key
@defer.inlineCallbacks
def _retrieve_cross_signing_keys_for_remote_user(
self, user: UserID, desired_key_type: str,
):
"""Queries cross-signing keys for a remote user and saves them to the database
Only the key specified by `key_type` will be returned, while all retrieved keys
will be saved regardless
Args:
user: The user to query remote keys for
desired_key_type: The type of key to receive. One of "master", "self_signing"
Returns:
Deferred[Tuple[Optional[Dict], Optional[str], Optional[VerifyKey]]]: A tuple
of the retrieved key content, the key's ID and the matching VerifyKey.
If the key cannot be retrieved, all values in the tuple will instead be None.
"""
try:
remote_result = yield self.federation.query_user_devices(
user.domain, user.to_string()
)
except Exception as e:
logger.warning(
"Unable to query %s for cross-signing keys of user %s: %s %s",
user.domain,
user.to_string(),
type(e),
e,
)
return None, None, None
# Process each of the retrieved cross-signing keys
desired_key = None
desired_key_id = None
desired_verify_key = None
retrieved_device_ids = []
for key_type in ["master", "self_signing"]:
key_content = remote_result.get(key_type + "_key")
if not key_content:
continue
# Ensure these keys belong to the correct user
if "user_id" not in key_content:
logger.warning(
"Invalid %s key retrieved, missing user_id field: %s",
key_type,
key_content,
)
continue
if user.to_string() != key_content["user_id"]:
logger.warning(
"Found %s key of user %s when querying for keys of user %s",
key_type,
key_content["user_id"],
user.to_string(),
)
continue
# Validate the key contents
try:
# verify_key is a VerifyKey from signedjson, which uses
# .version to denote the portion of the key ID after the
# algorithm and colon, which is the device ID
key_id, verify_key = get_verify_key_from_cross_signing_key(key_content)
except ValueError as e:
logger.warning(
"Invalid %s key retrieved: %s - %s %s",
key_type,
key_content,
type(e),
e,
)
continue
# Note down the device ID attached to this key
retrieved_device_ids.append(verify_key.version)
# If this is the desired key type, save it and its ID/VerifyKey
if key_type == desired_key_type:
desired_key = key_content
desired_verify_key = verify_key
desired_key_id = key_id
# At the same time, store this key in the db for subsequent queries
yield self.store.set_e2e_cross_signing_key(
user.to_string(), key_type, key_content
)
# Notify clients that new devices for this user have been discovered
if retrieved_device_ids:
# XXX is this necessary?
yield self.device_handler.notify_device_update(
user.to_string(), retrieved_device_ids
)
return desired_key, desired_key_id, desired_verify_key
def _check_cross_signing_key(key, user_id, key_type, signing_key=None):
"""Check a cross-signing key uploaded by a user. Performs some basic sanity


@@ -1143,9 +1143,14 @@ class SyncHandler(object):
user_id
)
tracked_users = set(users_who_share_room)
# Always tell the user about their own devices
tracked_users.add(user_id)
# Step 1a, check for changes in devices of users we share a room with
users_that_have_changed = await self.store.get_users_whose_devices_changed(
since_token.device_list_key, users_who_share_room
since_token.device_list_key, tracked_users
)
# Step 1b, check for newly joined rooms


@@ -49,7 +49,6 @@ from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
logger = logging.getLogger(__name__)
@@ -241,7 +240,8 @@ class SimpleHttpClient(object):
# tends to do so in batches, so we need to allow the pool to keep
# lots of idle connections around.
pool = HTTPConnectionPool(self.reactor)
pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
# XXX: Why does this use the cache factor????
pool.maxPersistentPerHost = max((100 * hs.config.caches.global_factor, 5))
pool.cachedConnectionTimeout = 2 * 60
self.agent = ProxyAgent(


@@ -33,6 +33,8 @@ from prometheus_client import REGISTRY
from twisted.web.resource import Resource
from synapse.util import caches
try:
from prometheus_client.samples import Sample
except ImportError:
@@ -103,13 +105,15 @@ def nameify_sample(sample):
def generate_latest(registry, emit_help=False):
# Trigger the cache metrics to be rescraped, which updates the common
# metrics but do not produce metrics themselves
for collector in caches.collectors_by_name.values():
collector.collect()
output = []
for metric in registry.collect():
if metric.name.startswith("__unused"):
continue
if not metric.samples:
# No samples, don't bother.
continue


@@ -51,6 +51,7 @@ push_rules_delta_state_cache_metric = register_cache(
"cache",
"push_rules_delta_state_cache_metric",
cache=[], # Meaningless size, as this isn't a cache that stores values
resizable=False,
)
@@ -67,7 +68,8 @@ class BulkPushRuleEvaluator(object):
self.room_push_rule_cache_metrics = register_cache(
"cache",
"room_push_rule_cache",
cache=[], # Meaningless size, as this isn't a cache that stores values
cache=[], # Meaningless size, as this isn't a cache that stores values,
resizable=False,
)
@defer.inlineCallbacks


@@ -16,11 +16,13 @@
import logging
import re
from typing import Pattern
from six import string_types
from synapse.events import EventBase
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -56,18 +58,18 @@ def _test_ineq_condition(condition, number):
rhs = m.group(2)
if not rhs.isdigit():
return False
rhs = int(rhs)
rhs_int = int(rhs)
if ineq == "" or ineq == "==":
return number == rhs
return number == rhs_int
elif ineq == "<":
return number < rhs
return number < rhs_int
elif ineq == ">":
return number > rhs
return number > rhs_int
elif ineq == ">=":
return number >= rhs
return number >= rhs_int
elif ineq == "<=":
return number <= rhs
return number <= rhs_int
else:
return False
@@ -83,7 +85,13 @@ def tweaks_for_actions(actions):
class PushRuleEvaluatorForEvent(object):
def __init__(self, event, room_member_count, sender_power_level, power_levels):
def __init__(
self,
event: EventBase,
room_member_count: int,
sender_power_level: int,
power_levels: dict,
):
self._event = event
self._room_member_count = room_member_count
self._sender_power_level = sender_power_level
@@ -92,7 +100,7 @@ class PushRuleEvaluatorForEvent(object):
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
self._value_cache = _flatten_dict(event)
def matches(self, condition, user_id, display_name):
def matches(self, condition: dict, user_id: str, display_name: str) -> bool:
if condition["kind"] == "event_match":
return self._event_match(condition, user_id)
elif condition["kind"] == "contains_display_name":
@@ -106,7 +114,7 @@ class PushRuleEvaluatorForEvent(object):
else:
return True
def _event_match(self, condition, user_id):
def _event_match(self, condition: dict, user_id: str) -> bool:
pattern = condition.get("pattern", None)
if not pattern:
@@ -134,7 +142,7 @@ class PushRuleEvaluatorForEvent(object):
return _glob_matches(pattern, haystack)
def _contains_display_name(self, display_name):
def _contains_display_name(self, display_name: str) -> bool:
if not display_name:
return False
@@ -142,51 +150,52 @@ class PushRuleEvaluatorForEvent(object):
if not body:
return False
return _glob_matches(display_name, body, word_boundary=True)
# Similar to _glob_matches, but do not treat display_name as a glob.
r = regex_cache.get((display_name, False, True), None)
if not r:
r = re.escape(display_name)
r = _re_word_boundary(r)
r = re.compile(r, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r
def _get_value(self, dotted_key):
return r.search(body)
def _get_value(self, dotted_key: str) -> str:
return self._value_cache.get(dotted_key, None)
# Caches (glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000)
register_cache("cache", "regex_push_cache", regex_cache)
def _glob_matches(glob, value, word_boundary=False):
def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
"""Tests if value matches glob.
Args:
glob (string)
value (string): String to test against glob.
word_boundary (bool): Whether to match against word boundaries or entire
glob
value: String to test against glob.
word_boundary: Whether to match against word boundaries or entire
string. Defaults to False.
Returns:
bool
"""
try:
r = regex_cache.get((glob, word_boundary), None)
r = regex_cache.get((glob, True, word_boundary), None)
if not r:
r = _glob_to_re(glob, word_boundary)
regex_cache[(glob, word_boundary)] = r
regex_cache[(glob, True, word_boundary)] = r
return r.search(value)
except re.error:
logger.warning("Failed to parse glob to regex: %r", glob)
return False
def _glob_to_re(glob, word_boundary):
def _glob_to_re(glob: str, word_boundary: bool) -> Pattern:
"""Generates regex for a given glob.
Args:
glob (string)
word_boundary (bool): Whether to match against word boundaries or entire
string. Defaults to False.
Returns:
regex object
glob
word_boundary: Whether to match against word boundaries or entire string.
"""
if IS_GLOB.search(glob):
r = re.escape(glob)
@@ -219,7 +228,7 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
def _re_word_boundary(r):
def _re_word_boundary(r: str) -> str:
"""
Adds word boundary characters to the start and end of an
expression to require that the match occur as a whole word,

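The display-name change (#7271) in this file amounts to escaping the name before compiling it, so regex and glob metacharacters in a display name only ever match literally. A standalone sketch of that behaviour using plain `re` (the helper below is illustrative, not the Synapse function):

import re

def body_contains_display_name(body, display_name):
    # Escape the display name so it matches literally, then require word
    # boundaries, similar in spirit to _re_word_boundary above.
    pattern = re.compile(r"(^|\W)" + re.escape(display_name) + r"(\W|$)", re.IGNORECASE)
    return bool(pattern.search(body))

print(body_contains_display_name("foo bar baz", "bar"))     # True: whole-word literal match
print(body_contains_display_name("foo bar baz", "ba[rz]"))  # False: brackets are not a pattern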

@@ -15,7 +15,6 @@
from synapse.storage.data_stores.main.client_ips import LAST_SEEN_GRANULARITY
from synapse.storage.database import Database
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache
from ._base import BaseSlavedStore
@@ -26,7 +25,7 @@ class SlavedClientIpStore(BaseSlavedStore):
super(SlavedClientIpStore, self).__init__(database, db_conn, hs)
self.client_ip_last_seen = Cache(
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
name="client_ip_last_seen", keylen=4, max_entries=50000
)
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):


@@ -38,8 +38,12 @@ class AccountDataServlet(RestServlet):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self._is_worker = hs.config.worker_app is not None
async def on_PUT(self, request, user_id, account_data_type):
if self._is_worker:
raise Exception("Cannot handle PUT /account_data on worker")
requester = await self.auth.get_user_by_req(request)
if user_id != requester.user.to_string():
raise AuthError(403, "Cannot add account data for other users.")
@@ -86,8 +90,12 @@ class RoomAccountDataServlet(RestServlet):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self._is_worker = hs.config.worker_app is not None
async def on_PUT(self, request, user_id, room_id, account_data_type):
if self._is_worker:
raise Exception("Cannot handle PUT /account_data on worker")
requester = await self.auth.get_user_by_req(request)
if user_id != requester.user.to_string():
raise AuthError(403, "Cannot add account data for other users.")


@@ -35,7 +35,6 @@ from synapse.state import v1, v2
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import StateMap
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
@@ -53,7 +52,6 @@ state_groups_histogram = Histogram(
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
SIZE_OF_CACHE = 100000 * get_cache_factor_for("state_cache")
EVICTION_TIMEOUT_SECONDS = 60 * 60
@@ -447,7 +445,7 @@ class StateResolutionHandler(object):
self._state_cache = ExpiringCache(
cache_name="state_cache",
clock=self.clock,
max_len=SIZE_OF_CACHE,
max_len=100000,
expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
iterable=True,
reset_expiry_on_get=True,


@@ -22,7 +22,6 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import Database
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache
logger = logging.getLogger(__name__)
@@ -367,7 +366,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
name="client_ip_last_seen", keylen=4, max_entries=50000
)
super(ClientIpStore, self).__init__(database, db_conn, hs)


@@ -75,7 +75,7 @@ class EventsWorkerStore(SQLBaseStore):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
self._get_event_cache = Cache(
"*getEvent*", keylen=3, max_entries=hs.config.event_cache_size
"*getEvent*", keylen=3, max_entries=hs.config.caches.event_cache_size
)
self._event_fetch_lock = threading.Condition()


@@ -28,7 +28,6 @@ from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateSt
from synapse.storage.database import Database
from synapse.storage.state import StateFilter
from synapse.types import StateMap
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.descriptors import cached
from synapse.util.caches.dictionary_cache import DictionaryCache
@@ -90,11 +89,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
self._state_group_cache = DictionaryCache(
"*stateGroupCache*",
# TODO: this hasn't been tuned yet
50000 * get_cache_factor_for("stateGroupCache"),
50000,
)
self._state_group_members_cache = DictionaryCache(
"*stateGroupMembersCache*",
500000 * get_cache_factor_for("stateGroupMembersCache"),
"*stateGroupMembersCache*", 500000,
)
@cached(max_entries=10000, iterable=True)


@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019, 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,28 +15,18 @@
# limitations under the License.
import logging
import os
from typing import Dict
from typing import Callable, Dict, Optional
import six
from six.moves import intern
from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
import attr
from prometheus_client.core import Gauge
from synapse.config.cache import add_resizable_cache
logger = logging.getLogger(__name__)
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
def get_cache_factor_for(cache_name):
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
factor = os.environ.get(env_var)
if factor:
return float(factor)
return CACHE_SIZE_FACTOR
caches_by_name = {}
collectors_by_name = {} # type: Dict
@@ -44,6 +34,7 @@ cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
@@ -53,67 +44,82 @@ response_cache_evicted = Gauge(
response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
def register_cache(cache_type, cache_name, cache, collect_callback=None):
"""Register a cache object for metric collection.
@attr.s
class CacheMetric(object):
_cache = attr.ib()
_cache_type = attr.ib(type=str)
_cache_name = attr.ib(type=str)
_collect_callback = attr.ib(type=Optional[Callable])
hits = attr.ib(default=0)
misses = attr.ib(default=0)
evicted_size = attr.ib(default=0)
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def describe(self):
return []
def collect(self):
try:
if self._cache_type == "response_cache":
response_cache_size.labels(self._cache_name).set(len(self._cache))
response_cache_hits.labels(self._cache_name).set(self.hits)
response_cache_evicted.labels(self._cache_name).set(self.evicted_size)
response_cache_total.labels(self._cache_name).set(
self.hits + self.misses
)
else:
cache_size.labels(self._cache_name).set(len(self._cache))
cache_hits.labels(self._cache_name).set(self.hits)
cache_evicted.labels(self._cache_name).set(self.evicted_size)
cache_total.labels(self._cache_name).set(self.hits + self.misses)
if getattr(self._cache, "max_size", None):
cache_max_size.labels(self._cache_name).set(self._cache.max_size)
if self._collect_callback:
self._collect_callback()
except Exception as e:
logger.warning("Error calculating metrics for %s: %s", self._cache_name, e)
raise
def register_cache(
cache_type: str,
cache_name: str,
cache,
collect_callback: Optional[Callable] = None,
resizable: bool = True,
resize_callback: Optional[Callable] = None,
) -> CacheMetric:
"""Register a cache object for metric collection and resizing.
Args:
cache_type (str):
cache_name (str): name of the cache
cache (object): cache itself
collect_callback (callable|None): if not None, a function which is called during
metric collection to update additional metrics.
cache_type
cache_name: name of the cache
cache: cache itself
collect_callback: If given, a function which is called during metric
collection to update additional metrics.
resizable: Whether this cache supports being resized.
resize_callback: A function which can be called to resize the cache.
Returns:
CacheMetric: an object which provides inc_{hits,misses,evictions} methods
"""
if resizable:
if not resize_callback:
resize_callback = getattr(cache, "set_cache_factor")
add_resizable_cache(cache_name, resize_callback)
# Check if the metric is already registered. Unregister it, if so.
# This usually happens during tests, as at runtime these caches are
# effectively singletons.
metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
metric_name = "cache_%s_%s" % (cache_type, cache_name)
if metric_name in collectors_by_name.keys():
REGISTRY.unregister(collectors_by_name[metric_name])
class CacheMetric(object):
hits = 0
misses = 0
evicted_size = 0
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def describe(self):
return []
def collect(self):
try:
if cache_type == "response_cache":
response_cache_size.labels(cache_name).set(len(cache))
response_cache_hits.labels(cache_name).set(self.hits)
response_cache_evicted.labels(cache_name).set(self.evicted_size)
response_cache_total.labels(cache_name).set(self.hits + self.misses)
else:
cache_size.labels(cache_name).set(len(cache))
cache_hits.labels(cache_name).set(self.hits)
cache_evicted.labels(cache_name).set(self.evicted_size)
cache_total.labels(cache_name).set(self.hits + self.misses)
if collect_callback:
collect_callback()
except Exception as e:
logger.warning("Error calculating metrics for %s: %s", cache_name, e)
raise
yield GaugeMetricFamily("__unused", "")
metric = CacheMetric()
REGISTRY.register(metric)
caches_by_name[cache_name] = cache
collectors_by_name[metric_name] = metric
return metric


@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import inspect
import logging
@@ -30,7 +31,6 @@ from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -81,7 +81,6 @@ class CacheEntry(object):
class Cache(object):
__slots__ = (
"cache",
"max_entries",
"name",
"keylen",
"thread",
@@ -111,6 +110,10 @@ class Cache(object):
collect_callback=self._metrics_collection_callback,
)
@property
def max_entries(self):
return self.cache.max_size
def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)
@@ -370,13 +373,11 @@ class CacheDescriptor(_CacheDescriptorBase):
cache_context=cache_context,
)
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree
self.iterable = iterable
def __get__(self, obj, objtype=None):
def __get__(self, obj, owner):
cache = Cache(
name=self.orig.__name__,
max_entries=self.max_entries,


@@ -18,6 +18,7 @@ from collections import OrderedDict
from six import iteritems, itervalues
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@@ -55,11 +56,12 @@ class ExpiringCache(object):
"""
self._cache_name = cache_name
self.max_size = int(max_len * cache_config.DEFAULT_CACHE_SIZE_FACTOR)
self._original_max_size = max_len
self._clock = clock
self._max_len = max_len
self._expiry_ms = expiry_ms
self._reset_expiry_on_get = reset_expiry_on_get
self._cache = OrderedDict()
@@ -82,9 +84,11 @@ class ExpiringCache(object):
def __setitem__(self, key, value):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)
self.evict()
def evict(self):
# Evict if there are now too many items
while self._max_len and len(self) > self._max_len:
while self.max_size and len(self) > self.max_size:
_key, value = self._cache.popitem(last=False)
if self.iterable:
self.metrics.inc_evictions(len(value.value))
@@ -170,6 +174,23 @@ class ExpiringCache(object):
else:
return len(self._cache)
def set_cache_factor(self, factor: float) -> bool:
"""
Set the cache factor for this individual cache.
This will trigger a resize if it changes, which may require evicting
items from the cache.
Returns:
bool: Whether the cache changed size or not.
"""
new_size = int(self._original_max_size * factor)
if new_size != self.max_size:
self.max_size = new_size
self.evict()
return True
return False
class _CacheEntry(object):
__slots__ = ["time", "value"]


@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from functools import wraps
@@ -76,6 +75,11 @@ class LruCache(object):
"""
cache = cache_type()
self.cache = cache # Used for introspection.
# Save the original max size, and apply the default size factor.
self._original_max_size = max_size
self.max_size = int(max_size)
list_root = _Node(None, None, None, None)
list_root.next_node = list_root
list_root.prev_node = list_root
@@ -83,7 +87,7 @@ class LruCache(object):
lock = threading.Lock()
def evict():
while cache_len() > max_size:
while cache_len() > self.max_size:
todelete = list_root.prev_node
evicted_len = delete_node(todelete)
cache.pop(todelete.key, None)
@@ -236,6 +240,7 @@ class LruCache(object):
return key in cache
self.sentinel = object()
self._on_resize = evict
self.get = cache_get
self.set = cache_set
self.setdefault = cache_set_default
@@ -266,3 +271,20 @@ class LruCache(object):
def __contains__(self, key):
return self.contains(key)
def set_cache_factor(self, factor: float) -> bool:
"""
Set the cache factor for this individual cache.
This will trigger a resize if it changes, which may require evicting
items from the cache.
Returns:
bool: Whether the cache changed size or not.
"""
new_size = int(self._original_max_size * factor)
if new_size != self.max_size:
self.max_size = new_size
self._on_resize()
return True
return False
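
The new `set_cache_factor` on `LruCache`, like the equivalents added to `ExpiringCache` above and `StreamChangeCache` below, follows one pattern: keep the originally requested size, recompute the effective maximum from the factor, and evict if the cache shrank. A toy, self-contained class illustrating that pattern (not the LruCache implementation itself):

class ToyResizableCache:
    def __init__(self, max_size):
        self._original_max_size = max_size
        self.max_size = max_size
        self._items = []

    def add(self, item):
        self._items.append(item)
        self._evict()

    def _evict(self):
        # Drop the oldest entries until we are back under the limit.
        while len(self._items) > self.max_size:
            self._items.pop(0)

    def set_cache_factor(self, factor):
        new_size = int(self._original_max_size * factor)
        if new_size != self.max_size:
            self.max_size = new_size
            self._evict()
            return True
        return False

cache = ToyResizableCache(4)
for i in range(4):
    cache.add(i)
cache.set_cache_factor(0.5)          # effective maximum drops from 4 to 2
print(cache.max_size, cache._items)  # 2 [2, 3]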


@@ -38,7 +38,7 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000.0
self._name = name
self._metrics = register_cache("response_cache", name, self)
self._metrics = register_cache("response_cache", name, self, resizable=False)
def size(self):
return len(self.pending_result_cache)


@@ -14,6 +14,7 @@
# limitations under the License.
import logging
import math
from six import integer_types
@@ -35,17 +36,37 @@ class StreamChangeCache(object):
"""
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._original_max_size = max_size
self.max_size = math.floor(max_size)
self._entity_to_key = {}
self._cache = SortedDict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = caches.register_cache("cache", self.name, self._cache)
self.metrics = caches.register_cache(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
)
if prefilled_cache:
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
def set_cache_factor(self, factor: float) -> bool:
"""
Set the cache factor for this individual cache.
This will trigger a resize if it changes, which may require evicting
items from the cache.
Returns:
bool: Whether the cache changed size or not.
"""
new_size = math.floor(self._original_max_size * factor)
if new_size != self.max_size:
self.max_size = new_size
self._evict()
return True
return False
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
@@ -133,13 +154,13 @@ class StreamChangeCache(object):
self._cache.pop(old_pos, None)
self._cache[stream_pos] = entity
self._entity_to_key[entity] = stream_pos
self._evict()
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos
)
self._entity_to_key.pop(r, None)
def _evict(self):
while len(self._cache) > self.max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
self._entity_to_key.pop(r, None)
def get_max_pos_of_last_change(self, entity):
"""Returns an upper bound of the stream id of the last change to an


@@ -38,7 +38,7 @@ class TTLCache(object):
self._timer = timer
self._metrics = register_cache("ttl", cache_name, self)
self._metrics = register_cache("ttl", cache_name, self, resizable=False)
def set(self, key, value, ttl):
"""Add/update an entry in the cache


@@ -14,6 +14,7 @@
# limitations under the License.
import sys
from argparse import REMAINDER
from contextlib import redirect_stderr
from io import StringIO
@@ -21,7 +22,7 @@ import pyperf
from synmark import make_reactor
from synmark.suites import SUITES
from twisted.internet.defer import ensureDeferred
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.logger import globalLogBeginner, textFileLogObserver
from twisted.python.failure import Failure
@@ -40,7 +41,8 @@ def make_test(main):
file_out = StringIO()
with redirect_stderr(file_out):
d = ensureDeferred(main(reactor, loops))
d = Deferred()
d.addCallback(lambda _: ensureDeferred(main(reactor, loops)))
def on_done(_):
if isinstance(_, Failure):
@@ -50,6 +52,7 @@ def make_test(main):
return _
d.addBoth(on_done)
reactor.callWhenRunning(lambda: d.callback(True))
reactor.run()
return d.result
@@ -62,11 +65,13 @@ if __name__ == "__main__":
def add_cmdline_args(cmd, args):
if args.log:
cmd.extend(["--log"])
cmd.extend(args.tests)
runner = pyperf.Runner(
processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args
processes=3, min_time=1.5, show_name=True, add_cmdline_args=add_cmdline_args
)
runner.argparser.add_argument("--log", action="store_true")
runner.argparser.add_argument("tests", nargs=REMAINDER)
runner.parse_args()
orig_loops = runner.args.loops
@@ -79,6 +84,11 @@ if __name__ == "__main__":
)
setupdb()
if runner.args.tests:
SUITES = list(
filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES)
)
for suite, loops in SUITES:
if loops:
runner.args.loops = loops


@@ -1,3 +1,9 @@
from . import logging
from . import logging, lrucache, lrucache_evict
SUITES = [(logging, 1000), (logging, 10000), (logging, None)]
SUITES = [
(logging, 1000),
(logging, 10000),
(logging, None),
(lrucache, None),
(lrucache_evict, None),
]


@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyperf import perf_counter
from synapse.util.caches.lrucache import LruCache
async def main(reactor, loops):
"""
Benchmark `loops` number of insertions into LruCache without eviction.
"""
cache = LruCache(loops)
start = perf_counter()
for i in range(loops):
cache[i] = True
end = perf_counter() - start
return end


@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyperf import perf_counter
from synapse.util.caches.lrucache import LruCache
async def main(reactor, loops):
"""
Benchmark `loops` number of insertions into LruCache where half of them are
evicted.
"""
cache = LruCache(loops // 2)
start = perf_counter()
for i in range(loops):
cache[i] = True
end = perf_counter() - start
return end

tests/config/test_cache.py (new file, 125 lines)

@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
# Copyright 2020 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.config._base import Config, RootConfig
from synapse.config.cache import CacheConfig, add_resizable_cache
from synapse.util.caches.lrucache import LruCache
from tests.unittest import TestCase
class FakeServer(Config):
section = "server"
class TestConfig(RootConfig):
config_classes = [FakeServer, CacheConfig]
class CacheConfigTests(TestCase):
def setUp(self):
CacheConfig._reset()
def test_individual_caches_from_environ(self):
"""
Individual cache factors will be loaded from the environment.
"""
config = {}
t = TestConfig()
t.caches._environ = {
"SYNAPSE_CACHE_FACTOR_SOMETHING_OR_OTHER": "2",
"SYNAPSE_NOT_CACHE": "BLAH",
}
t.read_config(config, config_dir_path="", data_dir_path="")
self.assertEqual(dict(t.caches.cache_factors), {"something_or_other": 2.0})
def test_config_overrides_environ(self):
"""
Individual cache factors defined in config will take precedence over
ones in the environment.
"""
config = {"caches": {"per_cache_factors": {"foo": 2, "bar": 3}}}
t = TestConfig()
t.caches._environ = {
"SYNAPSE_CACHE_FACTOR_SOMETHING_OR_OTHER": "2",
"SYNAPSE_CACHE_FACTOR_FOO": 1,
}
t.read_config(config, config_dir_path="", data_dir_path="")
self.assertEqual(
dict(t.caches.cache_factors),
{"foo": 2.0, "bar": 3.0, "something_or_other": 2.0},
)
def test_individual_instantiated_before_config_load(self):
"""
If a cache is instantiated before the config is read, it will be given
the default cache size in the interim, and then resized once the config
is loaded.
"""
cache = LruCache(100)
add_resizable_cache("foo", cache.set_cache_factor)
self.assertEqual(cache.max_size, 50)
config = {"caches": {"per_cache_factors": {"foo": 3}}}
t = TestConfig()
t.read_config(config, config_dir_path="", data_dir_path="")
self.assertEqual(cache.max_size, 300)
def test_individual_instantiated_after_config_load(self):
"""
If a cache is instantiated after the config is read, it will be
immediately resized to the correct size given the per_cache_factor if
there is one.
"""
config = {"caches": {"per_cache_factors": {"foo": 2}}}
t = TestConfig()
t.read_config(config, config_dir_path="", data_dir_path="")
cache = LruCache(100)
add_resizable_cache("foo", cache.set_cache_factor)
self.assertEqual(cache.max_size, 200)
def test_global_instantiated_before_config_load(self):
"""
If a cache is instantiated before the config is read, it will be given
the default cache size in the interim, and then resized to the new
default cache size once the config is loaded.
"""
cache = LruCache(100)
add_resizable_cache("foo", cache.set_cache_factor)
self.assertEqual(cache.max_size, 50)
config = {"caches": {"global_factor": 4}}
t = TestConfig()
t.read_config(config, config_dir_path="", data_dir_path="")
self.assertEqual(cache.max_size, 400)
def test_global_instantiated_after_config_load(self):
"""
If a cache is instantiated after the config is read, it will be
immediately resized to the correct size given the global factor if there
is no per-cache factor.
"""
config = {"caches": {"global_factor": 1.5}}
t = TestConfig()
t.read_config(config, config_dir_path="", data_dir_path="")
cache = LruCache(100)
add_resizable_cache("foo", cache.set_cache_factor)
self.assertEqual(cache.max_size, 150)


@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent
from synapse.push.push_rule_evaluator import PushRuleEvaluatorForEvent
from tests import unittest
class PushRuleEvaluatorTestCase(unittest.TestCase):
def setUp(self):
event = FrozenEvent(
{
"event_id": "$event_id",
"type": "m.room.history_visibility",
"sender": "@user:test",
"state_key": "",
"room_id": "@room:test",
"content": {"body": "foo bar baz"},
},
RoomVersions.V1,
)
room_member_count = 0
sender_power_level = 0
power_levels = {}
self.evaluator = PushRuleEvaluatorForEvent(
event, room_member_count, sender_power_level, power_levels
)
def test_display_name(self):
"""Check for a matching display name in the body of the event."""
condition = {
"kind": "contains_display_name",
}
# Blank names are skipped.
self.assertFalse(self.evaluator.matches(condition, "@user:test", ""))
# Check a display name that doesn't match.
self.assertFalse(self.evaluator.matches(condition, "@user:test", "not found"))
# Check a display name which matches.
self.assertTrue(self.evaluator.matches(condition, "@user:test", "foo"))
# A display name that only matches part of a word does not count as a match.
self.assertFalse(self.evaluator.matches(condition, "@user:test", "ba"))
# A display name should not be interpreted as a regular expression.
self.assertFalse(self.evaluator.matches(condition, "@user:test", "ba[rz]"))
# A display name with spaces should work fine.
self.assertTrue(self.evaluator.matches(condition, "@user:test", "foo bar"))


@@ -25,8 +25,8 @@ from synapse.util.caches.descriptors import Cache, cached
from tests import unittest
class CacheTestCase(unittest.TestCase):
def setUp(self):
class CacheTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.cache = Cache("test")
def test_empty(self):
@@ -96,7 +96,7 @@ class CacheTestCase(unittest.TestCase):
cache.get(3)
class CacheDecoratorTestCase(unittest.TestCase):
class CacheDecoratorTestCase(unittest.HomeserverTestCase):
@defer.inlineCallbacks
def test_passthrough(self):
class A(object):
@@ -239,7 +239,7 @@ class CacheDecoratorTestCase(unittest.TestCase):
callcount2 = [0]
class A(object):
@cached(max_entries=4) # HACK: This makes it 2 due to cache factor
@cached(max_entries=2)
def func(self, key):
callcount[0] += 1
return key


@@ -43,7 +43,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
)
hs.config.app_service_config_files = self.as_yaml_files
hs.config.event_cache_size = 1
hs.config.caches.event_cache_size = 1
hs.config.password_providers = []
self.as_token = "token1"
@@ -110,7 +110,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
)
hs.config.app_service_config_files = self.as_yaml_files
hs.config.event_cache_size = 1
hs.config.caches.event_cache_size = 1
hs.config.password_providers = []
self.as_list = [
@@ -422,7 +422,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
)
hs.config.app_service_config_files = [f1, f2]
hs.config.event_cache_size = 1
hs.config.caches.event_cache_size = 1
hs.config.password_providers = []
database = hs.get_datastores().databases[0]
@@ -440,7 +440,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
)
hs.config.app_service_config_files = [f1, f2]
hs.config.event_cache_size = 1
hs.config.caches.event_cache_size = 1
hs.config.password_providers = []
with self.assertRaises(ConfigError) as cm:
@@ -464,7 +464,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
)
hs.config.app_service_config_files = [f1, f2]
hs.config.event_cache_size = 1
hs.config.caches.event_cache_size = 1
hs.config.password_providers = []
with self.assertRaises(ConfigError) as cm:


@@ -51,7 +51,8 @@ class SQLBaseStoreTestCase(unittest.TestCase):
config = Mock()
config._disable_native_upserts = True
config.event_cache_size = 1
config.caches = Mock()
config.caches.event_cache_size = 1
hs = TestHomeServer("test", config=config)
sqlite_config = {"name": "sqlite3"}


@@ -15,6 +15,7 @@
# limitations under the License.
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from synapse.util.caches.descriptors import Cache
from tests import unittest
@@ -129,3 +130,36 @@ class BuildInfoTests(unittest.TestCase):
self.assertTrue(b"osversion=" in items[0])
self.assertTrue(b"pythonversion=" in items[0])
self.assertTrue(b"version=" in items[0])
class CacheMetricsTests(unittest.HomeserverTestCase):
def test_cache_metric(self):
"""
Caches produce metrics reflecting their state when scraped.
"""
CACHE_NAME = "cache_metrics_test_fgjkbdfg"
cache = Cache(CACHE_NAME, max_entries=777)
items = {
x.split(b"{")[0].decode("ascii"): x.split(b" ")[1].decode("ascii")
for x in filter(
lambda x: b"cache_metrics_test_fgjkbdfg" in x,
generate_latest(REGISTRY).split(b"\n"),
)
}
self.assertEqual(items["synapse_util_caches_cache_size"], "0.0")
self.assertEqual(items["synapse_util_caches_cache_max_size"], "777.0")
cache.prefill("1", "hi")
items = {
x.split(b"{")[0].decode("ascii"): x.split(b" ")[1].decode("ascii")
for x in filter(
lambda x: b"cache_metrics_test_fgjkbdfg" in x,
generate_latest(REGISTRY).split(b"\n"),
)
}
self.assertEqual(items["synapse_util_caches_cache_size"], "1.0")
self.assertEqual(items["synapse_util_caches_cache_max_size"], "777.0")


@@ -21,7 +21,7 @@ from tests.utils import MockClock
from .. import unittest
class ExpiringCacheTestCase(unittest.TestCase):
class ExpiringCacheTestCase(unittest.HomeserverTestCase):
def test_get_set(self):
clock = MockClock()
cache = ExpiringCache("test", clock, max_len=1)


@@ -22,7 +22,7 @@ from synapse.util.caches.treecache import TreeCache
from .. import unittest
class LruCacheTestCase(unittest.TestCase):
class LruCacheTestCase(unittest.HomeserverTestCase):
def test_get_set(self):
cache = LruCache(1)
cache["key"] = "value"
@@ -84,7 +84,7 @@ class LruCacheTestCase(unittest.TestCase):
self.assertEquals(len(cache), 0)
class LruCacheCallbacksTestCase(unittest.TestCase):
class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
def test_get(self):
m = Mock()
cache = LruCache(1)
@@ -233,7 +233,7 @@ class LruCacheCallbacksTestCase(unittest.TestCase):
self.assertEquals(m3.call_count, 1)
class LruCacheSizedTestCase(unittest.TestCase):
class LruCacheSizedTestCase(unittest.HomeserverTestCase):
def test_evict(self):
cache = LruCache(5, size_callback=len)
cache["key1"] = [0]


@@ -1,11 +1,9 @@
from mock import patch
from synapse.util.caches.stream_change_cache import StreamChangeCache
from tests import unittest
class StreamChangeCacheTests(unittest.TestCase):
class StreamChangeCacheTests(unittest.HomeserverTestCase):
"""
Tests for StreamChangeCache.
"""
@@ -46,7 +44,6 @@ class StreamChangeCacheTests(unittest.TestCase):
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
@patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0)
def test_has_entity_changed_pops_off_start(self):
"""
StreamChangeCache.entity_has_changed will respect the max size and


@@ -164,6 +164,7 @@ def default_config(name, parse=False):
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
"update_user_directory": False,
"caches": {"global_factor": 1},
}
if parse:
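Pinning global_factor to 1 in the test default config keeps declared cache sizes literal: with an implicit factor of 0.5, a cache asked for max_entries=4 ended up holding 2, which is what the "# HACK: This makes it 2 due to cache factor" comment in the descriptor test above was working around. A tiny illustration of that arithmetic (effective_size is only a name for this sketch):

def effective_size(max_entries, factor):
    # The size a cache ends up with once its factor is applied.
    return int(max_entries * factor)

assert effective_size(4, 0.5) == 2      # the old behaviour the HACK comment worked around
assert effective_size(2, 1.0) == 2      # with global_factor 1 the declared size stands
assert effective_size(100, 0.5) == 50   # matches LruCache(100) -> max_size 50 above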


@@ -194,6 +194,7 @@ commands = mypy \
synapse/metrics \
synapse/module_api \
synapse/push/pusherpool.py \
synapse/push/push_rule_evaluator.py \
synapse/replication \
synapse/rest \
synapse/spam_checker_api \