Merge remote-tracking branch 'origin/develop' into shhs
This commit is contained in:
@@ -173,11 +173,12 @@ steps:
|
||||
queue: "medium"
|
||||
command:
|
||||
- "bash .buildkite/merge_base_branch.sh"
|
||||
- "bash .buildkite/synapse_sytest.sh"
|
||||
- "bash /synapse_sytest.sh"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "matrixdotorg/sytest-synapse:py35"
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -192,11 +193,12 @@ steps:
|
||||
POSTGRES: "1"
|
||||
command:
|
||||
- "bash .buildkite/merge_base_branch.sh"
|
||||
- "bash .buildkite/synapse_sytest.sh"
|
||||
- "bash /synapse_sytest.sh"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "matrixdotorg/sytest-synapse:py35"
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -212,11 +214,12 @@ steps:
|
||||
WORKERS: "1"
|
||||
command:
|
||||
- "bash .buildkite/merge_base_branch.sh"
|
||||
- "bash .buildkite/synapse_sytest.sh"
|
||||
- "bash /synapse_sytest.sh"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "matrixdotorg/sytest-synapse:py35"
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
soft_fail: true
|
||||
retry:
|
||||
automatic:
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
# Fetch sytest, and then run the tests for synapse. The entrypoint for the
|
||||
# sytest-synapse docker images.
|
||||
|
||||
set -ex
|
||||
|
||||
if [ -n "$BUILDKITE" ]
|
||||
then
|
||||
SYNAPSE_DIR=`pwd`
|
||||
else
|
||||
SYNAPSE_DIR="/src"
|
||||
fi
|
||||
|
||||
# Attempt to find a sytest to use.
|
||||
# If /sytest exists, it means that a SyTest checkout has been mounted into the Docker image.
|
||||
if [ -d "/sytest" ]; then
|
||||
# If the user has mounted in a SyTest checkout, use that.
|
||||
echo "Using local sytests..."
|
||||
|
||||
# create ourselves a working directory and dos2unix some scripts therein
|
||||
mkdir -p /work/jenkins
|
||||
for i in install-deps.pl run-tests.pl tap-to-junit-xml.pl jenkins/prep_sytest_for_postgres.sh; do
|
||||
dos2unix -n "/sytest/$i" "/work/$i"
|
||||
done
|
||||
ln -sf /sytest/tests /work
|
||||
ln -sf /sytest/keys /work
|
||||
SYTEST_LIB="/sytest/lib"
|
||||
else
|
||||
if [ -n "BUILDKITE_BRANCH" ]
|
||||
then
|
||||
branch_name=$BUILDKITE_BRANCH
|
||||
else
|
||||
# Otherwise, try and find out what the branch that the Synapse checkout is using. Fall back to develop if it's not a branch.
|
||||
branch_name="$(git --git-dir=/src/.git symbolic-ref HEAD 2>/dev/null)" || branch_name="develop"
|
||||
fi
|
||||
|
||||
# Try and fetch the branch
|
||||
echo "Trying to get same-named sytest branch..."
|
||||
wget -q https://github.com/matrix-org/sytest/archive/$branch_name.tar.gz -O sytest.tar.gz || {
|
||||
# Probably a 404, fall back to develop
|
||||
echo "Using develop instead..."
|
||||
wget -q https://github.com/matrix-org/sytest/archive/develop.tar.gz -O sytest.tar.gz
|
||||
}
|
||||
|
||||
mkdir -p /work
|
||||
tar -C /work --strip-components=1 -xf sytest.tar.gz
|
||||
SYTEST_LIB="/work/lib"
|
||||
fi
|
||||
|
||||
cd /work
|
||||
|
||||
# PostgreSQL setup
|
||||
if [ -n "$POSTGRES" ]
|
||||
then
|
||||
export PGUSER=postgres
|
||||
export POSTGRES_DB_1=pg1
|
||||
export POSTGRES_DB_2=pg2
|
||||
|
||||
# Start the database
|
||||
su -c 'eatmydata /usr/lib/postgresql/9.6/bin/pg_ctl -w -D /var/lib/postgresql/data start' postgres
|
||||
|
||||
# Use the Jenkins script to write out the configuration for a PostgreSQL using Synapse
|
||||
jenkins/prep_sytest_for_postgres.sh
|
||||
|
||||
# Make the test databases for the two Synapse servers that will be spun up
|
||||
su -c 'psql -c "CREATE DATABASE pg1;"' postgres
|
||||
su -c 'psql -c "CREATE DATABASE pg2;"' postgres
|
||||
|
||||
fi
|
||||
|
||||
if [ -n "$OFFLINE" ]; then
|
||||
# if we're in offline mode, just put synapse into the virtualenv, and
|
||||
# hope that the deps are up-to-date.
|
||||
#
|
||||
# (`pip install -e` likes to reinstall setuptools even if it's already installed,
|
||||
# so we just run setup.py explicitly.)
|
||||
#
|
||||
(cd $SYNAPSE_DIR && /venv/bin/python setup.py -q develop)
|
||||
else
|
||||
# We've already created the virtualenv, but lets double check we have all
|
||||
# deps.
|
||||
/venv/bin/pip install -q --upgrade --no-cache-dir -e $SYNAPSE_DIR
|
||||
/venv/bin/pip install -q --upgrade --no-cache-dir \
|
||||
lxml psycopg2 coverage codecov tap.py
|
||||
|
||||
# Make sure all Perl deps are installed -- this is done in the docker build
|
||||
# so will only install packages added since the last Docker build
|
||||
./install-deps.pl
|
||||
fi
|
||||
|
||||
|
||||
# Run the tests
|
||||
>&2 echo "+++ Running tests"
|
||||
|
||||
RUN_TESTS=(
|
||||
perl -I "$SYTEST_LIB" ./run-tests.pl --python=/venv/bin/python --synapse-directory=$SYNAPSE_DIR --coverage -O tap --all
|
||||
)
|
||||
|
||||
TEST_STATUS=0
|
||||
|
||||
if [ -n "$WORKERS" ]; then
|
||||
RUN_TESTS+=(-I Synapse::ViaHaproxy --dendron-binary=/pydron.py)
|
||||
else
|
||||
RUN_TESTS+=(-I Synapse)
|
||||
fi
|
||||
|
||||
"${RUN_TESTS[@]}" "$@" > results.tap || TEST_STATUS=$?
|
||||
|
||||
if [ $TEST_STATUS -ne 0 ]; then
|
||||
>&2 echo -e "run-tests \e[31mFAILED\e[0m: exit code $TEST_STATUS"
|
||||
else
|
||||
>&2 echo -e "run-tests \e[32mPASSED\e[0m"
|
||||
fi
|
||||
|
||||
>&2 echo "--- Copying assets"
|
||||
|
||||
# Copy out the logs
|
||||
mkdir -p /logs
|
||||
cp results.tap /logs/results.tap
|
||||
rsync --ignore-missing-args --min-size=1B -av server-0 server-1 /logs --include "*/" --include="*.log.*" --include="*.log" --exclude="*"
|
||||
|
||||
# Upload coverage to codecov and upload files, if running on Buildkite
|
||||
if [ -n "$BUILDKITE" ]
|
||||
then
|
||||
/venv/bin/coverage combine || true
|
||||
/venv/bin/coverage xml || true
|
||||
/venv/bin/codecov -X gcov -f coverage.xml
|
||||
|
||||
wget -O buildkite.tar.gz https://github.com/buildkite/agent/releases/download/v3.13.0/buildkite-agent-linux-amd64-3.13.0.tar.gz
|
||||
tar xvf buildkite.tar.gz
|
||||
chmod +x ./buildkite-agent
|
||||
|
||||
# Upload the files
|
||||
./buildkite-agent artifact upload "/logs/**/*.log*"
|
||||
./buildkite-agent artifact upload "/logs/results.tap"
|
||||
|
||||
if [ $TEST_STATUS -ne 0 ]; then
|
||||
# Annotate, if failure
|
||||
/venv/bin/python $SYNAPSE_DIR/.buildkite/format_tap.py /logs/results.tap "$BUILDKITE_LABEL" | ./buildkite-agent annotate --style="error" --context="$BUILDKITE_LABEL"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
||||
exit $TEST_STATUS
|
||||
@@ -7,6 +7,7 @@ include demo/README
|
||||
include demo/demo.tls.dh
|
||||
include demo/*.py
|
||||
include demo/*.sh
|
||||
include sytest-blacklist
|
||||
|
||||
recursive-include synapse/storage/schema *.sql
|
||||
recursive-include synapse/storage/schema *.sql.postgres
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Move logging code out of `synapse.util` and into `synapse.logging`.
|
||||
@@ -0,0 +1 @@
|
||||
Fix 'utime went backwards' errors on daemonization.
|
||||
@@ -0,0 +1 @@
|
||||
Add a blacklist file to the repo to blacklist certain sytests from failing CI.
|
||||
@@ -0,0 +1 @@
|
||||
Make runtime errors surrounding password reset emails much clearer.
|
||||
@@ -0,0 +1 @@
|
||||
Move logging code out of `synapse.util` and into `synapse.logging`.
|
||||
@@ -1,7 +1,7 @@
|
||||
# Example log_config file for synapse. To enable, point `log_config` to it in
|
||||
# Example log_config file for synapse. To enable, point `log_config` to it in
|
||||
# `homeserver.yaml`, and restart synapse.
|
||||
#
|
||||
# This configuration will produce similar results to the defaults within
|
||||
# This configuration will produce similar results to the defaults within
|
||||
# synapse, but can be edited to give more flexibility.
|
||||
|
||||
version: 1
|
||||
@@ -12,7 +12,7 @@ formatters:
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.util.logcontext.LoggingContextFilter
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
@@ -35,7 +35,7 @@ handlers:
|
||||
root:
|
||||
level: INFO
|
||||
handlers: [console] # to use file handler instead, switch to [file]
|
||||
|
||||
|
||||
loggers:
|
||||
synapse:
|
||||
level: INFO
|
||||
|
||||
@@ -36,7 +36,7 @@ from synapse.util import origin_from_ucid
|
||||
|
||||
from synapse.app.homeserver import SynapseHomeServer
|
||||
|
||||
# from synapse.util.logutils import log_function
|
||||
# from synapse.logging.utils import log_function
|
||||
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.python import log
|
||||
|
||||
@@ -8,7 +8,7 @@ formatters:
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.util.logcontext.LoggingContextFilter
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
|
||||
Vendored
+7
@@ -1,3 +1,10 @@
|
||||
matrix-synapse-py3 (1.1.0-1) UNRELEASED; urgency=medium
|
||||
|
||||
[ Amber Brown ]
|
||||
* Update logging config defaults to match API changes in Synapse.
|
||||
|
||||
-- Erik Johnston <erikj@rae> Thu, 04 Jul 2019 13:59:02 +0100
|
||||
|
||||
matrix-synapse-py3 (1.1.0) stable; urgency=medium
|
||||
|
||||
[ Silke Hofstra ]
|
||||
|
||||
Vendored
+1
-1
@@ -7,7 +7,7 @@ formatters:
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.util.logcontext.LoggingContextFilter
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
|
||||
@@ -6,7 +6,7 @@ formatters:
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.util.logcontext.LoggingContextFilter
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
|
||||
+19
-19
@@ -1,4 +1,4 @@
|
||||
Log contexts
|
||||
Log Contexts
|
||||
============
|
||||
|
||||
.. contents::
|
||||
@@ -12,7 +12,7 @@ record.
|
||||
Logcontexts are also used for CPU and database accounting, so that we can track
|
||||
which requests were responsible for high CPU use or database activity.
|
||||
|
||||
The ``synapse.util.logcontext`` module provides a facilities for managing the
|
||||
The ``synapse.logging.context`` module provides a facilities for managing the
|
||||
current log context (as well as providing the ``LoggingContextFilter`` class).
|
||||
|
||||
Deferreds make the whole thing complicated, so this document describes how it
|
||||
@@ -27,19 +27,19 @@ found them:
|
||||
|
||||
.. code:: python
|
||||
|
||||
from synapse.util import logcontext # omitted from future snippets
|
||||
from synapse.logging import context # omitted from future snippets
|
||||
|
||||
def handle_request(request_id):
|
||||
request_context = logcontext.LoggingContext()
|
||||
request_context = context.LoggingContext()
|
||||
|
||||
calling_context = logcontext.LoggingContext.current_context()
|
||||
logcontext.LoggingContext.set_current_context(request_context)
|
||||
calling_context = context.LoggingContext.current_context()
|
||||
context.LoggingContext.set_current_context(request_context)
|
||||
try:
|
||||
request_context.request = request_id
|
||||
do_request_handling()
|
||||
logger.debug("finished")
|
||||
finally:
|
||||
logcontext.LoggingContext.set_current_context(calling_context)
|
||||
context.LoggingContext.set_current_context(calling_context)
|
||||
|
||||
def do_request_handling():
|
||||
logger.debug("phew") # this will be logged against request_id
|
||||
@@ -51,7 +51,7 @@ written much more succinctly as:
|
||||
.. code:: python
|
||||
|
||||
def handle_request(request_id):
|
||||
with logcontext.LoggingContext() as request_context:
|
||||
with context.LoggingContext() as request_context:
|
||||
request_context.request = request_id
|
||||
do_request_handling()
|
||||
logger.debug("finished")
|
||||
@@ -74,7 +74,7 @@ blocking operation, and returns a deferred:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_request(request_id):
|
||||
with logcontext.LoggingContext() as request_context:
|
||||
with context.LoggingContext() as request_context:
|
||||
request_context.request = request_id
|
||||
yield do_request_handling()
|
||||
logger.debug("finished")
|
||||
@@ -179,7 +179,7 @@ though, we need to make up a new Deferred, or we get a Deferred back from
|
||||
external code. We need to make it follow our rules.
|
||||
|
||||
The easy way to do it is with a combination of ``defer.inlineCallbacks``, and
|
||||
``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``,
|
||||
``context.PreserveLoggingContext``. Suppose we want to implement ``sleep``,
|
||||
which returns a deferred which will run its callbacks after a given number of
|
||||
seconds. That might look like:
|
||||
|
||||
@@ -204,13 +204,13 @@ That doesn't follow the rules, but we can fix it by wrapping it with
|
||||
This technique works equally for external functions which return deferreds,
|
||||
or deferreds we have made ourselves.
|
||||
|
||||
You can also use ``logcontext.make_deferred_yieldable``, which just does the
|
||||
You can also use ``context.make_deferred_yieldable``, which just does the
|
||||
boilerplate for you, so the above could be written:
|
||||
|
||||
.. code:: python
|
||||
|
||||
def sleep(seconds):
|
||||
return logcontext.make_deferred_yieldable(get_sleep_deferred(seconds))
|
||||
return context.make_deferred_yieldable(get_sleep_deferred(seconds))
|
||||
|
||||
|
||||
Fire-and-forget
|
||||
@@ -279,7 +279,7 @@ Obviously that option means that the operations done in
|
||||
that might be fixed by setting a different logcontext via a ``with
|
||||
LoggingContext(...)`` in ``background_operation``).
|
||||
|
||||
The second option is to use ``logcontext.run_in_background``, which wraps a
|
||||
The second option is to use ``context.run_in_background``, which wraps a
|
||||
function so that it doesn't reset the logcontext even when it returns an
|
||||
incomplete deferred, and adds a callback to the returned deferred to reset the
|
||||
logcontext. In other words, it turns a function that follows the Synapse rules
|
||||
@@ -293,7 +293,7 @@ It can be used like this:
|
||||
def do_request_handling():
|
||||
yield foreground_operation()
|
||||
|
||||
logcontext.run_in_background(background_operation)
|
||||
context.run_in_background(background_operation)
|
||||
|
||||
# this will now be logged against the request context
|
||||
logger.debug("Request handling complete")
|
||||
@@ -332,7 +332,7 @@ gathered:
|
||||
result = yield defer.gatherResults([d1, d2])
|
||||
|
||||
In this case particularly, though, option two, of using
|
||||
``logcontext.preserve_fn`` almost certainly makes more sense, so that
|
||||
``context.preserve_fn`` almost certainly makes more sense, so that
|
||||
``operation1`` and ``operation2`` are both logged against the original
|
||||
logcontext. This looks like:
|
||||
|
||||
@@ -340,8 +340,8 @@ logcontext. This looks like:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_request_handling():
|
||||
d1 = logcontext.preserve_fn(operation1)()
|
||||
d2 = logcontext.preserve_fn(operation2)()
|
||||
d1 = context.preserve_fn(operation1)()
|
||||
d2 = context.preserve_fn(operation2)()
|
||||
|
||||
with PreserveLoggingContext():
|
||||
result = yield defer.gatherResults([d1, d2])
|
||||
@@ -381,7 +381,7 @@ off the background process, and then leave the ``with`` block to wait for it:
|
||||
.. code:: python
|
||||
|
||||
def handle_request(request_id):
|
||||
with logcontext.LoggingContext() as request_context:
|
||||
with context.LoggingContext() as request_context:
|
||||
request_context.request = request_id
|
||||
d = do_request_handling()
|
||||
|
||||
@@ -414,7 +414,7 @@ runs its callbacks in the original logcontext, all is happy.
|
||||
|
||||
The business of a Deferred which runs its callbacks in the original logcontext
|
||||
isn't hard to achieve — we have it today, in the shape of
|
||||
``logcontext._PreservingContextDeferred``:
|
||||
``context._PreservingContextDeferred``:
|
||||
|
||||
.. code:: python
|
||||
|
||||
|
||||
+29
-26
@@ -27,7 +27,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||
import synapse
|
||||
from synapse.app import check_bind_error
|
||||
from synapse.crypto import context_factory
|
||||
from synapse.util import PreserveLoggingContext
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.versionstring import get_version_string
|
||||
@@ -93,33 +93,36 @@ def start_reactor(
|
||||
install_dns_limiter(reactor)
|
||||
|
||||
def run():
|
||||
# make sure that we run the reactor with the sentinel log context,
|
||||
# otherwise other PreserveLoggingContext instances will get confused
|
||||
# and complain when they see the logcontext arbitrarily swapping
|
||||
# between the sentinel and `run` logcontexts.
|
||||
with PreserveLoggingContext():
|
||||
logger.info("Running")
|
||||
logger.info("Running")
|
||||
change_resource_limit(soft_file_limit)
|
||||
if gc_thresholds:
|
||||
gc.set_threshold(*gc_thresholds)
|
||||
reactor.run()
|
||||
|
||||
change_resource_limit(soft_file_limit)
|
||||
if gc_thresholds:
|
||||
gc.set_threshold(*gc_thresholds)
|
||||
reactor.run()
|
||||
# make sure that we run the reactor with the sentinel log context,
|
||||
# otherwise other PreserveLoggingContext instances will get confused
|
||||
# and complain when they see the logcontext arbitrarily swapping
|
||||
# between the sentinel and `run` logcontexts.
|
||||
#
|
||||
# We also need to drop the logcontext before forking if we're daemonizing,
|
||||
# otherwise the cputime metrics get confused about the per-thread resource usage
|
||||
# appearing to go backwards.
|
||||
with PreserveLoggingContext():
|
||||
if daemonize:
|
||||
if print_pidfile:
|
||||
print(pid_file)
|
||||
|
||||
if daemonize:
|
||||
if print_pidfile:
|
||||
print(pid_file)
|
||||
|
||||
daemon = Daemonize(
|
||||
app=appname,
|
||||
pid=pid_file,
|
||||
action=run,
|
||||
auto_close_fds=False,
|
||||
verbose=True,
|
||||
logger=logger,
|
||||
)
|
||||
daemon.start()
|
||||
else:
|
||||
run()
|
||||
daemon = Daemonize(
|
||||
app=appname,
|
||||
pid=pid_file,
|
||||
action=run,
|
||||
auto_close_fds=False,
|
||||
verbose=True,
|
||||
logger=logger,
|
||||
)
|
||||
daemon.start()
|
||||
else:
|
||||
run()
|
||||
|
||||
|
||||
def quit_with_error(error_string):
|
||||
|
||||
@@ -26,6 +26,7 @@ from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
@@ -36,7 +37,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
@@ -64,7 +65,6 @@ from synapse.rest.client.versions import VersionsRestServlet
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
@@ -59,7 +60,6 @@ from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
@@ -48,7 +49,6 @@ from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.federation import send_queue
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
@@ -44,7 +45,6 @@ from synapse.storage.engines import create_engine
|
||||
from synapse.types import ReadReceipt
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
@@ -41,7 +42,6 @@ from synapse.rest.client.v2_alpha._base import client_patterns
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ from synapse.federation.transport.server import TransportLayerServer
|
||||
from synapse.http.additional_resource import AdditionalResource
|
||||
from synapse.http.server import RootRedirect
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
@@ -72,7 +73,6 @@ from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
|
||||
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.module_loader import load_module
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
|
||||
@@ -27,6 +27,7 @@ from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
@@ -40,7 +41,6 @@ from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.media_repository import MediaRepositoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import __func__
|
||||
@@ -38,7 +39,6 @@ from synapse.server import HomeServer
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ from synapse.config.logger import setup_logging
|
||||
from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||
@@ -57,7 +58,6 @@ from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
@@ -28,6 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
@@ -46,7 +47,6 @@ from synapse.storage.engines import create_engine
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -53,8 +53,8 @@ import logging
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.appservice import ApplicationServiceState
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.logcontext import run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -112,13 +112,17 @@ class EmailConfig(Config):
|
||||
missing = []
|
||||
for k in required:
|
||||
if k not in email_config:
|
||||
missing.append(k)
|
||||
missing.append("email." + k)
|
||||
|
||||
if config.get("public_baseurl") is None:
|
||||
missing.append("public_base_url")
|
||||
|
||||
if len(missing) > 0:
|
||||
raise RuntimeError(
|
||||
"email.password_reset_behaviour is set to 'local' "
|
||||
"but required keys are missing: %s"
|
||||
% (", ".join(["email." + k for k in missing]),)
|
||||
"Password resets emails are configured to be sent from "
|
||||
"this homeserver due to a partial 'email' block. "
|
||||
"However, the following required keys are missing: %s"
|
||||
% (", ".join(missing),)
|
||||
)
|
||||
|
||||
# Templates for password reset emails
|
||||
@@ -156,13 +160,6 @@ class EmailConfig(Config):
|
||||
filepath, "email.password_reset_template_success_html"
|
||||
)
|
||||
|
||||
if config.get("public_baseurl") is None:
|
||||
raise RuntimeError(
|
||||
"email.password_reset_behaviour is set to 'local' but no "
|
||||
"public_baseurl is set. This is necessary to generate password "
|
||||
"reset links"
|
||||
)
|
||||
|
||||
if self.email_enable_notifs:
|
||||
required = [
|
||||
"smtp_host",
|
||||
|
||||
@@ -24,7 +24,7 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
|
||||
|
||||
import synapse
|
||||
from synapse.app import _base as appbase
|
||||
from synapse.util.logcontext import LoggingContextFilter
|
||||
from synapse.logging.context import LoggingContextFilter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
from ._base import Config
|
||||
@@ -40,7 +40,7 @@ formatters:
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.util.logcontext.LoggingContextFilter
|
||||
(): synapse.logging.context.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
|
||||
@@ -44,15 +44,16 @@ from synapse.api.errors import (
|
||||
RequestSendFailed,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.async_helpers import yieldable_gather_results
|
||||
from synapse.util.logcontext import (
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
preserve_fn,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import yieldable_gather_results
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
@@ -140,7 +141,7 @@ class Keyring(object):
|
||||
"""
|
||||
req = VerifyJsonRequest(server_name, json_object, validity_time, request_name)
|
||||
requests = (req,)
|
||||
return logcontext.make_deferred_yieldable(self._verify_objects(requests)[0])
|
||||
return make_deferred_yieldable(self._verify_objects(requests)[0])
|
||||
|
||||
def verify_json_objects_for_server(self, server_and_json):
|
||||
"""Bulk verifies signatures of json objects, bulk fetching keys as
|
||||
@@ -557,7 +558,7 @@ class BaseV2KeyFetcher(object):
|
||||
|
||||
signed_key_json_bytes = encode_canonical_json(signed_key_json)
|
||||
|
||||
yield logcontext.make_deferred_yieldable(
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
@@ -612,7 +613,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
|
||||
defer.returnValue({})
|
||||
|
||||
results = yield logcontext.make_deferred_yieldable(
|
||||
results = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[run_in_background(get_key, server) for server in self.key_servers],
|
||||
consumeErrors=True,
|
||||
|
||||
@@ -19,7 +19,7 @@ from frozendict import frozendict
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
|
||||
|
||||
class EventContext(object):
|
||||
|
||||
@@ -27,8 +27,14 @@ from synapse.crypto.event_signing import check_event_content_hash
|
||||
from synapse.events import event_type_from_format_version
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
preserve_fn,
|
||||
)
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util import unwrapFirstError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -73,7 +79,7 @@ class FederationBase(object):
|
||||
@defer.inlineCallbacks
|
||||
def handle_check_result(pdu, deferred):
|
||||
try:
|
||||
res = yield logcontext.make_deferred_yieldable(deferred)
|
||||
res = yield make_deferred_yieldable(deferred)
|
||||
except SynapseError:
|
||||
res = None
|
||||
|
||||
@@ -102,10 +108,10 @@ class FederationBase(object):
|
||||
|
||||
defer.returnValue(res)
|
||||
|
||||
handle = logcontext.preserve_fn(handle_check_result)
|
||||
handle = preserve_fn(handle_check_result)
|
||||
deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
|
||||
|
||||
valid_pdus = yield logcontext.make_deferred_yieldable(
|
||||
valid_pdus = yield make_deferred_yieldable(
|
||||
defer.gatherResults(deferreds2, consumeErrors=True)
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
@@ -115,7 +121,7 @@ class FederationBase(object):
|
||||
defer.returnValue([p for p in valid_pdus if p])
|
||||
|
||||
def _check_sigs_and_hash(self, room_version, pdu):
|
||||
return logcontext.make_deferred_yieldable(
|
||||
return make_deferred_yieldable(
|
||||
self._check_sigs_and_hashes(room_version, [pdu])[0]
|
||||
)
|
||||
|
||||
@@ -133,14 +139,14 @@ class FederationBase(object):
|
||||
* returns a redacted version of the event (if the signature
|
||||
matched but the hash did not)
|
||||
* throws a SynapseError if the signature check failed.
|
||||
The deferreds run their callbacks in the sentinel logcontext.
|
||||
The deferreds run their callbacks in the sentinel
|
||||
"""
|
||||
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
|
||||
|
||||
ctx = logcontext.LoggingContext.current_context()
|
||||
ctx = LoggingContext.current_context()
|
||||
|
||||
def callback(_, pdu):
|
||||
with logcontext.PreserveLoggingContext(ctx):
|
||||
with PreserveLoggingContext(ctx):
|
||||
if not check_event_content_hash(pdu):
|
||||
# let's try to distinguish between failures because the event was
|
||||
# redacted (which are somewhat expected) vs actual ball-tampering
|
||||
@@ -178,7 +184,7 @@ class FederationBase(object):
|
||||
|
||||
def errback(failure, pdu):
|
||||
failure.trap(SynapseError)
|
||||
with logcontext.PreserveLoggingContext(ctx):
|
||||
with PreserveLoggingContext(ctx):
|
||||
logger.warn(
|
||||
"Signature check failed for %s: %s",
|
||||
pdu.event_id,
|
||||
|
||||
@@ -39,10 +39,10 @@ from synapse.api.room_versions import (
|
||||
)
|
||||
from synapse.events import builder, room_version_to_event_format
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -207,7 +207,7 @@ class FederationClient(FederationBase):
|
||||
]
|
||||
|
||||
# FIXME: We should handle signature failures more gracefully.
|
||||
pdus[:] = yield logcontext.make_deferred_yieldable(
|
||||
pdus[:] = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
@@ -42,6 +42,8 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
|
||||
from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Edu, Transaction
|
||||
from synapse.http.endpoint import parse_server_name
|
||||
from synapse.logging.context import nested_logging_context
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEduRestServlet,
|
||||
ReplicationGetQueryRestServlet,
|
||||
@@ -50,8 +52,6 @@ from synapse.types import get_domain_from_id
|
||||
from synapse.util import glob_to_regex
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logcontext import nested_logging_context
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
# when processing incoming transactions, we try to handle multiple rooms in
|
||||
# parallel, up to this limit.
|
||||
|
||||
@@ -23,7 +23,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.logging.utils import log_function
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -26,6 +26,11 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue
|
||||
from synapse.federation.sender.transaction_manager import TransactionManager
|
||||
from synapse.federation.units import Edu
|
||||
from synapse.handlers.presence import get_interested_remotes
|
||||
from synapse.logging.context import (
|
||||
make_deferred_yieldable,
|
||||
preserve_fn,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.metrics import (
|
||||
LaterGauge,
|
||||
event_processing_loop_counter,
|
||||
@@ -33,7 +38,6 @@ from synapse.metrics import (
|
||||
events_processed_counter,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.metrics import measure_func
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -210,10 +214,10 @@ class FederationSender(object):
|
||||
for event in events:
|
||||
events_by_room.setdefault(event.room_id, []).append(event)
|
||||
|
||||
yield logcontext.make_deferred_yieldable(
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
logcontext.run_in_background(handle_room_events, evs)
|
||||
run_in_background(handle_room_events, evs)
|
||||
for evs in itervalues(events_by_room)
|
||||
],
|
||||
consumeErrors=True,
|
||||
@@ -360,7 +364,7 @@ class FederationSender(object):
|
||||
for queue in queues:
|
||||
queue.flush_read_receipts_for_room(room_id)
|
||||
|
||||
@logcontext.preserve_fn # the caller should not yield on this
|
||||
@preserve_fn # the caller should not yield on this
|
||||
@defer.inlineCallbacks
|
||||
def send_presence(self, states):
|
||||
"""Send the new presence states to the appropriate destinations.
|
||||
|
||||
@@ -26,7 +26,7 @@ from synapse.api.urls import (
|
||||
FEDERATION_V1_PREFIX,
|
||||
FEDERATION_V2_PREFIX,
|
||||
)
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.logging.utils import log_function
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -36,8 +36,8 @@ from synapse.http.servlet import (
|
||||
parse_json_object_from_request,
|
||||
parse_string_from_args,
|
||||
)
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
||||
@@ -43,9 +43,9 @@ from signedjson.sign import sign_json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.logcontext import run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -22,10 +22,10 @@ from email.mime.text import MIMEText
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import UserID
|
||||
from synapse.util import stringutils
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
try:
|
||||
from synapse.push.mailer import load_jinja2_templates
|
||||
|
||||
@@ -23,13 +23,13 @@ from twisted.internet import defer
|
||||
|
||||
import synapse
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics import (
|
||||
event_processing_loop_counter,
|
||||
event_processing_loop_room_count,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util import log_failure
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -36,9 +36,9 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.types import UserID
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -987,7 +987,7 @@ class AuthHandler(BaseHandler):
|
||||
bcrypt.gensalt(self.bcrypt_rounds),
|
||||
).decode("ascii")
|
||||
|
||||
return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
|
||||
return defer_to_thread(self.hs.get_reactor(), _do_hash)
|
||||
|
||||
def validate_hash(self, password, stored_hash):
|
||||
"""Validates that self.hash(password) == stored_hash.
|
||||
@@ -1013,7 +1013,7 @@ class AuthHandler(BaseHandler):
|
||||
if not isinstance(stored_hash, bytes):
|
||||
stored_hash = stored_hash.encode("ascii")
|
||||
|
||||
return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
|
||||
return defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
|
||||
else:
|
||||
return defer.succeed(False)
|
||||
|
||||
|
||||
@@ -23,8 +23,8 @@ from canonicaljson import encode_canonical_json, json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -21,8 +21,8 @@ from twisted.internet import defer
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, SynapseError
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.types import UserID
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -45,6 +45,13 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
|
||||
from synapse.crypto.event_signing import compute_event_signature
|
||||
from synapse.event_auth import auth_types_for_event
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.logging.context import (
|
||||
make_deferred_yieldable,
|
||||
nested_logging_context,
|
||||
preserve_fn,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationCleanRoomRestServlet,
|
||||
ReplicationFederationSendEventsRestServlet,
|
||||
@@ -52,10 +59,9 @@ from synapse.replication.http.federation import (
|
||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.visibility import filter_events_for_server
|
||||
|
||||
@@ -338,7 +344,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
|
||||
with logcontext.nested_logging_context(p):
|
||||
with nested_logging_context(p):
|
||||
# note that if any of the missing prevs share missing state or
|
||||
# auth events, the requests to fetch those events are deduped
|
||||
# by the get_pdu_cache in federation_client.
|
||||
@@ -532,7 +538,7 @@ class FederationHandler(BaseHandler):
|
||||
event_id,
|
||||
ev.event_id,
|
||||
)
|
||||
with logcontext.nested_logging_context(ev.event_id):
|
||||
with nested_logging_context(ev.event_id):
|
||||
try:
|
||||
yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
|
||||
except FederationError as e:
|
||||
@@ -725,10 +731,10 @@ class FederationHandler(BaseHandler):
|
||||
missing_auth - failed_to_fetch,
|
||||
)
|
||||
|
||||
results = yield logcontext.make_deferred_yieldable(
|
||||
results = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
logcontext.run_in_background(
|
||||
run_in_background(
|
||||
self.federation_client.get_pdu,
|
||||
[dest],
|
||||
event_id,
|
||||
@@ -994,10 +1000,8 @@ class FederationHandler(BaseHandler):
|
||||
event_ids = list(extremities.keys())
|
||||
|
||||
logger.debug("calling resolve_state_groups in _maybe_backfill")
|
||||
resolve = logcontext.preserve_fn(
|
||||
self.state_handler.resolve_state_groups_for_events
|
||||
)
|
||||
states = yield logcontext.make_deferred_yieldable(
|
||||
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
|
||||
states = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
|
||||
)
|
||||
@@ -1171,7 +1175,7 @@ class FederationHandler(BaseHandler):
|
||||
# lots of requests for missing prev_events which we do actually
|
||||
# have. Hence we fire off the deferred, but don't wait for it.
|
||||
|
||||
logcontext.run_in_background(self._handle_queued_pdus, room_queue)
|
||||
run_in_background(self._handle_queued_pdus, room_queue)
|
||||
|
||||
defer.returnValue(True)
|
||||
|
||||
@@ -1191,7 +1195,7 @@ class FederationHandler(BaseHandler):
|
||||
p.event_id,
|
||||
p.room_id,
|
||||
)
|
||||
with logcontext.nested_logging_context(p.event_id):
|
||||
with nested_logging_context(p.event_id):
|
||||
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
@@ -1610,7 +1614,7 @@ class FederationHandler(BaseHandler):
|
||||
success = True
|
||||
finally:
|
||||
if not success:
|
||||
logcontext.run_in_background(
|
||||
run_in_background(
|
||||
self.store.remove_push_actions_from_staging, event.event_id
|
||||
)
|
||||
|
||||
@@ -1629,7 +1633,7 @@ class FederationHandler(BaseHandler):
|
||||
@defer.inlineCallbacks
|
||||
def prep(ev_info):
|
||||
event = ev_info["event"]
|
||||
with logcontext.nested_logging_context(suffix=event.event_id):
|
||||
with nested_logging_context(suffix=event.event_id):
|
||||
res = yield self._prep_event(
|
||||
origin,
|
||||
event,
|
||||
@@ -1639,12 +1643,9 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
defer.returnValue(res)
|
||||
|
||||
contexts = yield logcontext.make_deferred_yieldable(
|
||||
contexts = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
logcontext.run_in_background(prep, ev_info)
|
||||
for ev_info in event_infos
|
||||
],
|
||||
[run_in_background(prep, ev_info) for ev_info in event_infos],
|
||||
consumeErrors=True,
|
||||
)
|
||||
)
|
||||
@@ -2106,10 +2107,10 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
room_version = yield self.store.get_room_version(event.room_id)
|
||||
|
||||
different_events = yield logcontext.make_deferred_yieldable(
|
||||
different_events = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
logcontext.run_in_background(
|
||||
run_in_background(
|
||||
self.store.get_event, d, allow_none=True, allow_rejected=False
|
||||
)
|
||||
for d in different_auth
|
||||
|
||||
@@ -21,12 +21,12 @@ from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import StreamToken, UserID
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -34,13 +34,13 @@ from synapse.api.errors import (
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.api.urls import ConsentURIBuilder
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import RoomAlias, UserID, create_requester
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
|
||||
@@ -20,10 +20,10 @@ from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
|
||||
@@ -34,14 +34,14 @@ from twisted.internet import defer
|
||||
import synapse.metrics
|
||||
from synapse.api.constants import EventTypes, Membership, PresenceState
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.wheel_timer import WheelTimer
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from prometheus_client import Counter
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.storage.roommember import MemberSummary
|
||||
from synapse.storage.state import StateFilter
|
||||
@@ -33,7 +34,6 @@ from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
|
||||
@@ -19,9 +19,9 @@ from collections import namedtuple
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import AuthError, SynapseError
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.wheel_timer import WheelTimer
|
||||
|
||||
|
||||
@@ -45,9 +45,9 @@ from synapse.http import (
|
||||
cancelled_to_request_timed_out_error,
|
||||
redact_uri,
|
||||
)
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -30,9 +30,9 @@ from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import IAgent
|
||||
|
||||
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import Clock
|
||||
from synapse.util.caches.ttlcache import TTLCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
# period to cache .well-known results for by default
|
||||
|
||||
@@ -25,7 +25,7 @@ from twisted.internet.error import ConnectError
|
||||
from twisted.names import client, dns
|
||||
from twisted.names.error import DNSNameError, DomainError
|
||||
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -48,8 +48,8 @@ from synapse.api.errors import (
|
||||
from synapse.http import QuieterFileBodyProducer
|
||||
from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
|
||||
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -19,8 +19,8 @@ import threading
|
||||
|
||||
from prometheus_client.core import Counter, Histogram
|
||||
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -39,8 +39,8 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
UnrecognizedRequestError,
|
||||
)
|
||||
from synapse.logging.context import preserve_fn
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ from twisted.web.server import Request, Site
|
||||
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.request_metrics import RequestMetrics, requests_counter
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -0,0 +1,693 @@
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
""" Thread-local-alike tracking of log contexts within synapse
|
||||
|
||||
This module provides objects and utilities for tracking contexts through
|
||||
synapse code, so that log lines can include a request identifier, and so that
|
||||
CPU and database activity can be accounted for against the request that caused
|
||||
them.
|
||||
|
||||
See doc/log_contexts.rst for details on how this works.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import types
|
||||
|
||||
from twisted.internet import defer, threads
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import resource
|
||||
|
||||
# Python doesn't ship with a definition of RUSAGE_THREAD but it's defined
|
||||
# to be 1 on linux so we hard code it.
|
||||
RUSAGE_THREAD = 1
|
||||
|
||||
# If the system doesn't support RUSAGE_THREAD then this should throw an
|
||||
# exception.
|
||||
resource.getrusage(RUSAGE_THREAD)
|
||||
|
||||
def get_thread_resource_usage():
|
||||
return resource.getrusage(RUSAGE_THREAD)
|
||||
|
||||
|
||||
except Exception:
|
||||
# If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
|
||||
# won't track resource usage by returning None.
|
||||
def get_thread_resource_usage():
|
||||
return None
|
||||
|
||||
|
||||
# get an id for the current thread.
|
||||
#
|
||||
# threading.get_ident doesn't actually return an OS-level tid, and annoyingly,
|
||||
# on Linux it actually returns the same value either side of a fork() call. However
|
||||
# we only fork in one place, so it's not worth the hoop-jumping to get a real tid.
|
||||
#
|
||||
get_thread_id = threading.get_ident
|
||||
|
||||
|
||||
class ContextResourceUsage(object):
|
||||
"""Object for tracking the resources used by a log context
|
||||
|
||||
Attributes:
|
||||
ru_utime (float): user CPU time (in seconds)
|
||||
ru_stime (float): system CPU time (in seconds)
|
||||
db_txn_count (int): number of database transactions done
|
||||
db_sched_duration_sec (float): amount of time spent waiting for a
|
||||
database connection
|
||||
db_txn_duration_sec (float): amount of time spent doing database
|
||||
transactions (excluding scheduling time)
|
||||
evt_db_fetch_count (int): number of events requested from the database
|
||||
"""
|
||||
|
||||
__slots__ = [
|
||||
"ru_stime",
|
||||
"ru_utime",
|
||||
"db_txn_count",
|
||||
"db_txn_duration_sec",
|
||||
"db_sched_duration_sec",
|
||||
"evt_db_fetch_count",
|
||||
]
|
||||
|
||||
def __init__(self, copy_from=None):
|
||||
"""Create a new ContextResourceUsage
|
||||
|
||||
Args:
|
||||
copy_from (ContextResourceUsage|None): if not None, an object to
|
||||
copy stats from
|
||||
"""
|
||||
if copy_from is None:
|
||||
self.reset()
|
||||
else:
|
||||
self.ru_utime = copy_from.ru_utime
|
||||
self.ru_stime = copy_from.ru_stime
|
||||
self.db_txn_count = copy_from.db_txn_count
|
||||
|
||||
self.db_txn_duration_sec = copy_from.db_txn_duration_sec
|
||||
self.db_sched_duration_sec = copy_from.db_sched_duration_sec
|
||||
self.evt_db_fetch_count = copy_from.evt_db_fetch_count
|
||||
|
||||
def copy(self):
|
||||
return ContextResourceUsage(copy_from=self)
|
||||
|
||||
def reset(self):
|
||||
self.ru_stime = 0.0
|
||||
self.ru_utime = 0.0
|
||||
self.db_txn_count = 0
|
||||
|
||||
self.db_txn_duration_sec = 0
|
||||
self.db_sched_duration_sec = 0
|
||||
self.evt_db_fetch_count = 0
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
"<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
|
||||
"db_txn_count='%r', db_txn_duration_sec='%r', "
|
||||
"db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
|
||||
) % (
|
||||
self.ru_stime,
|
||||
self.ru_utime,
|
||||
self.db_txn_count,
|
||||
self.db_txn_duration_sec,
|
||||
self.db_sched_duration_sec,
|
||||
self.evt_db_fetch_count,
|
||||
)
|
||||
|
||||
def __iadd__(self, other):
|
||||
"""Add another ContextResourceUsage's stats to this one's.
|
||||
|
||||
Args:
|
||||
other (ContextResourceUsage): the other resource usage object
|
||||
"""
|
||||
self.ru_utime += other.ru_utime
|
||||
self.ru_stime += other.ru_stime
|
||||
self.db_txn_count += other.db_txn_count
|
||||
self.db_txn_duration_sec += other.db_txn_duration_sec
|
||||
self.db_sched_duration_sec += other.db_sched_duration_sec
|
||||
self.evt_db_fetch_count += other.evt_db_fetch_count
|
||||
return self
|
||||
|
||||
def __isub__(self, other):
|
||||
self.ru_utime -= other.ru_utime
|
||||
self.ru_stime -= other.ru_stime
|
||||
self.db_txn_count -= other.db_txn_count
|
||||
self.db_txn_duration_sec -= other.db_txn_duration_sec
|
||||
self.db_sched_duration_sec -= other.db_sched_duration_sec
|
||||
self.evt_db_fetch_count -= other.evt_db_fetch_count
|
||||
return self
|
||||
|
||||
def __add__(self, other):
|
||||
res = ContextResourceUsage(copy_from=self)
|
||||
res += other
|
||||
return res
|
||||
|
||||
def __sub__(self, other):
|
||||
res = ContextResourceUsage(copy_from=self)
|
||||
res -= other
|
||||
return res
|
||||
|
||||
|
||||
class LoggingContext(object):
|
||||
"""Additional context for log formatting. Contexts are scoped within a
|
||||
"with" block.
|
||||
|
||||
If a parent is given when creating a new context, then:
|
||||
- logging fields are copied from the parent to the new context on entry
|
||||
- when the new context exits, the cpu usage stats are copied from the
|
||||
child to the parent
|
||||
|
||||
Args:
|
||||
name (str): Name for the context for debugging.
|
||||
parent_context (LoggingContext|None): The parent of the new context
|
||||
"""
|
||||
|
||||
__slots__ = [
|
||||
"previous_context",
|
||||
"name",
|
||||
"parent_context",
|
||||
"_resource_usage",
|
||||
"usage_start",
|
||||
"main_thread",
|
||||
"alive",
|
||||
"request",
|
||||
"tag",
|
||||
]
|
||||
|
||||
thread_local = threading.local()
|
||||
|
||||
class Sentinel(object):
|
||||
"""Sentinel to represent the root context"""
|
||||
|
||||
__slots__ = []
|
||||
|
||||
def __str__(self):
|
||||
return "sentinel"
|
||||
|
||||
def copy_to(self, record):
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def add_database_transaction(self, duration_sec):
|
||||
pass
|
||||
|
||||
def add_database_scheduled(self, sched_sec):
|
||||
pass
|
||||
|
||||
def record_event_fetch(self, event_count):
|
||||
pass
|
||||
|
||||
def __nonzero__(self):
|
||||
return False
|
||||
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
sentinel = Sentinel()
|
||||
|
||||
def __init__(self, name=None, parent_context=None, request=None):
|
||||
self.previous_context = LoggingContext.current_context()
|
||||
self.name = name
|
||||
|
||||
# track the resources used by this context so far
|
||||
self._resource_usage = ContextResourceUsage()
|
||||
|
||||
# If alive has the thread resource usage when the logcontext last
|
||||
# became active.
|
||||
self.usage_start = None
|
||||
|
||||
self.main_thread = get_thread_id()
|
||||
self.request = None
|
||||
self.tag = ""
|
||||
self.alive = True
|
||||
|
||||
self.parent_context = parent_context
|
||||
|
||||
if self.parent_context is not None:
|
||||
self.parent_context.copy_to(self)
|
||||
|
||||
if request is not None:
|
||||
# the request param overrides the request from the parent context
|
||||
self.request = request
|
||||
|
||||
def __str__(self):
|
||||
if self.request:
|
||||
return str(self.request)
|
||||
return "%s@%x" % (self.name, id(self))
|
||||
|
||||
@classmethod
|
||||
def current_context(cls):
|
||||
"""Get the current logging context from thread local storage
|
||||
|
||||
Returns:
|
||||
LoggingContext: the current logging context
|
||||
"""
|
||||
return getattr(cls.thread_local, "current_context", cls.sentinel)
|
||||
|
||||
@classmethod
|
||||
def set_current_context(cls, context):
|
||||
"""Set the current logging context in thread local storage
|
||||
Args:
|
||||
context(LoggingContext): The context to activate.
|
||||
Returns:
|
||||
The context that was previously active
|
||||
"""
|
||||
current = cls.current_context()
|
||||
|
||||
if current is not context:
|
||||
current.stop()
|
||||
cls.thread_local.current_context = context
|
||||
context.start()
|
||||
return current
|
||||
|
||||
def __enter__(self):
|
||||
"""Enters this logging context into thread local storage"""
|
||||
old_context = self.set_current_context(self)
|
||||
if self.previous_context != old_context:
|
||||
logger.warn(
|
||||
"Expected previous context %r, found %r",
|
||||
self.previous_context,
|
||||
old_context,
|
||||
)
|
||||
self.alive = True
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Restore the logging context in thread local storage to the state it
|
||||
was before this context was entered.
|
||||
Returns:
|
||||
None to avoid suppressing any exceptions that were thrown.
|
||||
"""
|
||||
current = self.set_current_context(self.previous_context)
|
||||
if current is not self:
|
||||
if current is self.sentinel:
|
||||
logger.warning("Expected logging context %s was lost", self)
|
||||
else:
|
||||
logger.warning(
|
||||
"Expected logging context %s but found %s", self, current
|
||||
)
|
||||
self.previous_context = None
|
||||
self.alive = False
|
||||
|
||||
# if we have a parent, pass our CPU usage stats on
|
||||
if self.parent_context is not None and hasattr(
|
||||
self.parent_context, "_resource_usage"
|
||||
):
|
||||
self.parent_context._resource_usage += self._resource_usage
|
||||
|
||||
# reset them in case we get entered again
|
||||
self._resource_usage.reset()
|
||||
|
||||
def copy_to(self, record):
|
||||
"""Copy logging fields from this context to a log record or
|
||||
another LoggingContext
|
||||
"""
|
||||
|
||||
# 'request' is the only field we currently use in the logger, so that's
|
||||
# all we need to copy
|
||||
record.request = self.request
|
||||
|
||||
def start(self):
|
||||
if get_thread_id() != self.main_thread:
|
||||
logger.warning("Started logcontext %s on different thread", self)
|
||||
return
|
||||
|
||||
# If we haven't already started record the thread resource usage so
|
||||
# far
|
||||
if not self.usage_start:
|
||||
self.usage_start = get_thread_resource_usage()
|
||||
|
||||
def stop(self):
|
||||
if get_thread_id() != self.main_thread:
|
||||
logger.warning("Stopped logcontext %s on different thread", self)
|
||||
return
|
||||
|
||||
# When we stop, let's record the cpu used since we started
|
||||
if not self.usage_start:
|
||||
logger.warning("Called stop on logcontext %s without calling start", self)
|
||||
return
|
||||
|
||||
utime_delta, stime_delta = self._get_cputime()
|
||||
self._resource_usage.ru_utime += utime_delta
|
||||
self._resource_usage.ru_stime += stime_delta
|
||||
|
||||
self.usage_start = None
|
||||
|
||||
def get_resource_usage(self):
|
||||
"""Get resources used by this logcontext so far.
|
||||
|
||||
Returns:
|
||||
ContextResourceUsage: a *copy* of the object tracking resource
|
||||
usage so far
|
||||
"""
|
||||
# we always return a copy, for consistency
|
||||
res = self._resource_usage.copy()
|
||||
|
||||
# If we are on the correct thread and we're currently running then we
|
||||
# can include resource usage so far.
|
||||
is_main_thread = get_thread_id() == self.main_thread
|
||||
if self.alive and self.usage_start and is_main_thread:
|
||||
utime_delta, stime_delta = self._get_cputime()
|
||||
res.ru_utime += utime_delta
|
||||
res.ru_stime += stime_delta
|
||||
|
||||
return res
|
||||
|
||||
def _get_cputime(self):
|
||||
"""Get the cpu usage time so far
|
||||
|
||||
Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
|
||||
"""
|
||||
current = get_thread_resource_usage()
|
||||
|
||||
utime_delta = current.ru_utime - self.usage_start.ru_utime
|
||||
stime_delta = current.ru_stime - self.usage_start.ru_stime
|
||||
|
||||
# sanity check
|
||||
if utime_delta < 0:
|
||||
logger.error(
|
||||
"utime went backwards! %f < %f",
|
||||
current.ru_utime,
|
||||
self.usage_start.ru_utime,
|
||||
)
|
||||
utime_delta = 0
|
||||
|
||||
if stime_delta < 0:
|
||||
logger.error(
|
||||
"stime went backwards! %f < %f",
|
||||
current.ru_stime,
|
||||
self.usage_start.ru_stime,
|
||||
)
|
||||
stime_delta = 0
|
||||
|
||||
return utime_delta, stime_delta
|
||||
|
||||
def add_database_transaction(self, duration_sec):
|
||||
if duration_sec < 0:
|
||||
raise ValueError("DB txn time can only be non-negative")
|
||||
self._resource_usage.db_txn_count += 1
|
||||
self._resource_usage.db_txn_duration_sec += duration_sec
|
||||
|
||||
def add_database_scheduled(self, sched_sec):
|
||||
"""Record a use of the database pool
|
||||
|
||||
Args:
|
||||
sched_sec (float): number of seconds it took us to get a
|
||||
connection
|
||||
"""
|
||||
if sched_sec < 0:
|
||||
raise ValueError("DB scheduling time can only be non-negative")
|
||||
self._resource_usage.db_sched_duration_sec += sched_sec
|
||||
|
||||
def record_event_fetch(self, event_count):
|
||||
"""Record a number of events being fetched from the db
|
||||
|
||||
Args:
|
||||
event_count (int): number of events being fetched
|
||||
"""
|
||||
self._resource_usage.evt_db_fetch_count += event_count
|
||||
|
||||
|
||||
class LoggingContextFilter(logging.Filter):
|
||||
"""Logging filter that adds values from the current logging context to each
|
||||
record.
|
||||
Args:
|
||||
**defaults: Default values to avoid formatters complaining about
|
||||
missing fields
|
||||
"""
|
||||
|
||||
def __init__(self, **defaults):
|
||||
self.defaults = defaults
|
||||
|
||||
def filter(self, record):
|
||||
"""Add each fields from the logging contexts to the record.
|
||||
Returns:
|
||||
True to include the record in the log output.
|
||||
"""
|
||||
context = LoggingContext.current_context()
|
||||
for key, value in self.defaults.items():
|
||||
setattr(record, key, value)
|
||||
|
||||
# context should never be None, but if it somehow ends up being, then
|
||||
# we end up in a death spiral of infinite loops, so let's check, for
|
||||
# robustness' sake.
|
||||
if context is not None:
|
||||
context.copy_to(record)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class PreserveLoggingContext(object):
|
||||
"""Captures the current logging context and restores it when the scope is
|
||||
exited. Used to restore the context after a function using
|
||||
@defer.inlineCallbacks is resumed by a callback from the reactor."""
|
||||
|
||||
__slots__ = ["current_context", "new_context", "has_parent"]
|
||||
|
||||
def __init__(self, new_context=None):
|
||||
if new_context is None:
|
||||
new_context = LoggingContext.sentinel
|
||||
self.new_context = new_context
|
||||
|
||||
def __enter__(self):
|
||||
"""Captures the current logging context"""
|
||||
self.current_context = LoggingContext.set_current_context(self.new_context)
|
||||
|
||||
if self.current_context:
|
||||
self.has_parent = self.current_context.previous_context is not None
|
||||
if not self.current_context.alive:
|
||||
logger.debug("Entering dead context: %s", self.current_context)
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Restores the current logging context"""
|
||||
context = LoggingContext.set_current_context(self.current_context)
|
||||
|
||||
if context != self.new_context:
|
||||
if context is LoggingContext.sentinel:
|
||||
logger.warning("Expected logging context %s was lost", self.new_context)
|
||||
else:
|
||||
logger.warning(
|
||||
"Expected logging context %s but found %s",
|
||||
self.new_context,
|
||||
context,
|
||||
)
|
||||
|
||||
if self.current_context is not LoggingContext.sentinel:
|
||||
if not self.current_context.alive:
|
||||
logger.debug("Restoring dead context: %s", self.current_context)
|
||||
|
||||
|
||||
def nested_logging_context(suffix, parent_context=None):
|
||||
"""Creates a new logging context as a child of another.
|
||||
|
||||
The nested logging context will have a 'request' made up of the parent context's
|
||||
request, plus the given suffix.
|
||||
|
||||
CPU/db usage stats will be added to the parent context's on exit.
|
||||
|
||||
Normal usage looks like:
|
||||
|
||||
with nested_logging_context(suffix):
|
||||
# ... do stuff
|
||||
|
||||
Args:
|
||||
suffix (str): suffix to add to the parent context's 'request'.
|
||||
parent_context (LoggingContext|None): parent context. Will use the current context
|
||||
if None.
|
||||
|
||||
Returns:
|
||||
LoggingContext: new logging context.
|
||||
"""
|
||||
if parent_context is None:
|
||||
parent_context = LoggingContext.current_context()
|
||||
return LoggingContext(
|
||||
parent_context=parent_context, request=parent_context.request + "-" + suffix
|
||||
)
|
||||
|
||||
|
||||
def preserve_fn(f):
|
||||
"""Function decorator which wraps the function with run_in_background"""
|
||||
|
||||
def g(*args, **kwargs):
|
||||
return run_in_background(f, *args, **kwargs)
|
||||
|
||||
return g
|
||||
|
||||
|
||||
def run_in_background(f, *args, **kwargs):
|
||||
"""Calls a function, ensuring that the current context is restored after
|
||||
return from the function, and that the sentinel context is set once the
|
||||
deferred returned by the function completes.
|
||||
|
||||
Useful for wrapping functions that return a deferred or coroutine, which you don't
|
||||
yield or await on (for instance because you want to pass it to
|
||||
deferred.gatherResults()).
|
||||
|
||||
Note that if you completely discard the result, you should make sure that
|
||||
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
|
||||
CRITICAL error about an unhandled error will be logged without much
|
||||
indication about where it came from.
|
||||
"""
|
||||
current = LoggingContext.current_context()
|
||||
try:
|
||||
res = f(*args, **kwargs)
|
||||
except: # noqa: E722
|
||||
# the assumption here is that the caller doesn't want to be disturbed
|
||||
# by synchronous exceptions, so let's turn them into Failures.
|
||||
return defer.fail()
|
||||
|
||||
if isinstance(res, types.CoroutineType):
|
||||
res = defer.ensureDeferred(res)
|
||||
|
||||
if not isinstance(res, defer.Deferred):
|
||||
return res
|
||||
|
||||
if res.called and not res.paused:
|
||||
# The function should have maintained the logcontext, so we can
|
||||
# optimise out the messing about
|
||||
return res
|
||||
|
||||
# The function may have reset the context before returning, so
|
||||
# we need to restore it now.
|
||||
ctx = LoggingContext.set_current_context(current)
|
||||
|
||||
# The original context will be restored when the deferred
|
||||
# completes, but there is nothing waiting for it, so it will
|
||||
# get leaked into the reactor or some other function which
|
||||
# wasn't expecting it. We therefore need to reset the context
|
||||
# here.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
res.addBoth(_set_context_cb, ctx)
|
||||
return res
|
||||
|
||||
|
||||
def make_deferred_yieldable(deferred):
|
||||
"""Given a deferred, make it follow the Synapse logcontext rules:
|
||||
|
||||
If the deferred has completed (or is not actually a Deferred), essentially
|
||||
does nothing (just returns another completed deferred with the
|
||||
result/failure).
|
||||
|
||||
If the deferred has not yet completed, resets the logcontext before
|
||||
returning a deferred. Then, when the deferred completes, restores the
|
||||
current logcontext before running callbacks/errbacks.
|
||||
|
||||
(This is more-or-less the opposite operation to run_in_background.)
|
||||
"""
|
||||
if not isinstance(deferred, defer.Deferred):
|
||||
return deferred
|
||||
|
||||
if deferred.called and not deferred.paused:
|
||||
# it looks like this deferred is ready to run any callbacks we give it
|
||||
# immediately. We may as well optimise out the logcontext faffery.
|
||||
return deferred
|
||||
|
||||
# ok, we can't be sure that a yield won't block, so let's reset the
|
||||
# logcontext, and add a callback to the deferred to restore it.
|
||||
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||
deferred.addBoth(_set_context_cb, prev_context)
|
||||
return deferred
|
||||
|
||||
|
||||
def _set_context_cb(result, context):
|
||||
"""A callback function which just sets the logging context"""
|
||||
LoggingContext.set_current_context(context)
|
||||
return result
|
||||
|
||||
|
||||
def defer_to_thread(reactor, f, *args, **kwargs):
|
||||
"""
|
||||
Calls the function `f` using a thread from the reactor's default threadpool and
|
||||
returns the result as a Deferred.
|
||||
|
||||
Creates a new logcontext for `f`, which is created as a child of the current
|
||||
logcontext (so its CPU usage metrics will get attributed to the current
|
||||
logcontext). `f` should preserve the logcontext it is given.
|
||||
|
||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
||||
on it.
|
||||
|
||||
Args:
|
||||
reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
|
||||
the Deferred will be invoked, and whose threadpool we should use for the
|
||||
function.
|
||||
|
||||
Normally this will be hs.get_reactor().
|
||||
|
||||
f (callable): The function to call.
|
||||
|
||||
args: positional arguments to pass to f.
|
||||
|
||||
kwargs: keyword arguments to pass to f.
|
||||
|
||||
Returns:
|
||||
Deferred: A Deferred which fires a callback with the result of `f`, or an
|
||||
errback if `f` throws an exception.
|
||||
"""
|
||||
return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
|
||||
|
||||
|
||||
def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
|
||||
"""
|
||||
A wrapper for twisted.internet.threads.deferToThreadpool, which handles
|
||||
logcontexts correctly.
|
||||
|
||||
Calls the function `f` using a thread from the given threadpool and returns
|
||||
the result as a Deferred.
|
||||
|
||||
Creates a new logcontext for `f`, which is created as a child of the current
|
||||
logcontext (so its CPU usage metrics will get attributed to the current
|
||||
logcontext). `f` should preserve the logcontext it is given.
|
||||
|
||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
||||
on it.
|
||||
|
||||
Args:
|
||||
reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
|
||||
the Deferred will be invoked. Normally this will be hs.get_reactor().
|
||||
|
||||
threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
|
||||
running `f`. Normally this will be hs.get_reactor().getThreadPool().
|
||||
|
||||
f (callable): The function to call.
|
||||
|
||||
args: positional arguments to pass to f.
|
||||
|
||||
kwargs: keyword arguments to pass to f.
|
||||
|
||||
Returns:
|
||||
Deferred: A Deferred which fires a callback with the result of `f`, or an
|
||||
errback if `f` throws an exception.
|
||||
"""
|
||||
logcontext = LoggingContext.current_context()
|
||||
|
||||
def g():
|
||||
with LoggingContext(parent_context=logcontext):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
|
||||
@@ -0,0 +1,53 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
from six import StringIO
|
||||
|
||||
|
||||
class LogFormatter(logging.Formatter):
|
||||
"""Log formatter which gives more detail for exceptions
|
||||
|
||||
This is the same as the standard log formatter, except that when logging
|
||||
exceptions [typically via log.foo("msg", exc_info=1)], it prints the
|
||||
sequence that led up to the point at which the exception was caught.
|
||||
(Normally only stack frames between the point the exception was raised and
|
||||
where it was caught are logged).
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(LogFormatter, self).__init__(*args, **kwargs)
|
||||
|
||||
def formatException(self, ei):
|
||||
sio = StringIO()
|
||||
(typ, val, tb) = ei
|
||||
|
||||
# log the stack above the exception capture point if possible, but
|
||||
# check that we actually have an f_back attribute to work around
|
||||
# https://twistedmatrix.com/trac/ticket/9305
|
||||
|
||||
if tb and hasattr(tb.tb_frame, "f_back"):
|
||||
sio.write("Capture point (most recent call last):\n")
|
||||
traceback.print_stack(tb.tb_frame.f_back, None, sio)
|
||||
|
||||
traceback.print_exception(typ, val, tb, None, sio)
|
||||
s = sio.getvalue()
|
||||
sio.close()
|
||||
if s[-1:] == "\n":
|
||||
s = s[:-1]
|
||||
return s
|
||||
@@ -22,7 +22,7 @@ from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
+2
-2
@@ -23,12 +23,12 @@ from twisted.internet import defer
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import StreamToken
|
||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.push.presentable_names import (
|
||||
calculate_room_name,
|
||||
descriptor_from_member_events,
|
||||
@@ -36,7 +37,6 @@ from synapse.push.presentable_names import (
|
||||
)
|
||||
from synapse.types import UserID
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -62,9 +62,9 @@ from twisted.internet import defer
|
||||
from twisted.protocols.basic import LineOnlyReceiver
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
from .commands import (
|
||||
|
||||
@@ -17,8 +17,8 @@
|
||||
to ensure idempotency when performing PUTs using the REST API."""
|
||||
import logging
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ from twisted.protocols.basic import FileSender
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError, cs_error
|
||||
from synapse.http.server import finish_request, respond_with_json
|
||||
from synapse.util import logcontext
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util.stringutils import is_ascii
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -75,9 +75,7 @@ def respond_with_file(request, media_type, file_path, file_size=None, upload_nam
|
||||
add_file_headers(request, media_type, file_size, upload_name)
|
||||
|
||||
with open(file_path, "rb") as f:
|
||||
yield logcontext.make_deferred_yieldable(
|
||||
FileSender().beginFileTransfer(f, request)
|
||||
)
|
||||
yield make_deferred_yieldable(FileSender().beginFileTransfer(f, request))
|
||||
|
||||
finish_request(request)
|
||||
else:
|
||||
|
||||
@@ -33,8 +33,8 @@ from synapse.api.errors import (
|
||||
RequestSendFailed,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.util.stringutils import random_string
|
||||
@@ -463,7 +463,7 @@ class MediaRepository(object):
|
||||
)
|
||||
|
||||
thumbnailer = Thumbnailer(input_path)
|
||||
t_byte_source = yield logcontext.defer_to_thread(
|
||||
t_byte_source = yield defer_to_thread(
|
||||
self.hs.get_reactor(),
|
||||
self._generate_thumbnail,
|
||||
thumbnailer,
|
||||
@@ -511,7 +511,7 @@ class MediaRepository(object):
|
||||
)
|
||||
|
||||
thumbnailer = Thumbnailer(input_path)
|
||||
t_byte_source = yield logcontext.defer_to_thread(
|
||||
t_byte_source = yield defer_to_thread(
|
||||
self.hs.get_reactor(),
|
||||
self._generate_thumbnail,
|
||||
thumbnailer,
|
||||
@@ -596,7 +596,7 @@ class MediaRepository(object):
|
||||
return
|
||||
|
||||
if thumbnailer.transpose_method is not None:
|
||||
m_width, m_height = yield logcontext.defer_to_thread(
|
||||
m_width, m_height = yield defer_to_thread(
|
||||
self.hs.get_reactor(), thumbnailer.transpose
|
||||
)
|
||||
|
||||
@@ -616,11 +616,11 @@ class MediaRepository(object):
|
||||
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
|
||||
# Generate the thumbnail
|
||||
if t_method == "crop":
|
||||
t_byte_source = yield logcontext.defer_to_thread(
|
||||
t_byte_source = yield defer_to_thread(
|
||||
self.hs.get_reactor(), thumbnailer.crop, t_width, t_height, t_type
|
||||
)
|
||||
elif t_method == "scale":
|
||||
t_byte_source = yield logcontext.defer_to_thread(
|
||||
t_byte_source = yield defer_to_thread(
|
||||
self.hs.get_reactor(), thumbnailer.scale, t_width, t_height, t_type
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -24,9 +24,8 @@ import six
|
||||
from twisted.internet import defer
|
||||
from twisted.protocols.basic import FileSender
|
||||
|
||||
from synapse.util import logcontext
|
||||
from synapse.logging.context import defer_to_thread, make_deferred_yieldable
|
||||
from synapse.util.file_consumer import BackgroundFileConsumer
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
from ._base import Responder
|
||||
|
||||
@@ -65,7 +64,7 @@ class MediaStorage(object):
|
||||
|
||||
with self.store_into_file(file_info) as (f, fname, finish_cb):
|
||||
# Write to the main repository
|
||||
yield logcontext.defer_to_thread(
|
||||
yield defer_to_thread(
|
||||
self.hs.get_reactor(), _write_file_synchronously, source, f
|
||||
)
|
||||
yield finish_cb()
|
||||
|
||||
@@ -42,11 +42,11 @@ from synapse.http.server import (
|
||||
wrap_json_request_handler,
|
||||
)
|
||||
from synapse.http.servlet import parse_integer, parse_string
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.media.v1._base import get_filename_from_headers
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
from ._base import FileInfo
|
||||
|
||||
@@ -20,8 +20,7 @@ import shutil
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.config._base import Config
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.logging.context import defer_to_thread, run_in_background
|
||||
|
||||
from .media_storage import FileResponder
|
||||
|
||||
@@ -125,7 +124,7 @@ class FileStorageProviderBackend(StorageProvider):
|
||||
if not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
|
||||
return logcontext.defer_to_thread(
|
||||
return defer_to_thread(
|
||||
self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname
|
||||
)
|
||||
|
||||
|
||||
@@ -28,11 +28,11 @@ from twisted.internet import defer
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.state import v1, v2
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches import get_cache_factor_for
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -30,12 +30,12 @@ from prometheus_client import Histogram
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import batch_iter
|
||||
from synapse.util.caches.descriptors import Cache
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.util.stringutils import exception_to_unicode
|
||||
|
||||
# import a function which will return a monotonic time, in seconds
|
||||
|
||||
@@ -33,6 +33,8 @@ from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events import EventBase # noqa: F401
|
||||
from synapse.events.snapshot import EventContext # noqa: F401
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.metrics import BucketCollector
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.state import StateResolutionStore
|
||||
@@ -45,8 +47,6 @@ from synapse.util import batch_iter
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -29,14 +29,14 @@ from synapse.api.room_versions import EventFormatVersions
|
||||
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
|
||||
from synapse.events.snapshot import EventContext # noqa: F401
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.logcontext import (
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
@@ -41,12 +41,12 @@ from six.moves import range
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ import attr
|
||||
|
||||
from twisted.internet import defer, task
|
||||
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.logging import context
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -46,7 +46,7 @@ class Clock(object):
|
||||
@defer.inlineCallbacks
|
||||
def sleep(self, seconds):
|
||||
d = defer.Deferred()
|
||||
with PreserveLoggingContext():
|
||||
with context.PreserveLoggingContext():
|
||||
self._reactor.callLater(seconds, d.callback, seconds)
|
||||
res = yield d
|
||||
defer.returnValue(res)
|
||||
@@ -91,10 +91,10 @@ class Clock(object):
|
||||
"""
|
||||
|
||||
def wrapped_callback(*args, **kwargs):
|
||||
with PreserveLoggingContext():
|
||||
with context.PreserveLoggingContext():
|
||||
callback(*args, **kwargs)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
with context.PreserveLoggingContext():
|
||||
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
|
||||
|
||||
def cancel_call_later(self, timer, ignore_errs=False):
|
||||
|
||||
@@ -23,13 +23,12 @@ from twisted.internet import defer
|
||||
from twisted.internet.defer import CancelledError
|
||||
from twisted.python import failure
|
||||
|
||||
from synapse.util import Clock, logcontext, unwrapFirstError
|
||||
|
||||
from .logcontext import (
|
||||
from synapse.logging.context import (
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.util import Clock, unwrapFirstError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -153,7 +152,7 @@ def concurrently_execute(func, args, limit):
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
return logcontext.make_deferred_yieldable(
|
||||
return make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[run_in_background(_concurrently_execute_inner) for _ in range(limit)],
|
||||
consumeErrors=True,
|
||||
@@ -174,7 +173,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
|
||||
Deferred[list]: Resolved when all functions have been invoked, or errors if
|
||||
one of the function calls fails.
|
||||
"""
|
||||
return logcontext.make_deferred_yieldable(
|
||||
return make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[run_in_background(func, item, *args, **kwargs) for item in iter],
|
||||
consumeErrors=True,
|
||||
|
||||
@@ -24,7 +24,8 @@ from six import itervalues, string_types
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.logging.context import make_deferred_yieldable, preserve_fn
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches import get_cache_factor_for
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
@@ -388,7 +389,7 @@ class CacheDescriptor(_CacheDescriptorBase):
|
||||
|
||||
except KeyError:
|
||||
ret = defer.maybeDeferred(
|
||||
logcontext.preserve_fn(self.function_to_call), obj, *args, **kwargs
|
||||
preserve_fn(self.function_to_call), obj, *args, **kwargs
|
||||
)
|
||||
|
||||
def onErr(f):
|
||||
@@ -408,7 +409,7 @@ class CacheDescriptor(_CacheDescriptorBase):
|
||||
observer = result_d.observe()
|
||||
|
||||
if isinstance(observer, defer.Deferred):
|
||||
return logcontext.make_deferred_yieldable(observer)
|
||||
return make_deferred_yieldable(observer)
|
||||
else:
|
||||
return observer
|
||||
|
||||
@@ -563,7 +564,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
|
||||
|
||||
cached_defers.append(
|
||||
defer.maybeDeferred(
|
||||
logcontext.preserve_fn(self.function_to_call), **args_to_call
|
||||
preserve_fn(self.function_to_call), **args_to_call
|
||||
).addCallbacks(complete_all, errback)
|
||||
)
|
||||
|
||||
@@ -571,7 +572,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
|
||||
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
|
||||
lambda _: results, unwrapFirstError
|
||||
)
|
||||
return logcontext.make_deferred_yieldable(d)
|
||||
return make_deferred_yieldable(d)
|
||||
else:
|
||||
return results
|
||||
|
||||
|
||||
@@ -16,9 +16,9 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches import register_cache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -78,7 +78,7 @@ class ResponseCache(object):
|
||||
|
||||
*deferred* should run its callbacks in the sentinel logcontext (ie,
|
||||
you should wrap normal synapse deferreds with
|
||||
logcontext.run_in_background).
|
||||
synapse.logging.context.run_in_background).
|
||||
|
||||
Can return either a new Deferred (which also doesn't follow the synapse
|
||||
logcontext rules), or, if *deferred* was already complete, the actual
|
||||
|
||||
@@ -17,8 +17,8 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ from six.moves import queue
|
||||
|
||||
from twisted.internet import threads
|
||||
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
|
||||
|
||||
class BackgroundFileConsumer(object):
|
||||
|
||||
+24
-669
@@ -1,4 +1,4 @@
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -12,673 +12,28 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
""" Thread-local-alike tracking of log contexts within synapse
|
||||
|
||||
This module provides objects and utilities for tracking contexts through
|
||||
synapse code, so that log lines can include a request identifier, and so that
|
||||
CPU and database activity can be accounted for against the request that caused
|
||||
them.
|
||||
|
||||
See doc/log_contexts.rst for details on how this works.
|
||||
"""
|
||||
Backwards compatibility re-exports of ``synapse.logging.context`` functionality.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import types
|
||||
|
||||
from twisted.internet import defer, threads
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import resource
|
||||
|
||||
# Python doesn't ship with a definition of RUSAGE_THREAD but it's defined
|
||||
# to be 1 on linux so we hard code it.
|
||||
RUSAGE_THREAD = 1
|
||||
|
||||
# If the system doesn't support RUSAGE_THREAD then this should throw an
|
||||
# exception.
|
||||
resource.getrusage(RUSAGE_THREAD)
|
||||
|
||||
def get_thread_resource_usage():
|
||||
return resource.getrusage(RUSAGE_THREAD)
|
||||
|
||||
|
||||
except Exception:
|
||||
# If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
|
||||
# won't track resource usage by returning None.
|
||||
def get_thread_resource_usage():
|
||||
return None
|
||||
|
||||
|
||||
class ContextResourceUsage(object):
|
||||
"""Object for tracking the resources used by a log context
|
||||
|
||||
Attributes:
|
||||
ru_utime (float): user CPU time (in seconds)
|
||||
ru_stime (float): system CPU time (in seconds)
|
||||
db_txn_count (int): number of database transactions done
|
||||
db_sched_duration_sec (float): amount of time spent waiting for a
|
||||
database connection
|
||||
db_txn_duration_sec (float): amount of time spent doing database
|
||||
transactions (excluding scheduling time)
|
||||
evt_db_fetch_count (int): number of events requested from the database
|
||||
"""
|
||||
|
||||
__slots__ = [
|
||||
"ru_stime",
|
||||
"ru_utime",
|
||||
"db_txn_count",
|
||||
"db_txn_duration_sec",
|
||||
"db_sched_duration_sec",
|
||||
"evt_db_fetch_count",
|
||||
]
|
||||
|
||||
def __init__(self, copy_from=None):
|
||||
"""Create a new ContextResourceUsage
|
||||
|
||||
Args:
|
||||
copy_from (ContextResourceUsage|None): if not None, an object to
|
||||
copy stats from
|
||||
"""
|
||||
if copy_from is None:
|
||||
self.reset()
|
||||
else:
|
||||
self.ru_utime = copy_from.ru_utime
|
||||
self.ru_stime = copy_from.ru_stime
|
||||
self.db_txn_count = copy_from.db_txn_count
|
||||
|
||||
self.db_txn_duration_sec = copy_from.db_txn_duration_sec
|
||||
self.db_sched_duration_sec = copy_from.db_sched_duration_sec
|
||||
self.evt_db_fetch_count = copy_from.evt_db_fetch_count
|
||||
|
||||
def copy(self):
|
||||
return ContextResourceUsage(copy_from=self)
|
||||
|
||||
def reset(self):
|
||||
self.ru_stime = 0.0
|
||||
self.ru_utime = 0.0
|
||||
self.db_txn_count = 0
|
||||
|
||||
self.db_txn_duration_sec = 0
|
||||
self.db_sched_duration_sec = 0
|
||||
self.evt_db_fetch_count = 0
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
"<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
|
||||
"db_txn_count='%r', db_txn_duration_sec='%r', "
|
||||
"db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
|
||||
) % (
|
||||
self.ru_stime,
|
||||
self.ru_utime,
|
||||
self.db_txn_count,
|
||||
self.db_txn_duration_sec,
|
||||
self.db_sched_duration_sec,
|
||||
self.evt_db_fetch_count,
|
||||
)
|
||||
|
||||
def __iadd__(self, other):
|
||||
"""Add another ContextResourceUsage's stats to this one's.
|
||||
|
||||
Args:
|
||||
other (ContextResourceUsage): the other resource usage object
|
||||
"""
|
||||
self.ru_utime += other.ru_utime
|
||||
self.ru_stime += other.ru_stime
|
||||
self.db_txn_count += other.db_txn_count
|
||||
self.db_txn_duration_sec += other.db_txn_duration_sec
|
||||
self.db_sched_duration_sec += other.db_sched_duration_sec
|
||||
self.evt_db_fetch_count += other.evt_db_fetch_count
|
||||
return self
|
||||
|
||||
def __isub__(self, other):
|
||||
self.ru_utime -= other.ru_utime
|
||||
self.ru_stime -= other.ru_stime
|
||||
self.db_txn_count -= other.db_txn_count
|
||||
self.db_txn_duration_sec -= other.db_txn_duration_sec
|
||||
self.db_sched_duration_sec -= other.db_sched_duration_sec
|
||||
self.evt_db_fetch_count -= other.evt_db_fetch_count
|
||||
return self
|
||||
|
||||
def __add__(self, other):
|
||||
res = ContextResourceUsage(copy_from=self)
|
||||
res += other
|
||||
return res
|
||||
|
||||
def __sub__(self, other):
|
||||
res = ContextResourceUsage(copy_from=self)
|
||||
res -= other
|
||||
return res
|
||||
|
||||
|
||||
class LoggingContext(object):
|
||||
"""Additional context for log formatting. Contexts are scoped within a
|
||||
"with" block.
|
||||
|
||||
If a parent is given when creating a new context, then:
|
||||
- logging fields are copied from the parent to the new context on entry
|
||||
- when the new context exits, the cpu usage stats are copied from the
|
||||
child to the parent
|
||||
|
||||
Args:
|
||||
name (str): Name for the context for debugging.
|
||||
parent_context (LoggingContext|None): The parent of the new context
|
||||
"""
|
||||
|
||||
__slots__ = [
|
||||
"previous_context",
|
||||
"name",
|
||||
"parent_context",
|
||||
"_resource_usage",
|
||||
"usage_start",
|
||||
"main_thread",
|
||||
"alive",
|
||||
"request",
|
||||
"tag",
|
||||
]
|
||||
|
||||
thread_local = threading.local()
|
||||
|
||||
class Sentinel(object):
|
||||
"""Sentinel to represent the root context"""
|
||||
|
||||
__slots__ = []
|
||||
|
||||
def __str__(self):
|
||||
return "sentinel"
|
||||
|
||||
def copy_to(self, record):
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def add_database_transaction(self, duration_sec):
|
||||
pass
|
||||
|
||||
def add_database_scheduled(self, sched_sec):
|
||||
pass
|
||||
|
||||
def record_event_fetch(self, event_count):
|
||||
pass
|
||||
|
||||
def __nonzero__(self):
|
||||
return False
|
||||
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
sentinel = Sentinel()
|
||||
|
||||
def __init__(self, name=None, parent_context=None, request=None):
|
||||
self.previous_context = LoggingContext.current_context()
|
||||
self.name = name
|
||||
|
||||
# track the resources used by this context so far
|
||||
self._resource_usage = ContextResourceUsage()
|
||||
|
||||
# If alive has the thread resource usage when the logcontext last
|
||||
# became active.
|
||||
self.usage_start = None
|
||||
|
||||
self.main_thread = threading.current_thread()
|
||||
self.request = None
|
||||
self.tag = ""
|
||||
self.alive = True
|
||||
|
||||
self.parent_context = parent_context
|
||||
|
||||
if self.parent_context is not None:
|
||||
self.parent_context.copy_to(self)
|
||||
|
||||
if request is not None:
|
||||
# the request param overrides the request from the parent context
|
||||
self.request = request
|
||||
|
||||
def __str__(self):
|
||||
if self.request:
|
||||
return str(self.request)
|
||||
return "%s@%x" % (self.name, id(self))
|
||||
|
||||
@classmethod
|
||||
def current_context(cls):
|
||||
"""Get the current logging context from thread local storage
|
||||
|
||||
Returns:
|
||||
LoggingContext: the current logging context
|
||||
"""
|
||||
return getattr(cls.thread_local, "current_context", cls.sentinel)
|
||||
|
||||
@classmethod
|
||||
def set_current_context(cls, context):
|
||||
"""Set the current logging context in thread local storage
|
||||
Args:
|
||||
context(LoggingContext): The context to activate.
|
||||
Returns:
|
||||
The context that was previously active
|
||||
"""
|
||||
current = cls.current_context()
|
||||
|
||||
if current is not context:
|
||||
current.stop()
|
||||
cls.thread_local.current_context = context
|
||||
context.start()
|
||||
return current
|
||||
|
||||
def __enter__(self):
|
||||
"""Enters this logging context into thread local storage"""
|
||||
old_context = self.set_current_context(self)
|
||||
if self.previous_context != old_context:
|
||||
logger.warn(
|
||||
"Expected previous context %r, found %r",
|
||||
self.previous_context,
|
||||
old_context,
|
||||
)
|
||||
self.alive = True
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Restore the logging context in thread local storage to the state it
|
||||
was before this context was entered.
|
||||
Returns:
|
||||
None to avoid suppressing any exceptions that were thrown.
|
||||
"""
|
||||
current = self.set_current_context(self.previous_context)
|
||||
if current is not self:
|
||||
if current is self.sentinel:
|
||||
logger.warning("Expected logging context %s was lost", self)
|
||||
else:
|
||||
logger.warning(
|
||||
"Expected logging context %s but found %s", self, current
|
||||
)
|
||||
self.previous_context = None
|
||||
self.alive = False
|
||||
|
||||
# if we have a parent, pass our CPU usage stats on
|
||||
if self.parent_context is not None and hasattr(
|
||||
self.parent_context, "_resource_usage"
|
||||
):
|
||||
self.parent_context._resource_usage += self._resource_usage
|
||||
|
||||
# reset them in case we get entered again
|
||||
self._resource_usage.reset()
|
||||
|
||||
def copy_to(self, record):
|
||||
"""Copy logging fields from this context to a log record or
|
||||
another LoggingContext
|
||||
"""
|
||||
|
||||
# 'request' is the only field we currently use in the logger, so that's
|
||||
# all we need to copy
|
||||
record.request = self.request
|
||||
|
||||
def start(self):
|
||||
if threading.current_thread() is not self.main_thread:
|
||||
logger.warning("Started logcontext %s on different thread", self)
|
||||
return
|
||||
|
||||
# If we haven't already started record the thread resource usage so
|
||||
# far
|
||||
if not self.usage_start:
|
||||
self.usage_start = get_thread_resource_usage()
|
||||
|
||||
def stop(self):
|
||||
if threading.current_thread() is not self.main_thread:
|
||||
logger.warning("Stopped logcontext %s on different thread", self)
|
||||
return
|
||||
|
||||
# When we stop, let's record the cpu used since we started
|
||||
if not self.usage_start:
|
||||
logger.warning("Called stop on logcontext %s without calling start", self)
|
||||
return
|
||||
|
||||
utime_delta, stime_delta = self._get_cputime()
|
||||
self._resource_usage.ru_utime += utime_delta
|
||||
self._resource_usage.ru_stime += stime_delta
|
||||
|
||||
self.usage_start = None
|
||||
|
||||
def get_resource_usage(self):
|
||||
"""Get resources used by this logcontext so far.
|
||||
|
||||
Returns:
|
||||
ContextResourceUsage: a *copy* of the object tracking resource
|
||||
usage so far
|
||||
"""
|
||||
# we always return a copy, for consistency
|
||||
res = self._resource_usage.copy()
|
||||
|
||||
# If we are on the correct thread and we're currently running then we
|
||||
# can include resource usage so far.
|
||||
is_main_thread = threading.current_thread() is self.main_thread
|
||||
if self.alive and self.usage_start and is_main_thread:
|
||||
utime_delta, stime_delta = self._get_cputime()
|
||||
res.ru_utime += utime_delta
|
||||
res.ru_stime += stime_delta
|
||||
|
||||
return res
|
||||
|
||||
def _get_cputime(self):
|
||||
"""Get the cpu usage time so far
|
||||
|
||||
Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
|
||||
"""
|
||||
current = get_thread_resource_usage()
|
||||
|
||||
utime_delta = current.ru_utime - self.usage_start.ru_utime
|
||||
stime_delta = current.ru_stime - self.usage_start.ru_stime
|
||||
|
||||
# sanity check
|
||||
if utime_delta < 0:
|
||||
logger.error(
|
||||
"utime went backwards! %f < %f",
|
||||
current.ru_utime,
|
||||
self.usage_start.ru_utime,
|
||||
)
|
||||
utime_delta = 0
|
||||
|
||||
if stime_delta < 0:
|
||||
logger.error(
|
||||
"stime went backwards! %f < %f",
|
||||
current.ru_stime,
|
||||
self.usage_start.ru_stime,
|
||||
)
|
||||
stime_delta = 0
|
||||
|
||||
return utime_delta, stime_delta
|
||||
|
||||
def add_database_transaction(self, duration_sec):
|
||||
if duration_sec < 0:
|
||||
raise ValueError("DB txn time can only be non-negative")
|
||||
self._resource_usage.db_txn_count += 1
|
||||
self._resource_usage.db_txn_duration_sec += duration_sec
|
||||
|
||||
def add_database_scheduled(self, sched_sec):
|
||||
"""Record a use of the database pool
|
||||
|
||||
Args:
|
||||
sched_sec (float): number of seconds it took us to get a
|
||||
connection
|
||||
"""
|
||||
if sched_sec < 0:
|
||||
raise ValueError("DB scheduling time can only be non-negative")
|
||||
self._resource_usage.db_sched_duration_sec += sched_sec
|
||||
|
||||
def record_event_fetch(self, event_count):
|
||||
"""Record a number of events being fetched from the db
|
||||
|
||||
Args:
|
||||
event_count (int): number of events being fetched
|
||||
"""
|
||||
self._resource_usage.evt_db_fetch_count += event_count
|
||||
|
||||
|
||||
class LoggingContextFilter(logging.Filter):
|
||||
"""Logging filter that adds values from the current logging context to each
|
||||
record.
|
||||
Args:
|
||||
**defaults: Default values to avoid formatters complaining about
|
||||
missing fields
|
||||
"""
|
||||
|
||||
def __init__(self, **defaults):
|
||||
self.defaults = defaults
|
||||
|
||||
def filter(self, record):
|
||||
"""Add each fields from the logging contexts to the record.
|
||||
Returns:
|
||||
True to include the record in the log output.
|
||||
"""
|
||||
context = LoggingContext.current_context()
|
||||
for key, value in self.defaults.items():
|
||||
setattr(record, key, value)
|
||||
|
||||
# context should never be None, but if it somehow ends up being, then
|
||||
# we end up in a death spiral of infinite loops, so let's check, for
|
||||
# robustness' sake.
|
||||
if context is not None:
|
||||
context.copy_to(record)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class PreserveLoggingContext(object):
|
||||
"""Captures the current logging context and restores it when the scope is
|
||||
exited. Used to restore the context after a function using
|
||||
@defer.inlineCallbacks is resumed by a callback from the reactor."""
|
||||
|
||||
__slots__ = ["current_context", "new_context", "has_parent"]
|
||||
|
||||
def __init__(self, new_context=None):
|
||||
if new_context is None:
|
||||
new_context = LoggingContext.sentinel
|
||||
self.new_context = new_context
|
||||
|
||||
def __enter__(self):
|
||||
"""Captures the current logging context"""
|
||||
self.current_context = LoggingContext.set_current_context(self.new_context)
|
||||
|
||||
if self.current_context:
|
||||
self.has_parent = self.current_context.previous_context is not None
|
||||
if not self.current_context.alive:
|
||||
logger.debug("Entering dead context: %s", self.current_context)
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Restores the current logging context"""
|
||||
context = LoggingContext.set_current_context(self.current_context)
|
||||
|
||||
if context != self.new_context:
|
||||
if context is LoggingContext.sentinel:
|
||||
logger.warning("Expected logging context %s was lost", self.new_context)
|
||||
else:
|
||||
logger.warning(
|
||||
"Expected logging context %s but found %s",
|
||||
self.new_context,
|
||||
context,
|
||||
)
|
||||
|
||||
if self.current_context is not LoggingContext.sentinel:
|
||||
if not self.current_context.alive:
|
||||
logger.debug("Restoring dead context: %s", self.current_context)
|
||||
|
||||
|
||||
def nested_logging_context(suffix, parent_context=None):
|
||||
"""Creates a new logging context as a child of another.
|
||||
|
||||
The nested logging context will have a 'request' made up of the parent context's
|
||||
request, plus the given suffix.
|
||||
|
||||
CPU/db usage stats will be added to the parent context's on exit.
|
||||
|
||||
Normal usage looks like:
|
||||
|
||||
with nested_logging_context(suffix):
|
||||
# ... do stuff
|
||||
|
||||
Args:
|
||||
suffix (str): suffix to add to the parent context's 'request'.
|
||||
parent_context (LoggingContext|None): parent context. Will use the current context
|
||||
if None.
|
||||
|
||||
Returns:
|
||||
LoggingContext: new logging context.
|
||||
"""
|
||||
if parent_context is None:
|
||||
parent_context = LoggingContext.current_context()
|
||||
return LoggingContext(
|
||||
parent_context=parent_context, request=parent_context.request + "-" + suffix
|
||||
)
|
||||
|
||||
|
||||
def preserve_fn(f):
|
||||
"""Function decorator which wraps the function with run_in_background"""
|
||||
|
||||
def g(*args, **kwargs):
|
||||
return run_in_background(f, *args, **kwargs)
|
||||
|
||||
return g
|
||||
|
||||
|
||||
def run_in_background(f, *args, **kwargs):
|
||||
"""Calls a function, ensuring that the current context is restored after
|
||||
return from the function, and that the sentinel context is set once the
|
||||
deferred returned by the function completes.
|
||||
|
||||
Useful for wrapping functions that return a deferred or coroutine, which you don't
|
||||
yield or await on (for instance because you want to pass it to
|
||||
deferred.gatherResults()).
|
||||
|
||||
Note that if you completely discard the result, you should make sure that
|
||||
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
|
||||
CRITICAL error about an unhandled error will be logged without much
|
||||
indication about where it came from.
|
||||
"""
|
||||
current = LoggingContext.current_context()
|
||||
try:
|
||||
res = f(*args, **kwargs)
|
||||
except: # noqa: E722
|
||||
# the assumption here is that the caller doesn't want to be disturbed
|
||||
# by synchronous exceptions, so let's turn them into Failures.
|
||||
return defer.fail()
|
||||
|
||||
if isinstance(res, types.CoroutineType):
|
||||
res = defer.ensureDeferred(res)
|
||||
|
||||
if not isinstance(res, defer.Deferred):
|
||||
return res
|
||||
|
||||
if res.called and not res.paused:
|
||||
# The function should have maintained the logcontext, so we can
|
||||
# optimise out the messing about
|
||||
return res
|
||||
|
||||
# The function may have reset the context before returning, so
|
||||
# we need to restore it now.
|
||||
ctx = LoggingContext.set_current_context(current)
|
||||
|
||||
# The original context will be restored when the deferred
|
||||
# completes, but there is nothing waiting for it, so it will
|
||||
# get leaked into the reactor or some other function which
|
||||
# wasn't expecting it. We therefore need to reset the context
|
||||
# here.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
res.addBoth(_set_context_cb, ctx)
|
||||
return res
|
||||
|
||||
|
||||
def make_deferred_yieldable(deferred):
|
||||
"""Given a deferred, make it follow the Synapse logcontext rules:
|
||||
|
||||
If the deferred has completed (or is not actually a Deferred), essentially
|
||||
does nothing (just returns another completed deferred with the
|
||||
result/failure).
|
||||
|
||||
If the deferred has not yet completed, resets the logcontext before
|
||||
returning a deferred. Then, when the deferred completes, restores the
|
||||
current logcontext before running callbacks/errbacks.
|
||||
|
||||
(This is more-or-less the opposite operation to run_in_background.)
|
||||
"""
|
||||
if not isinstance(deferred, defer.Deferred):
|
||||
return deferred
|
||||
|
||||
if deferred.called and not deferred.paused:
|
||||
# it looks like this deferred is ready to run any callbacks we give it
|
||||
# immediately. We may as well optimise out the logcontext faffery.
|
||||
return deferred
|
||||
|
||||
# ok, we can't be sure that a yield won't block, so let's reset the
|
||||
# logcontext, and add a callback to the deferred to restore it.
|
||||
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||
deferred.addBoth(_set_context_cb, prev_context)
|
||||
return deferred
|
||||
|
||||
|
||||
def _set_context_cb(result, context):
|
||||
"""A callback function which just sets the logging context"""
|
||||
LoggingContext.set_current_context(context)
|
||||
return result
|
||||
|
||||
|
||||
def defer_to_thread(reactor, f, *args, **kwargs):
|
||||
"""
|
||||
Calls the function `f` using a thread from the reactor's default threadpool and
|
||||
returns the result as a Deferred.
|
||||
|
||||
Creates a new logcontext for `f`, which is created as a child of the current
|
||||
logcontext (so its CPU usage metrics will get attributed to the current
|
||||
logcontext). `f` should preserve the logcontext it is given.
|
||||
|
||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
||||
on it.
|
||||
|
||||
Args:
|
||||
reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
|
||||
the Deferred will be invoked, and whose threadpool we should use for the
|
||||
function.
|
||||
|
||||
Normally this will be hs.get_reactor().
|
||||
|
||||
f (callable): The function to call.
|
||||
|
||||
args: positional arguments to pass to f.
|
||||
|
||||
kwargs: keyword arguments to pass to f.
|
||||
|
||||
Returns:
|
||||
Deferred: A Deferred which fires a callback with the result of `f`, or an
|
||||
errback if `f` throws an exception.
|
||||
"""
|
||||
return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
|
||||
|
||||
|
||||
def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
|
||||
"""
|
||||
A wrapper for twisted.internet.threads.deferToThreadpool, which handles
|
||||
logcontexts correctly.
|
||||
|
||||
Calls the function `f` using a thread from the given threadpool and returns
|
||||
the result as a Deferred.
|
||||
|
||||
Creates a new logcontext for `f`, which is created as a child of the current
|
||||
logcontext (so its CPU usage metrics will get attributed to the current
|
||||
logcontext). `f` should preserve the logcontext it is given.
|
||||
|
||||
The result deferred follows the Synapse logcontext rules: you should `yield`
|
||||
on it.
|
||||
|
||||
Args:
|
||||
reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
|
||||
the Deferred will be invoked. Normally this will be hs.get_reactor().
|
||||
|
||||
threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
|
||||
running `f`. Normally this will be hs.get_reactor().getThreadPool().
|
||||
|
||||
f (callable): The function to call.
|
||||
|
||||
args: positional arguments to pass to f.
|
||||
|
||||
kwargs: keyword arguments to pass to f.
|
||||
|
||||
Returns:
|
||||
Deferred: A Deferred which fires a callback with the result of `f`, or an
|
||||
errback if `f` throws an exception.
|
||||
"""
|
||||
logcontext = LoggingContext.current_context()
|
||||
|
||||
def g():
|
||||
with LoggingContext(parent_context=logcontext):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
LoggingContextFilter,
|
||||
PreserveLoggingContext,
|
||||
defer_to_thread,
|
||||
make_deferred_yieldable,
|
||||
nested_logging_context,
|
||||
preserve_fn,
|
||||
run_in_background,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"defer_to_thread",
|
||||
"LoggingContext",
|
||||
"LoggingContextFilter",
|
||||
"make_deferred_yieldable",
|
||||
"nested_logging_context",
|
||||
"preserve_fn",
|
||||
"PreserveLoggingContext",
|
||||
"run_in_background",
|
||||
]
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -13,41 +12,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Backwards compatibility re-exports of ``synapse.logging.formatter`` functionality.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import traceback
|
||||
from synapse.logging.formatter import LogFormatter
|
||||
|
||||
from six import StringIO
|
||||
|
||||
|
||||
class LogFormatter(logging.Formatter):
|
||||
"""Log formatter which gives more detail for exceptions
|
||||
|
||||
This is the same as the standard log formatter, except that when logging
|
||||
exceptions [typically via log.foo("msg", exc_info=1)], it prints the
|
||||
sequence that led up to the point at which the exception was caught.
|
||||
(Normally only stack frames between the point the exception was raised and
|
||||
where it was caught are logged).
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(LogFormatter, self).__init__(*args, **kwargs)
|
||||
|
||||
def formatException(self, ei):
|
||||
sio = StringIO()
|
||||
(typ, val, tb) = ei
|
||||
|
||||
# log the stack above the exception capture point if possible, but
|
||||
# check that we actually have an f_back attribute to work around
|
||||
# https://twistedmatrix.com/trac/ticket/9305
|
||||
|
||||
if tb and hasattr(tb.tb_frame, "f_back"):
|
||||
sio.write("Capture point (most recent call last):\n")
|
||||
traceback.print_stack(tb.tb_frame.f_back, None, sio)
|
||||
|
||||
traceback.print_exception(typ, val, tb, None, sio)
|
||||
s = sio.getvalue()
|
||||
sio.close()
|
||||
if s[-1:] == "\n":
|
||||
s = s[:-1]
|
||||
return s
|
||||
__all__ = ["LogFormatter"]
|
||||
|
||||
@@ -20,8 +20,8 @@ from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import InFlightGauge
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ import logging
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import LimitExceededError
|
||||
from synapse.util.logcontext import (
|
||||
from synapse.logging.context import (
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
|
||||
@@ -17,7 +17,7 @@ import random
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.util.logcontext
|
||||
import synapse.logging.context
|
||||
from synapse.api.errors import CodeMessageException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -225,4 +225,4 @@ class RetryDestinationLimiter(object):
|
||||
logger.exception("Failed to store destination_retry_timings")
|
||||
|
||||
# we deliberately do this in the background.
|
||||
synapse.util.logcontext.run_in_background(store_retry_timings)
|
||||
synapse.logging.context.run_in_background(store_retry_timings)
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
# This file serves as a blacklist for SyTest tests that we expect will fail in
|
||||
# Synapse.
|
||||
#
|
||||
# Each line of this file is scanned by sytest during a run and if the line
|
||||
# exactly matches the name of a test, it will be marked as "expected fail",
|
||||
# meaning the test will still run, but failure will not mark the entire test
|
||||
# suite as failing.
|
||||
#
|
||||
# Test names are encouraged to have a bug accompanied with them, serving as an
|
||||
# explanation for why the test has been excluded.
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1679
|
||||
Remote room members also see posted message events
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/2065
|
||||
Guest users can accept invites to private rooms over federation
|
||||
|
||||
# Blacklisted due to https://github.com/vector-im/riot-web/issues/7211
|
||||
The only membership state included in a gapped incremental sync is for senders in the timeline
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1658
|
||||
Newly created users see their own presence in /initialSync (SYT-34)
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1396
|
||||
Should reject keys claiming to belong to a different user
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/2306
|
||||
Users appear/disappear from directory when join_rules are changed
|
||||
Users appear/disappear from directory when history_visibility are changed
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1531
|
||||
Enabling an unknown default rule fails with 404
|
||||
|
||||
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1663
|
||||
New federated private chats get full presence information (SYN-115)
|
||||
@@ -22,7 +22,7 @@ from synapse.appservice.scheduler import (
|
||||
_ServiceQueuer,
|
||||
_TransactionController,
|
||||
)
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
@@ -30,9 +30,12 @@ from synapse.crypto.keyring import (
|
||||
ServerKeyFetcher,
|
||||
StoreKeyFetcher,
|
||||
)
|
||||
from synapse.logging.context import (
|
||||
LoggingContext,
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
)
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
from tests import unittest
|
||||
|
||||
@@ -131,7 +134,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
||||
@defer.inlineCallbacks
|
||||
def get_perspectives(**kwargs):
|
||||
self.assertEquals(LoggingContext.current_context().request, "11")
|
||||
with logcontext.PreserveLoggingContext():
|
||||
with PreserveLoggingContext():
|
||||
yield persp_deferred
|
||||
defer.returnValue(persp_resp)
|
||||
|
||||
@@ -158,7 +161,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
||||
self.assertFalse(res_deferreds[0].called)
|
||||
res_deferreds[0].addBoth(self.check_context, None)
|
||||
|
||||
yield logcontext.make_deferred_yieldable(res_deferreds[0])
|
||||
yield make_deferred_yieldable(res_deferreds[0])
|
||||
|
||||
# let verify_json_objects_for_server finish its work before we kill the
|
||||
# logcontext
|
||||
@@ -184,7 +187,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
||||
[("server10", json1, 0, "test")]
|
||||
)
|
||||
res_deferreds_2[0].addBoth(self.check_context, None)
|
||||
yield logcontext.make_deferred_yieldable(res_deferreds_2[0])
|
||||
yield make_deferred_yieldable(res_deferreds_2[0])
|
||||
|
||||
# let verify_json_objects_for_server finish its work before we kill the
|
||||
# logcontext
|
||||
|
||||
@@ -36,8 +36,8 @@ from synapse.http.federation.matrix_federation_agent import (
|
||||
_cache_period_from_headers,
|
||||
)
|
||||
from synapse.http.federation.srv_resolver import Server
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.util.caches.ttlcache import TTLCache
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
from tests.http import TestServerTLSConnectionFactory, get_test_ca_cert_file
|
||||
from tests.server import FakeTransport, ThreadedMemoryReactorClock
|
||||
|
||||
@@ -22,7 +22,7 @@ from twisted.internet.error import ConnectError
|
||||
from twisted.names import dns, error
|
||||
|
||||
from synapse.http.federation.srv_resolver import SrvResolver
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
from tests import unittest
|
||||
from tests.utils import MockClock
|
||||
|
||||
@@ -29,7 +29,7 @@ from synapse.http.matrixfederationclient import (
|
||||
MatrixFederationHttpClient,
|
||||
MatrixFederationRequest,
|
||||
)
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
from tests.server import FakeTransport
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
@@ -28,7 +28,7 @@ def do_patch():
|
||||
Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit
|
||||
"""
|
||||
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
orig_inline_callbacks = defer.inlineCallbacks
|
||||
|
||||
|
||||
@@ -18,8 +18,8 @@ from mock import Mock
|
||||
from twisted.internet.defer import Deferred
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.rest.client.v1 import login, room
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
@@ -2,9 +2,9 @@ from mock import Mock, call
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.rest.client.transactions import CLEANUP_PERIOD_MS, HttpTransactionCache
|
||||
from synapse.util import Clock
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
from tests import unittest
|
||||
from tests.utils import MockClock
|
||||
|
||||
@@ -24,11 +24,11 @@ from six.moves.urllib import parse
|
||||
|
||||
from twisted.internet.defer import Deferred
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.rest.media.v1._base import FileInfo
|
||||
from synapse.rest.media.v1.filepath import MediaFilePaths
|
||||
from synapse.rest.media.v1.media_storage import MediaStorage
|
||||
from synapse.rest.media.v1.storage_provider import FileStorageProviderBackend
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
@@ -3,9 +3,9 @@ from mock import Mock
|
||||
from twisted.internet.defer import maybeDeferred, succeed
|
||||
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.types import Requester, UserID
|
||||
from synapse.util import Clock
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
from tests import unittest
|
||||
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user