From d684ec8a2bbb682fde86d3b17c4897d40ac71b58 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 24 Oct 2019 22:09:43 +0300 Subject: [PATCH] benchmarks --- synapse/benchmarks/__init__.py | 63 +++++++++++ synapse/benchmarks/__main__.py | 33 ++++++ synapse/benchmarks/suites/__init__.py | 1 + synapse/benchmarks/suites/logging.py | 147 ++++++++++++++++++++++++++ synapse/logging/_terse_json.py | 8 +- synapse/python_dependencies.py | 1 - 6 files changed, 249 insertions(+), 4 deletions(-) create mode 100644 synapse/benchmarks/__init__.py create mode 100644 synapse/benchmarks/__main__.py create mode 100644 synapse/benchmarks/suites/__init__.py create mode 100644 synapse/benchmarks/suites/logging.py diff --git a/synapse/benchmarks/__init__.py b/synapse/benchmarks/__init__.py new file mode 100644 index 0000000000..d0450c4d9d --- /dev/null +++ b/synapse/benchmarks/__init__.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet.defer import Deferred + +from synapse.config.homeserver import HomeServerConfig +from synapse.util import Clock + +from tests.utils import default_config, setup_test_homeserver, setupdb + +DB_SETUP = False + + +def setup_database(): + global DB_SETUP + if not DB_SETUP: + setupdb() + DB_SETUP = True + + +async def make_homeserver(reactor, config=None): + def wait(time): + d = Deferred() + reactor.callLater(time, d.callback, True) + return d + + cleanup_tasks = [] + + clock = Clock(reactor) + + if not config: + config = default_config("test") + + config_obj = HomeServerConfig() + config_obj.parse_config_dict(config, "", "") + + hs = await setup_test_homeserver( + cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock + ) + stor = hs.get_datastore() + + # Run the database background updates. + if hasattr(stor, "do_next_background_update"): + while not await stor.has_completed_background_updates(): + await stor.do_next_background_update(1) + + def cleanup(): + for i in cleanup_tasks: + i() + + return hs, wait, cleanup diff --git a/synapse/benchmarks/__main__.py b/synapse/benchmarks/__main__.py new file mode 100644 index 0000000000..d67a1cf43a --- /dev/null +++ b/synapse/benchmarks/__main__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pyperf + +from twisted.python import reflect + +from synapse.benchmarks import setupdb +from synapse.benchmarks.suites import SUITES + +if __name__ == "__main__": + + runner = pyperf.Runner(processes=5, values=1, warmups=0) + runner.parse_args() + runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] + + for suite, loops in SUITES: + + func = reflect.namedAny("synapse.benchmarks.suites.%s.main" % (suite.lower(),)) + runner.args.loops = loops + runner.bench_time_func(suite + "_" + str(loops), func) diff --git a/synapse/benchmarks/suites/__init__.py b/synapse/benchmarks/suites/__init__.py new file mode 100644 index 0000000000..641e3299c2 --- /dev/null +++ b/synapse/benchmarks/suites/__init__.py @@ -0,0 +1 @@ +SUITES = [("LOGGING", 1000), ("LOGGING", 10000)] diff --git a/synapse/benchmarks/suites/logging.py b/synapse/benchmarks/suites/logging.py new file mode 100644 index 0000000000..37e7229b6c --- /dev/null +++ b/synapse/benchmarks/suites/logging.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import warnings +from contextlib import redirect_stderr +from io import StringIO + +from mock import Mock + +from pyperf import perf_counter + +from twisted.internet.defer import ensureDeferred +from twisted.internet.protocol import ServerFactory +from twisted.logger import ( + LogBeginner, + Logger, + LogPublisher, + globalLogBeginner, + textFileLogObserver, +) +from twisted.protocols.basic import LineOnlyReceiver +from twisted.python.failure import Failure + +from synapse.benchmarks import make_homeserver, setup_database +from synapse.logging._structured import setup_structured_logging + + +class LineCounter(LineOnlyReceiver): + + delimiter = b"\n" + + def __init__(self, *args, **kwargs): + self.count = 0 + super().__init__(*args, **kwargs) + + def lineReceived(self, line): + self.count += 1 + + +async def _main(reactor, loops): + + servers = [] + + def protocol(): + p = LineCounter() + servers.append(p) + return p + + logger_factory = ServerFactory.forProtocol(protocol) + port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") + + hs, wait, cleanup = await make_homeserver(reactor) + + errors = StringIO() + publisher = LogPublisher() + mock_sys = Mock() + beginner = LogBeginner( + publisher, errors, mock_sys, warnings, initialBufferSize=loops + ) + + log_config = { + "loggers": {"synapse": {"level": "DEBUG"}}, + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": port.getHost().port, + "maximum_buffer": 100, + } + }, + } + + logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) + + start = perf_counter() + + logging_system = setup_structured_logging( + hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False + ) + + # Wait for it to connect... + await logging_system._observers[0]._service.whenConnected() + + # Send a bunch of useful messages + for i in range(0, loops): + logger.info("test message %s" % (i,)) + + if ( + len(logging_system._observers[0]._buffer) + == logging_system._observers[0].maximum_buffer + ): + while ( + len(logging_system._observers[0]._buffer) + > logging_system._observers[0].maximum_buffer / 2 + ): + await wait(0.01) + + while servers[0].count != loops: + await wait(0.01) + + end = perf_counter() - start + + logging_system.stop() + port.stopListening() + cleanup() + + return end + + +def main(loops): + + setup_database() + + if globalLogBeginner._temporaryObserver: + globalLogBeginner.beginLoggingTo([lambda event: None]) + + file_out = StringIO() + with redirect_stderr(file_out): + + from twisted.internet import epollreactor + + reactor = epollreactor.EPollReactor() + d = ensureDeferred(_main(reactor, loops)) + + def on_done(_): + if isinstance(_, Failure): + _.printTraceback() + reactor.stop() + return _ + + d.addBoth(on_done) + reactor.run() + + return d.result diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index f3b27d5bfd..b8e8ea5003 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -17,6 +17,7 @@ Log formatters that output terse JSON. """ +import json import sys from collections import deque from ipaddress import IPv4Address, IPv6Address, ip_address @@ -24,7 +25,6 @@ from math import floor from typing import IO import attr -from rapidjson import dumps from zope.interface import implementer from twisted.application.internet import ClientService @@ -37,6 +37,8 @@ from twisted.internet.interfaces import IPushProducer from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger +_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) + def flatten_event(event: dict, metadata: dict, include_time: bool = False): """ @@ -141,7 +143,7 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False) + "\n" + return _encoder.encode(flattened) + "\n" return FileLogObserver(outFile, formatEvent) @@ -167,7 +169,7 @@ class LogProducer(object): while self.paused is False and (self._buffer and self._transport.connected): try: event = self._buffer.popleft() - self._transport.write(dumps(event, ensure_ascii=False).encode("utf8")) + self._transport.write(_encoder.encode(event).encode("utf8")) self._transport.write(b"\n") except Exception: import traceback diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 3c0040482c..aa7da1c543 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -74,7 +74,6 @@ REQUIREMENTS = [ "Jinja2>=2.9", "bleach>=1.4.3", "typing-extensions>=3.7.4", - "python-rapidjson>=0.4" ] CONDITIONAL_REQUIREMENTS = {