Be able to shut down a homeserver that hasn't been set up (#19187)
For example, a homeserver can fail to `setup` if it fails to connect to the database.

Fixes https://github.com/element-hq/synapse/issues/19188

Follow-up to https://github.com/element-hq/synapse/pull/18828

### Background

As part of Element's plan to support a light form of vhosting (virtual hosting), i.e. multiple instances of Synapse in the same Python process (c.f. Synapse Pro for small hosts), we're currently diving into the details and implications of running Synapse this way.

"Clean tenant deprovisioning" is tracked internally by https://github.com/element-hq/synapse-small-hosts/issues/50
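For illustration, here is a minimal sketch of the behaviour this change is meant to guarantee (the `run_tenant` driver below is hypothetical, not code from this PR): even when `setup()` fails part-way, the caller should still be able to call `shutdown()` and release whatever was partially initialised.

```python
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


async def run_tenant(hs: "HomeServer") -> None:
    """Hypothetical vhosting-style driver for one Synapse instance."""
    try:
        hs.setup()
    except Exception:
        # e.g. the database is unreachable. Before this change, shutting down a
        # homeserver that never completed setup() could itself blow up.
        logger.exception("setup failed for %s, shutting the instance down", hs.hostname)
        await hs.shutdown()
        raise
```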
changelog.d/19187.misc (new file)
@@ -0,0 +1 @@
+Fix `HomeServer.shutdown()` failing if the homeserver hasn't been setup yet.
synapse/api/errors.py
@@ -856,6 +856,12 @@ class HttpResponseException(CodeMessageException):
         return ProxiedRequestError(self.code, errmsg, errcode, j)


+class HomeServerNotSetupException(Exception):
+    """
+    Raised when an operation is attempted on the HomeServer before setup() has been called.
+    """
+
+
 class ShadowBanError(Exception):
     """
     Raised when a shadow-banned user attempts to perform an action.
synapse/crypto/keyring.py
@@ -21,6 +21,7 @@

 import abc
 import logging
+from contextlib import ExitStack
 from typing import TYPE_CHECKING, Callable, Iterable

 import attr
@@ -150,57 +151,81 @@ class Keyring:
     """

     def __init__(
-        self, hs: "HomeServer", key_fetchers: "Iterable[KeyFetcher] | None" = None
+        self,
+        hs: "HomeServer",
+        test_only_key_fetchers: "list[KeyFetcher] | None" = None,
     ):
-        self.server_name = hs.hostname
+        """
+        Args:
+            hs: The HomeServer instance
+            test_only_key_fetchers: Dependency injection for tests only. If provided,
+                these key fetchers will be used instead of the default ones.
+        """
+        # Clean-up to avoid partial initialization leaving behind references.
+        with ExitStack() as exit:
+            self.server_name = hs.hostname

-        if key_fetchers is None:
-            # Always fetch keys from the database.
-            mutable_key_fetchers: list[KeyFetcher] = [StoreKeyFetcher(hs)]
-            # Fetch keys from configured trusted key servers, if any exist.
-            key_servers = hs.config.key.key_servers
-            if key_servers:
-                mutable_key_fetchers.append(PerspectivesKeyFetcher(hs))
-            # Finally, fetch keys from the origin server directly.
-            mutable_key_fetchers.append(ServerKeyFetcher(hs))
-
-            self._key_fetchers: Iterable[KeyFetcher] = tuple(mutable_key_fetchers)
-        else:
-            self._key_fetchers = key_fetchers
-
-        self._fetch_keys_queue: BatchingQueue[
-            _FetchKeyRequest, dict[str, dict[str, FetchKeyResult]]
-        ] = BatchingQueue(
-            name="keyring_server",
-            hs=hs,
-            clock=hs.get_clock(),
-            # The method called to fetch each key
-            process_batch_callback=self._inner_fetch_key_requests,
-        )
-
-        self._is_mine_server_name = hs.is_mine_server_name
-
-        # build a FetchKeyResult for each of our own keys, to shortcircuit the
-        # fetcher.
-        self._local_verify_keys: dict[str, FetchKeyResult] = {}
-        for key_id, key in hs.config.key.old_signing_keys.items():
-            self._local_verify_keys[key_id] = FetchKeyResult(
-                verify_key=key, valid_until_ts=key.expired
-            )
-
-        vk = get_verify_key(hs.signing_key)
-        self._local_verify_keys[f"{vk.alg}:{vk.version}"] = FetchKeyResult(
-            verify_key=vk,
-            valid_until_ts=2**63,  # fake future timestamp
-        )
+            self._key_fetchers: list[KeyFetcher] = []
+            if test_only_key_fetchers is None:
+                # Always fetch keys from the database.
+                store_key_fetcher = StoreKeyFetcher(hs)
+                exit.callback(store_key_fetcher.shutdown)
+                self._key_fetchers.append(store_key_fetcher)
+
+                # Fetch keys from configured trusted key servers, if any exist.
+                key_servers = hs.config.key.key_servers
+                if key_servers:
+                    perspectives_key_fetcher = PerspectivesKeyFetcher(hs)
+                    exit.callback(perspectives_key_fetcher.shutdown)
+                    self._key_fetchers.append(perspectives_key_fetcher)
+
+                # Finally, fetch keys from the origin server directly.
+                server_key_fetcher = ServerKeyFetcher(hs)
+                exit.callback(server_key_fetcher.shutdown)
+                self._key_fetchers.append(server_key_fetcher)
+            else:
+                self._key_fetchers = test_only_key_fetchers
+
+            self._fetch_keys_queue: BatchingQueue[
+                _FetchKeyRequest, dict[str, dict[str, FetchKeyResult]]
+            ] = BatchingQueue(
+                name="keyring_server",
+                hs=hs,
+                clock=hs.get_clock(),
+                # The method called to fetch each key
+                process_batch_callback=self._inner_fetch_key_requests,
+            )
+            exit.callback(self._fetch_keys_queue.shutdown)
+
+            self._is_mine_server_name = hs.is_mine_server_name
+
+            # build a FetchKeyResult for each of our own keys, to shortcircuit the
+            # fetcher.
+            self._local_verify_keys: dict[str, FetchKeyResult] = {}
+            for key_id, key in hs.config.key.old_signing_keys.items():
+                self._local_verify_keys[key_id] = FetchKeyResult(
+                    verify_key=key, valid_until_ts=key.expired
+                )
+
+            vk = get_verify_key(hs.signing_key)
+            self._local_verify_keys[f"{vk.alg}:{vk.version}"] = FetchKeyResult(
+                verify_key=vk,
+                valid_until_ts=2**63,  # fake future timestamp
+            )
+
+            # We reached the end of the block which means everything was successful, so
+            # no exit handlers are needed (remove them all).
+            exit.pop_all()

     def shutdown(self) -> None:
         """
         Prepares the KeyRing for garbage collection by shutting down it's queues.
         """
         self._fetch_keys_queue.shutdown()

         for key_fetcher in self._key_fetchers:
             key_fetcher.shutdown()
+        self._key_fetchers.clear()

     async def verify_json_for_server(
         self,
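The constructor above relies on a standard-library idiom for guarding against partial initialisation: after each sub-component is created, an undo callback is pushed onto a `contextlib.ExitStack`, and `pop_all()` discards those callbacks only once the whole constructor has succeeded. A stripped-down sketch of the idiom with illustrative names (not Synapse classes):

```python
from contextlib import ExitStack


class Part:
    """Stand-in for a sub-component such as a KeyFetcher (illustrative only)."""

    def shutdown(self) -> None:
        print("shutting down", id(self))


class Component:
    """Builds several parts; cleans them all up if construction fails part-way."""

    def __init__(self, fail_at: int | None = None) -> None:
        self._parts: list[Part] = []

        with ExitStack() as undo:
            for i in range(3):
                if i == fail_at:
                    raise RuntimeError("simulated setup failure")
                part = Part()
                undo.callback(part.shutdown)  # undo this step if a later one fails
                self._parts.append(part)

            # Reached the end without an exception: keep the parts and discard
            # the registered cleanup callbacks (pop_all() transfers them away).
            undo.pop_all()
```

If construction raises, the `ExitStack` unwinds and shuts down every part created so far, so nothing is left holding dangling references; on success, `pop_all()` makes the exit of the `with` block a no-op.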
@@ -521,9 +546,21 @@ class StoreKeyFetcher(KeyFetcher):
     """KeyFetcher impl which fetches keys from our data store"""

     def __init__(self, hs: "HomeServer"):
-        super().__init__(hs)
-
-        self.store = hs.get_datastores().main
+        # Clean-up to avoid partial initialization leaving behind references.
+        with ExitStack() as exit:
+            super().__init__(hs)
+            # `KeyFetcher` keeps a reference to `hs` which we need to clean up if
+            # something goes wrong so we can cleanly shutdown the homeserver.
+            exit.callback(super().shutdown)
+
+            # An error can be raised here if someone tried to create a `StoreKeyFetcher`
+            # before the homeserver is fully set up (`HomeServerNotSetupException:
+            # HomeServer.setup must be called before getting datastores`).
+            self.store = hs.get_datastores().main
+
+            # We reached the end of the block which means everything was successful, so
+            # no exit handlers are needed (remove them all).
+            exit.pop_all()

     async def _fetch_keys(
         self, keys_to_fetch: list[_FetchKeyRequest]
@@ -543,9 +580,21 @@ class StoreKeyFetcher(KeyFetcher):

 class BaseV2KeyFetcher(KeyFetcher):
     def __init__(self, hs: "HomeServer"):
-        super().__init__(hs)
-
-        self.store = hs.get_datastores().main
+        # Clean-up to avoid partial initialization leaving behind references.
+        with ExitStack() as exit:
+            super().__init__(hs)
+            # `KeyFetcher` keeps a reference to `hs` which we need to clean up if
+            # something goes wrong so we can cleanly shutdown the homeserver.
+            exit.callback(super().shutdown)
+
+            # An error can be raised here if someone tried to create a `StoreKeyFetcher`
+            # before the homeserver is fully set up (`HomeServerNotSetupException:
+            # HomeServer.setup must be called before getting datastores`).
+            self.store = hs.get_datastores().main
+
+            # We reached the end of the block which means everything was successful, so
+            # no exit handlers are needed (remove them all).
+            exit.pop_all()

     async def process_v2_response(
         self, from_server: str, response_json: JsonDict, time_added_ms: int
synapse/server.py
@@ -54,6 +54,7 @@ from synapse.api.auth import Auth
 from synapse.api.auth.internal import InternalAuth
 from synapse.api.auth.mas import MasDelegatedAuth
 from synapse.api.auth_blocking import AuthBlocking
+from synapse.api.errors import HomeServerNotSetupException
 from synapse.api.filtering import Filtering
 from synapse.api.ratelimiting import Ratelimiter, RequestRatelimiter
 from synapse.app._base import unregister_sighups
@@ -399,7 +400,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         """
         if self._is_shutdown:
             raise Exception(
-                f"Cannot start background process. HomeServer has been shutdown {len(self._background_processes)} {len(self.get_clock()._looping_calls)} {len(self.get_clock()._call_id_to_delayed_call)}"
+                "Cannot start background process. HomeServer has been shutdown"
             )

         # Ignore linter error as this is the one location this should be called.
@@ -466,7 +467,17 @@ class HomeServer(metaclass=abc.ABCMeta):

         # TODO: Cleanup replication pieces

-        self.get_keyring().shutdown()
+        keyring: Keyring | None = None
+        try:
+            keyring = self.get_keyring()
+        except HomeServerNotSetupException:
+            # If the homeserver wasn't fully setup, keyring won't have existed before
+            # this and will fail to be initialized but it cleans itself up for any
+            # partial initialization problem.
+            pass
+
+        if keyring:
+            keyring.shutdown()

         # Cleanup metrics associated with the homeserver
         for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
@@ -478,8 +489,12 @@ class HomeServer(metaclass=abc.ABCMeta):
             self.config.server.server_name
         )

-        for db in self.get_datastores().databases:
-            db.stop_background_updates()
+        try:
+            for db in self.get_datastores().databases:
+                db.stop_background_updates()
+        except HomeServerNotSetupException:
+            # If the homeserver wasn't fully setup, the datastores won't exist
+            pass

         if self.should_send_federation():
             try:
@@ -513,8 +528,12 @@ class HomeServer(metaclass=abc.ABCMeta):
                 pass
         self._background_processes.clear()

-        for db in self.get_datastores().databases:
-            db._db_pool.close()
+        try:
+            for db in self.get_datastores().databases:
+                db._db_pool.close()
+        except HomeServerNotSetupException:
+            # If the homeserver wasn't fully setup, the datastores won't exist
+            pass

     def register_async_shutdown_handler(
         self,
@@ -677,7 +696,9 @@ class HomeServer(metaclass=abc.ABCMeta):

     def get_datastores(self) -> Databases:
         if not self.datastores:
-            raise Exception("HomeServer.setup must be called before getting datastores")
+            raise HomeServerNotSetupException(
+                "HomeServer.setup must be called before getting datastores"
+            )

         return self.datastores

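Swapping the bare `Exception` for a dedicated `HomeServerNotSetupException` is what makes the guarded shutdown above workable: the shutdown path can ignore exactly the "never set up" case without also swallowing real failures. A hedged sketch of that calling pattern (the `stop_databases` helper is hypothetical; the real logic lives inline in `HomeServer.shutdown()` as shown in the hunks above):

```python
from synapse.api.errors import HomeServerNotSetupException


def stop_databases(hs) -> None:
    """Hypothetical helper mirroring the shutdown hunks above."""
    try:
        databases = hs.get_datastores().databases
    except HomeServerNotSetupException:
        # setup() never ran, so there are no datastores to stop. Any other
        # exception still propagates rather than being silently swallowed.
        return

    for db in databases:
        db.stop_background_updates()
```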
Homeserver shutdown tests (HomeserverCleanShutdownTestCase)
@@ -19,7 +19,10 @@
 #

 import gc
+import sys
 import weakref
+from typing import Any
+from unittest.mock import patch

 from synapse.app.homeserver import SynapseHomeServer
 from synapse.logging.context import LoggingContext
@@ -81,45 +84,12 @@ class HomeserverCleanShutdownTestCase(HomeserverTestCase):

         # Ensure the `HomeServer` hs been garbage collected by attempting to use the
         # weakref to it.
-        if hs_ref() is not None:
-            self.fail("HomeServer reference should not be valid at this point")
-
-            # To help debug this test when it fails, it is useful to leverage the
-            # `objgraph` module.
-            # The following code serves as an example of what I have found to be useful
-            # when tracking down references holding the `SynapseHomeServer` in memory:
-            #
-            # all_objects = gc.get_objects()
-            # for obj in all_objects:
-            #     try:
-            #         # These are a subset of types that are typically involved with
-            #         # holding the `HomeServer` in memory. You may want to inspect
-            #         # other types as well.
-            #         if isinstance(obj, DataStore):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 db_obj = obj
-            #         if isinstance(obj, SynapseHomeServer):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 synapse_hs = obj
-            #         if isinstance(obj, SynapseSite):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 sysite = obj
-            #         if isinstance(obj, DatabasePool):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 dbpool = obj
-            #     except Exception:
-            #         pass
-            #
-            # print(sys.getrefcount(hs_ref()), "refs to", hs_ref())
-            #
-            # # The following values for `max_depth` and `too_many` have been found to
-            # # render a useful amount of information without taking an overly long time
-            # # to generate the result.
-            # objgraph.show_backrefs(synapse_hs, max_depth=10, too_many=10)
+        hs_after_shutdown = hs_ref()
+        if hs_after_shutdown is not None:
+            self.fail(
+                "HomeServer reference should not be valid at this point "
+                f"{get_memory_debug_info_for_object(hs_after_shutdown)}",
+            )

     @logcontext_clean
     def test_clean_homeserver_shutdown_mid_background_updates(self) -> None:
@@ -165,42 +135,137 @@ class HomeserverCleanShutdownTestCase(HomeserverTestCase):

         # Ensure the `HomeServer` hs been garbage collected by attempting to use the
         # weakref to it.
-        if hs_ref() is not None:
-            self.fail("HomeServer reference should not be valid at this point")
-
-            # To help debug this test when it fails, it is useful to leverage the
-            # `objgraph` module.
-            # The following code serves as an example of what I have found to be useful
-            # when tracking down references holding the `SynapseHomeServer` in memory:
-            #
-            # all_objects = gc.get_objects()
-            # for obj in all_objects:
-            #     try:
-            #         # These are a subset of types that are typically involved with
-            #         # holding the `HomeServer` in memory. You may want to inspect
-            #         # other types as well.
-            #         if isinstance(obj, DataStore):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 db_obj = obj
-            #         if isinstance(obj, SynapseHomeServer):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 synapse_hs = obj
-            #         if isinstance(obj, SynapseSite):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 sysite = obj
-            #         if isinstance(obj, DatabasePool):
-            #             print(sys.getrefcount(obj), "refs to", obj)
-            #             if not isinstance(obj, weakref.ProxyType):
-            #                 dbpool = obj
-            #     except Exception:
-            #         pass
-            #
-            # print(sys.getrefcount(hs_ref()), "refs to", hs_ref())
-            #
-            # # The following values for `max_depth` and `too_many` have been found to
-            # # render a useful amount of information without taking an overly long time
-            # # to generate the result.
-            # objgraph.show_backrefs(synapse_hs, max_depth=10, too_many=10)
+        hs_after_shutdown = hs_ref()
+        if hs_after_shutdown is not None:
+            self.fail(
+                "HomeServer reference should not be valid at this point "
+                f"{get_memory_debug_info_for_object(hs_after_shutdown)}",
+            )
+
+    @logcontext_clean
+    def test_clean_homeserver_shutdown_when_failed_to_setup(self) -> None:
+        """
+        Ensure the `SynapseHomeServer` can be fully shutdown and garbage collected if it
+        fails to be `setup`.
+        """
+        self.reactor, self.clock = get_clock()
+
+        # Patch `hs.setup()` to do nothing, so that the homeserver is not fully setup.
+        with patch.object(SynapseHomeServer, "setup", return_value=None) as mock_setup:
+            # Patch out the call to `start_test_homeserver` since we want access to the
+            # homeserver even before the server is setup (let alone started)
+            with patch("tests.server.start_test_homeserver", return_value=None):
+                self.hs = setup_test_homeserver(
+                    cleanup_func=self.addCleanup,
+                    reactor=self.reactor,
+                    homeserver_to_use=SynapseHomeServer,
+                    clock=self.clock,
+                )
+            # Sanity check that we patched the correct method (make sure it was the
+            # thing that was called)
+            mock_setup.assert_called_once_with()
+
+        hs_ref = weakref.ref(self.hs)
+
+        # Run the reactor so any `callWhenRunning` functions can be cleared out.
+        self.reactor.run()
+        # This would normally happen as part of `HomeServer.shutdown` but the `MemoryReactor`
+        # we use in tests doesn't handle this properly (see doc comment)
+        cleanup_test_reactor_system_event_triggers(self.reactor)
+
+        async def shutdown() -> None:
+            # Use a logcontext just to double-check that we don't mangle the logcontext
+            # during shutdown.
+            with LoggingContext(name="hs_shutdown", server_name=self.hs.hostname):
+                await self.hs.shutdown()
+
+        self.get_success(shutdown())
+
+        # Cleanup the internal reference in our test case
+        del self.hs
+
+        # Force garbage collection.
+        gc.collect()
+
+        # Ensure the `HomeServer` hs been garbage collected by attempting to use the
+        # weakref to it.
+        hs_after_shutdown = hs_ref()
+        if hs_after_shutdown is not None:
+            self.fail(
+                "HomeServer reference should not be valid at this point "
+                f"{get_memory_debug_info_for_object(hs_after_shutdown)}",
+            )
+
+
+def get_memory_debug_info_for_object(object: Any) -> dict[str, Any]:
+    """
+    Gathers some useful information to make it easier to figure out why the `object` is
+    still in memory.
+
+    Args:
+        object: The object to gather debug information for.
+    """
+    debug: dict[str, Any] = {}
+    if object is not None:
+        # The simplest tracing we can do is show the reference count for the object.
+        debug["reference_count"] = sys.getrefcount(object)
+
+        # Find the list of objects that directly refer to the object.
+        #
+        # Note: The `ref_count` can be >0 but `referrers` can be empty because
+        # the all of the objects were frozen. Look at the
+        # `frozen_object_count` to detect this scenario.
+        referrers = gc.get_referrers(object)
+        debug["gc_referrer_count"] = len(referrers)
+        debug["gc_referrers"] = referrers
+
+        # We don't expect to see frozen objects in normal operation of the
+        # `multi_synapse` shard.
+        #
+        # We can see frozen objects if you forget to `freeze=False` when
+        # starting the `SynapseHomeServer`. Frozen objects mean they are
+        # never considered for garbage collection. If the
+        # `SynapseHomeServer` (or anything that references the homeserver)
+        # is frozen, the homeserver can never be garbage collected and will
+        # linger in memory forever.
+        freeze_count = gc.get_freeze_count()
+        debug["gc_global_frozen_object_count"] = freeze_count

+    # To help debug this test when it fails, it is useful to leverage the
+    # `objgraph` module.
+    # The following code serves as an example of what I have found to be useful
+    # when tracking down references holding the `SynapseHomeServer` in memory:
+    #
+    # all_objects = gc.get_objects()
+    # for obj in all_objects:
+    #     try:
+    #         # These are a subset of types that are typically involved with
+    #         # holding the `HomeServer` in memory. You may want to inspect
+    #         # other types as well.
+    #         if isinstance(obj, DataStore):
+    #             print(sys.getrefcount(obj), "refs to", obj)
+    #             if not isinstance(obj, weakref.ProxyType):
+    #                 db_obj = obj
+    #         if isinstance(obj, SynapseHomeServer):
+    #             print(sys.getrefcount(obj), "refs to", obj)
+    #             if not isinstance(obj, weakref.ProxyType):
+    #                 synapse_hs = obj
+    #         if isinstance(obj, SynapseSite):
+    #             print(sys.getrefcount(obj), "refs to", obj)
+    #             if not isinstance(obj, weakref.ProxyType):
+    #                 sysite = obj
+    #         if isinstance(obj, DatabasePool):
+    #             print(sys.getrefcount(obj), "refs to", obj)
+    #             if not isinstance(obj, weakref.ProxyType):
+    #                 dbpool = obj
+    #     except Exception:
+    #         pass
+    #
+    # print(sys.getrefcount(hs_ref()), "refs to", hs_ref())
+    #
+    # # The following values for `max_depth` and `too_many` have been found to
+    # # render a useful amount of information without taking an overly long time
+    # # to generate the result.
+    # objgraph.show_backrefs(synapse_hs, max_depth=10, too_many=10)
+
+    return debug
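The assertions in these tests boil down to a small, general technique: keep only a `weakref.ref` to the object under test, drop every strong reference, force a collection, and check that the weakref now resolves to `None`. A minimal standalone sketch of that pattern (plain Python, no Synapse types):

```python
import gc
import weakref


class Service:
    """Stand-in for the homeserver in the tests above (illustrative only)."""


def test_gets_collected() -> None:
    service = Service()
    service_ref = weakref.ref(service)

    # Drop the only strong reference and force a collection pass.
    del service
    gc.collect()

    # If anything still held the object, the weakref would still resolve and
    # gc.get_referrers(...) could be used to find out what is holding it.
    assert service_ref() is None


test_gets_collected()
```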
tests/crypto/test_keyring.py
@@ -95,7 +95,12 @@ class KeyringTestCase(unittest.HomeserverTestCase):
     def test_verify_json_objects_for_server_awaits_previous_requests(self) -> None:
         mock_fetcher = Mock()
         mock_fetcher.get_keys = Mock()
-        kr = keyring.Keyring(self.hs, key_fetchers=(mock_fetcher,))
+        kr = keyring.Keyring(
+            self.hs,
+            test_only_key_fetchers=[
+                mock_fetcher,
+            ],
+        )

         # a signed object that we are going to try to validate
         key1 = signedjson.key.generate_signing_key("1")
@@ -286,7 +291,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         mock_fetcher = Mock()
         mock_fetcher.get_keys = Mock(side_effect=get_keys)
         kr = keyring.Keyring(
-            self.hs, key_fetchers=(StoreKeyFetcher(self.hs), mock_fetcher)
+            self.hs, test_only_key_fetchers=[StoreKeyFetcher(self.hs), mock_fetcher]
         )

         # sign the json
@@ -313,7 +318,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):

         mock_fetcher = Mock()
         mock_fetcher.get_keys = Mock(side_effect=get_keys)
-        kr = keyring.Keyring(self.hs, key_fetchers=(mock_fetcher,))
+        kr = keyring.Keyring(self.hs, test_only_key_fetchers=[mock_fetcher])

         json1: JsonDict = {}
         signedjson.sign.sign_json(json1, "server1", key1)
@@ -363,7 +368,9 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         mock_fetcher1.get_keys = Mock(side_effect=get_keys1)
         mock_fetcher2 = Mock()
         mock_fetcher2.get_keys = Mock(side_effect=get_keys2)
-        kr = keyring.Keyring(self.hs, key_fetchers=(mock_fetcher1, mock_fetcher2))
+        kr = keyring.Keyring(
+            self.hs, test_only_key_fetchers=[mock_fetcher1, mock_fetcher2]
+        )

         json1: JsonDict = {}
         signedjson.sign.sign_json(json1, "server1", key1)
tests/server.py
@@ -1074,10 +1074,10 @@ def setup_test_homeserver(
     If no datastore is supplied, one is created and given to the homeserver.

     Args:
-        cleanup_func : The function used to register a cleanup routine for
+        cleanup_func: The function used to register a cleanup routine for
                        after the test. If the function returns a Deferred, the
                        test case will wait until the Deferred has fired before
                        proceeding to the next cleanup function.
         server_name: Homeserver name
         config: Homeserver config
         reactor: Twisted reactor
@@ -1190,68 +1190,9 @@ def setup_test_homeserver(
         cur.close()
         db_conn.close()

-    hs = homeserver_to_use(
-        server_name,
-        config=config,
-        reactor=reactor,
-    )
-
-    # Capture the `hs` as a `weakref` here to ensure there is no scenario where uncalled
-    # cleanup functions result in holding the `hs` in memory.
-    cleanup_hs_ref = weakref.ref(hs)
-
-    def shutdown_hs_on_cleanup() -> "Deferred[None]":
-        cleanup_hs = cleanup_hs_ref()
-        deferred: "Deferred[None]" = defer.succeed(None)
-        if cleanup_hs is not None:
-            deferred = defer.ensureDeferred(cleanup_hs.shutdown())
-        return deferred
-
-    # Register the cleanup hook for the homeserver.
-    # A full `hs.shutdown()` is necessary otherwise CI tests will fail while exhibiting
-    # strange behaviours.
-    cleanup_func(shutdown_hs_on_cleanup)
-
-    # Install @cache_in_self attributes
-    for key, val in extra_homeserver_attributes.items():
-        setattr(hs, "_" + key, val)
-
-    # Mock TLS
-    hs.tls_server_context_factory = Mock()
-
-    # Patch `make_pool` before initialising the database, to make database transactions
-    # synchronous for testing.
-    with patch("synapse.storage.database.make_pool", side_effect=make_fake_db_pool):
-        hs.setup()
-
-    # Register background tasks required by this server. This must be done
-    # somewhat manually due to the background tasks not being registered
-    # unless handlers are instantiated.
-    #
-    # Since, we don't have to worry about `daemonize` (forking the process) in tests, we
-    # can just start the background tasks straight away after `hs.setup`. (compare this
-    # with where we call `hs.start_background_tasks()` outside of the test environment).
-    if hs.config.worker.run_background_tasks:
-        hs.start_background_tasks()
-
-    # Since we've changed the databases to run DB transactions on the same
-    # thread, we need to stop the event fetcher hogging that one thread.
-    hs.get_datastores().main.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING = False
-
-    if USE_POSTGRES_FOR_TESTS:
-        # Capture the `database_pool` as a `weakref` here to ensure there is no scenario where uncalled
-        # cleanup functions result in holding the `hs` in memory.
-        database_pool = weakref.ref(hs.get_datastores().databases[0])
-
-        # We need to do cleanup on PostgreSQL
         def cleanup() -> None:
             import psycopg2

-            # Close all the db pools
-            db_pool = database_pool()
-            if db_pool is not None:
-                db_pool._db_pool.close()
-
             dropped = False

             # Drop the test database
@@ -1296,6 +1237,96 @@ def setup_test_homeserver(
             # Register the cleanup hook
             cleanup_func(cleanup)

+    hs = homeserver_to_use(
+        server_name,
+        config=config,
+        reactor=reactor,
+    )
+
+    # Capture the `hs` as a `weakref` here to ensure there is no scenario where uncalled
+    # cleanup functions result in holding the `hs` in memory.
+    cleanup_hs_ref = weakref.ref(hs)
+
+    def shutdown_hs_on_cleanup() -> "Deferred[None]":
+        cleanup_hs = cleanup_hs_ref()
+        deferred: "Deferred[None]" = defer.succeed(None)
+        if cleanup_hs is not None:
+            deferred = defer.ensureDeferred(cleanup_hs.shutdown())
+        return deferred
+
+    # Register the cleanup hook for the homeserver.
+    # A full `hs.shutdown()` is necessary otherwise CI tests will fail while exhibiting
+    # strange behaviours.
+    cleanup_func(shutdown_hs_on_cleanup)
+
+    # Install @cache_in_self attributes
+    for key, val in extra_homeserver_attributes.items():
+        setattr(hs, "_" + key, val)
+
+    # Mock TLS
+    hs.tls_server_context_factory = Mock()
+
+    # Patch `make_pool` before initialising the database, to make database transactions
+    # synchronous for testing.
+    with patch("synapse.storage.database.make_pool", side_effect=make_fake_db_pool):
+        hs.setup()
+
+    # Ideally, setup/start would be separated but since this is historically used
+    # throughout tests, we keep the existing behavior for now. We probably just need to
+    # rename this function.
+    start_test_homeserver(hs=hs, cleanup_func=cleanup_func, reactor=reactor)
+
+    return hs
+
+
+def start_test_homeserver(
+    *,
+    hs: HomeServer,
+    cleanup_func: Callable[[Callable[[], Optional["Deferred[None]"]]], None],
+    reactor: ISynapseReactor,
+) -> None:
+    """
+    Start a homeserver for testing.
+
+    Args:
+        hs: The homeserver to start.
+        cleanup_func: The function used to register a cleanup routine for
+                      after the test. If the function returns a Deferred, the
+                      test case will wait until the Deferred has fired before
+                      proceeding to the next cleanup function.
+        reactor: Twisted reactor
+    """
+
+    # Register background tasks required by this server. This must be done
+    # somewhat manually due to the background tasks not being registered
+    # unless handlers are instantiated.
+    #
+    # Since, we don't have to worry about `daemonize` (forking the process) in tests, we
+    # can just start the background tasks straight away after `hs.setup`. (compare this
+    # with where we call `hs.start_background_tasks()` outside of the test environment).
+    if hs.config.worker.run_background_tasks:
+        hs.start_background_tasks()
+
+    # Since we've changed the databases to run DB transactions on the same
+    # thread, we need to stop the event fetcher hogging that one thread.
+    hs.get_datastores().main.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING = False
+
+    if USE_POSTGRES_FOR_TESTS:
+        # Capture the `database_pool` as a `weakref` here to ensure there is no scenario where uncalled
+        # cleanup functions result in holding the `hs` in memory.
+        database_pool = weakref.ref(hs.get_datastores().databases[0])
+
+        # We need to do cleanup on PostgreSQL
+        def cleanup() -> None:
+            # Close all the db pools
+            db_pool = database_pool()
+            if db_pool is not None:
+                db_pool._db_pool.close()
+
+        if not LEAVE_DB:
+            # Register the cleanup hook
+            cleanup_func(cleanup)
+
     # bcrypt is far too slow to be doing in unit tests
     # Need to let the HS build an auth handler and then mess with it
     # because AuthHandler's constructor requires the HS, so we can't make one
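One detail worth noting in the `tests/server.py` changes above: the registered cleanup callbacks close over `weakref.ref(...)` rather than the homeserver or pool itself, so a pending cleanup hook can never be the thing that keeps the object alive. A small illustrative sketch of that pattern (generic names, not the Synapse helpers):

```python
import weakref


class Pool:
    """Stand-in for a database pool (illustrative only)."""

    closed = False

    def close(self) -> None:
        self.closed = True


def make_cleanup(pool: Pool):
    # Capture the pool weakly: if the pool has already been garbage collected
    # by the time the cleanup hook runs, there is simply nothing left to close,
    # and the hook itself never keeps the pool alive.
    pool_ref = weakref.ref(pool)

    def cleanup() -> None:
        live_pool = pool_ref()
        if live_pool is not None:
            live_pool.close()

    return cleanup
```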
@@ -1330,5 +1361,3 @@ def setup_test_homeserver(
     load_legacy_third_party_event_rules(hs)
     load_legacy_presence_router(hs)
     load_legacy_password_auth_providers(hs)
-
-    return hs