Moved state_compressor setup to util/state_compressor.py
This commit is contained in:
@@ -24,7 +24,6 @@ import traceback
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable
|
||||
|
||||
import auto_compressor
|
||||
from cryptography.utils import CryptographyDeprecationWarning
|
||||
from typing_extensions import NoReturn
|
||||
|
||||
@@ -43,15 +42,13 @@ from synapse.crypto import context_factory
|
||||
from synapse.events.presence_router import load_legacy_presence_router
|
||||
from synapse.events.spamcheck import load_legacy_spam_checkers
|
||||
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
|
||||
from synapse.logging.context import PreserveLoggingContext, defer_to_thread
|
||||
from synapse.metrics.background_process_metrics import (
|
||||
run_as_background_process,
|
||||
wrap_as_background_process,
|
||||
)
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.metrics.jemalloc import setup_jemalloc_stats
|
||||
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
|
||||
from synapse.util.daemonize import daemonize_process
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.state_compressor import setup_state_compressor
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -424,49 +421,6 @@ async def start(hs: "HomeServer"):
|
||||
atexit.register(gc.freeze)
|
||||
|
||||
|
||||
def setup_state_compressor(hs):
|
||||
"""Schedules the state compressor to run regularly"""
|
||||
compressor_config = hs.config.statecompressor
|
||||
# Check that compressor is enabled
|
||||
if not compressor_config.enabled:
|
||||
return
|
||||
|
||||
# Check that the database being used is postgres
|
||||
db_config = hs.config.database.get_single_database().config
|
||||
if db_config["name"] != "psycopg2":
|
||||
return
|
||||
|
||||
# construct the database URL from the database config
|
||||
db_args = db_config["args"]
|
||||
db_url = "postgresql://{username}:{password}@{host}:{port}/{database}".format(
|
||||
username=db_args["user"],
|
||||
password=db_args["password"],
|
||||
host=db_args["host"],
|
||||
port=db_args["port"],
|
||||
database=db_args["database"],
|
||||
)
|
||||
|
||||
# The method to be called periodically
|
||||
def run_state_compressor():
|
||||
run_as_background_process(
|
||||
desc="State Compressor",
|
||||
func=defer_to_thread,
|
||||
reactor=hs.get_reactor(),
|
||||
f=auto_compressor.compress_largest_rooms,
|
||||
db_url=db_url,
|
||||
chunk_size=compressor_config.compressor_chunk_size,
|
||||
default_levels=compressor_config.compressor_default_levels,
|
||||
number_of_rooms=compressor_config.compressor_number_of_rooms,
|
||||
)
|
||||
|
||||
# Call the compressor every `time_between_runs` milliseconds
|
||||
clock = hs.get_clock()
|
||||
clock.looping_call(
|
||||
run_state_compressor,
|
||||
compressor_config.time_between_compressor_runs,
|
||||
)
|
||||
|
||||
|
||||
def setup_sentry(hs):
|
||||
"""Enable sentry integration, if enabled in configuration
|
||||
|
||||
|
||||
@@ -42,13 +42,13 @@ class StateCompressorConfig(Config):
|
||||
|
||||
def generate_config_section(self, **kwargs):
|
||||
return """\
|
||||
# The state compressor is an experimental tool which attempts to
|
||||
# The state compressor is an experimental tool which attempts to
|
||||
# reduce the number of rows in the state_groups_state table
|
||||
# of postgres databases.
|
||||
#
|
||||
# For more information please see
|
||||
# https://matrix-org.github.io/synapse/latest/state_compressor.html
|
||||
#
|
||||
#
|
||||
state_compressor:
|
||||
# enabled: true
|
||||
# # The (rough) number of state groups to load at one time
|
||||
@@ -58,5 +58,5 @@ class StateCompressorConfig(Config):
|
||||
# # The default level sizes for the compressor to use
|
||||
# default_levels: 100,50,25
|
||||
# # How frequently to run the state compressor
|
||||
# time_between_runs: 1d
|
||||
# time_between_runs: 1d
|
||||
"""
|
||||
|
||||
68
synapse/util/state_compressor.py
Normal file
68
synapse/util/state_compressor.py
Normal file
@@ -0,0 +1,68 @@
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
|
||||
try:
|
||||
import auto_compressor as state_compressor
|
||||
except ImportError:
|
||||
state_compressor = None
|
||||
|
||||
|
||||
def setup_state_compressor(hs):
|
||||
"""Schedules the state compressor to run regularly"""
|
||||
|
||||
# Return if cannot import auto_compressor
|
||||
if not state_compressor:
|
||||
return
|
||||
|
||||
# Return if compressor isn't enabled
|
||||
compressor_config = hs.config.statecompressor
|
||||
if not compressor_config.compressor_enabled:
|
||||
return
|
||||
|
||||
# Check that the database being used is postgres
|
||||
db_config = hs.config.database.get_single_database().config
|
||||
if db_config["name"] != "psycopg2":
|
||||
return
|
||||
|
||||
# construct the database URL from the database config
|
||||
db_args = db_config["args"]
|
||||
db_url = "postgresql://{username}:{password}@{host}:{port}/{database}".format(
|
||||
username=db_args["user"],
|
||||
password=db_args["password"],
|
||||
host=db_args["host"],
|
||||
port=db_args["port"],
|
||||
database=db_args["database"],
|
||||
)
|
||||
|
||||
# The method to be called periodically
|
||||
def run_state_compressor():
|
||||
run_as_background_process(
|
||||
desc="State Compressor",
|
||||
func=defer_to_thread,
|
||||
reactor=hs.get_reactor(),
|
||||
f=state_compressor.compress_largest_rooms,
|
||||
db_url=db_url,
|
||||
chunk_size=compressor_config.compressor_chunk_size,
|
||||
default_levels=compressor_config.compressor_default_levels,
|
||||
number_of_rooms=compressor_config.compressor_number_of_rooms,
|
||||
)
|
||||
|
||||
# Call the compressor every `time_between_runs` milliseconds
|
||||
clock = hs.get_clock()
|
||||
clock.looping_call(
|
||||
run_state_compressor,
|
||||
compressor_config.time_between_compressor_runs,
|
||||
)
|
||||
Reference in New Issue
Block a user