1
0

Compare commits

..

10 Commits

Author SHA1 Message Date
Travis Ralston
177f2b838c changelog 2019-05-22 19:16:02 -06:00
Travis Ralston
f9d7d3aa89 Remove m.relates_to from events if the client set it to null
It appears as though Python only checks to see if the key exists in a dictionary, not necessarily for a useful value. This means that when clients submit (valid) requests with `m.relates_to: null` and Synapse later reads it, it gets a None reference error on access.

This is the easier route than guarding all the places where it could be None.
2019-05-22 19:14:10 -06:00
Richard van der Hoff
1a94de60e8 Run black on synapse.crypto.keyring (#5232) 2019-05-22 18:39:33 +01:00
Neil Johnson
73f1de31d1 Merge branch 'master' into develop 2019-05-22 17:59:43 +01:00
Marcus Hoffmann
62388a1e44 remove urllib3 pin (#5230)
requests 2.22.0 as been released supporting urllib3 1.25.2

Signed-off-by: Marcus Hoffmann <bubu@bubu1.eu>
2019-05-22 16:48:12 +01:00
Neil Johnson
ae5521be9c Merge branch 'master' into develop 2019-05-22 15:56:55 +01:00
Neil Johnson
66b75e2d81 Neilj/ensure get profileinfo available in client reader slaved store (#5213)
* expose SlavedProfileStore to ClientReaderSlavedStore
2019-05-22 13:55:32 +01:00
Steffen
2dfbeea66f Update README.md (#5222)
Add missing backslash
2019-05-22 12:53:16 +01:00
Richard van der Hoff
b898a5600a Merge branch 'master' into develop 2019-05-22 11:38:27 +01:00
Amber Brown
4a30e4acb4 Room Statistics (#4338) 2019-05-21 11:36:50 -05:00
24 changed files with 1455 additions and 171 deletions

1
changelog.d/4338.feature Normal file
View File

@@ -0,0 +1 @@
Synapse now more efficiently collates room statistics.

1
changelog.d/5200.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix worker registration bug caused by ClientReaderSlavedStore being unable to see get_profileinfo.

1
changelog.d/5230.misc Normal file
View File

@@ -0,0 +1 @@
Remove urllib3 pin as requests 2.22.0 has been released supporting urllib3 1.25.2.

1
changelog.d/5232.misc Normal file
View File

@@ -0,0 +1 @@
Run black on synapse.crypto.keyring.

1
changelog.d/5239.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix 500 Internal Server Error when sending an event with `m.relates_to: null`.

View File

@@ -161,7 +161,7 @@ specify values for `SYNAPSE_CONFIG_PATH`, `SYNAPSE_SERVER_NAME` and
example:
```
docker run -it --rm
docker run -it --rm \
--mount type=volume,src=synapse-data,dst=/data \
-e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
-e SYNAPSE_SERVER_NAME=my.matrix.host \

View File

@@ -1153,6 +1153,22 @@ password_config:
#
# Local statistics collection. Used in populating the room directory.
#
# 'bucket_size' controls how large each statistics timeslice is. It can
# be defined in a human readable short form -- e.g. "1d", "1y".
#
# 'retention' controls how long historical statistics will be kept for.
# It can be defined in a human readable short form -- e.g. "1d", "1y".
#
#
#stats:
# enabled: true
# bucket_size: 1d
# retention: 1y
# Server Notices room configuration
#
# Uncomment this section to enable a room which can be used to send notices

View File

@@ -79,6 +79,7 @@ class EventTypes(object):
RoomHistoryVisibility = "m.room.history_visibility"
CanonicalAlias = "m.room.canonical_alias"
Encryption = "m.room.encryption"
RoomAvatar = "m.room.avatar"
RoomEncryption = "m.room.encryption"
GuestAccess = "m.room.guest_access"

View File

@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
@@ -81,6 +82,7 @@ class ClientReaderSlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
BaseSlavedStore,
):

View File

@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .api import ApiConfig
from .appservice import AppServiceConfig
from .captcha import CaptchaConfig
@@ -36,20 +37,41 @@ from .saml2_config import SAML2Config
from .server import ServerConfig
from .server_notices_config import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
from .stats import StatsConfig
from .tls import TlsConfig
from .user_directory import UserDirectoryConfig
from .voip import VoipConfig
from .workers import WorkerConfig
class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
JWTConfig, PasswordConfig, EmailConfig,
WorkerConfig, PasswordAuthProviderConfig, PushConfig,
SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,
ConsentConfig,
ServerNoticesConfig, RoomDirectoryConfig,
):
class HomeServerConfig(
ServerConfig,
TlsConfig,
DatabaseConfig,
LoggingConfig,
RatelimitConfig,
ContentRepositoryConfig,
CaptchaConfig,
VoipConfig,
RegistrationConfig,
MetricsConfig,
ApiConfig,
AppServiceConfig,
KeyConfig,
SAML2Config,
CasConfig,
JWTConfig,
PasswordConfig,
EmailConfig,
WorkerConfig,
PasswordAuthProviderConfig,
PushConfig,
SpamCheckerConfig,
GroupsConfig,
UserDirectoryConfig,
ConsentConfig,
StatsConfig,
ServerNoticesConfig,
RoomDirectoryConfig,
):
pass

60
synapse/config/stats.py Normal file
View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import sys
from ._base import Config
class StatsConfig(Config):
"""Stats Configuration
Configuration for the behaviour of synapse's stats engine
"""
def read_config(self, config):
self.stats_enabled = True
self.stats_bucket_size = 86400
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
self.stats_bucket_size = (
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
)
self.stats_retention = (
self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
/ 1000
)
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Local statistics collection. Used in populating the room directory.
#
# 'bucket_size' controls how large each statistics timeslice is. It can
# be defined in a human readable short form -- e.g. "1d", "1y".
#
# 'retention' controls how long historical statistics will be kept for.
# It can be defined in a human readable short form -- e.g. "1d", "1y".
#
#
#stats:
# enabled: true
# bucket_size: 1d
# retention: 1y
"""

View File

@@ -56,9 +56,9 @@ from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
VerifyKeyRequest = namedtuple("VerifyRequest", (
"server_name", "key_ids", "json_object", "deferred"
))
VerifyKeyRequest = namedtuple(
"VerifyRequest", ("server_name", "key_ids", "json_object", "deferred")
)
"""
A request for a verify key to verify a JSON object.
@@ -96,9 +96,7 @@ class Keyring(object):
def verify_json_for_server(self, server_name, json_object):
return logcontext.make_deferred_yieldable(
self.verify_json_objects_for_server(
[(server_name, json_object)]
)[0]
self.verify_json_objects_for_server([(server_name, json_object)])[0]
)
def verify_json_objects_for_server(self, server_and_json):
@@ -130,18 +128,15 @@ class Keyring(object):
if not key_ids:
return defer.fail(
SynapseError(
400,
"Not signed by %s" % (server_name,),
Codes.UNAUTHORIZED,
400, "Not signed by %s" % (server_name,), Codes.UNAUTHORIZED
)
)
logger.debug("Verifying for %s with key_ids %s",
server_name, key_ids)
logger.debug("Verifying for %s with key_ids %s", server_name, key_ids)
# add the key request to the queue, but don't start it off yet.
verify_request = VerifyKeyRequest(
server_name, key_ids, json_object, defer.Deferred(),
server_name, key_ids, json_object, defer.Deferred()
)
verify_requests.append(verify_request)
@@ -179,15 +174,13 @@ class Keyring(object):
# any other lookups until we have finished.
# The deferreds are called with no logcontext.
server_to_deferred = {
rq.server_name: defer.Deferred()
for rq in verify_requests
rq.server_name: defer.Deferred() for rq in verify_requests
}
# We want to wait for any previous lookups to complete before
# proceeding.
yield self.wait_for_previous_lookups(
[rq.server_name for rq in verify_requests],
server_to_deferred,
[rq.server_name for rq in verify_requests], server_to_deferred
)
# Actually start fetching keys.
@@ -216,9 +209,7 @@ class Keyring(object):
return res
for verify_request in verify_requests:
verify_request.deferred.addBoth(
remove_deferreds, verify_request,
)
verify_request.deferred.addBoth(remove_deferreds, verify_request)
except Exception:
logger.exception("Error starting key lookups")
@@ -248,7 +239,8 @@ class Keyring(object):
break
logger.info(
"Waiting for existing lookups for %s to complete [loop %i]",
[w[0] for w in wait_on], loop_count,
[w[0] for w in wait_on],
loop_count,
)
with PreserveLoggingContext():
yield defer.DeferredList((w[1] for w in wait_on))
@@ -335,13 +327,14 @@ class Keyring(object):
with PreserveLoggingContext():
for verify_request in requests_missing_keys:
verify_request.deferred.errback(SynapseError(
401,
"No key for %s with id %s" % (
verify_request.server_name, verify_request.key_ids,
),
Codes.UNAUTHORIZED,
))
verify_request.deferred.errback(
SynapseError(
401,
"No key for %s with id %s"
% (verify_request.server_name, verify_request.key_ids),
Codes.UNAUTHORIZED,
)
)
def on_err(err):
with PreserveLoggingContext():
@@ -383,25 +376,26 @@ class Keyring(object):
)
defer.returnValue(result)
except KeyLookupError as e:
logger.warning(
"Key lookup failed from %r: %s", perspective_name, e,
)
logger.warning("Key lookup failed from %r: %s", perspective_name, e)
except Exception as e:
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e),
type(e).__name__,
str(e),
)
defer.returnValue({})
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError))
results = yield logcontext.make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
union_of_keys = {}
for result in results:
@@ -412,32 +406,30 @@ class Keyring(object):
@defer.inlineCallbacks
def get_keys_from_server(self, server_name_and_key_ids):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.get_server_verify_key_v2_direct,
server_name,
key_ids,
)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError))
results = yield logcontext.make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.get_server_verify_key_v2_direct, server_name, key_ids
)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
merged = {}
for result in results:
merged.update(result)
defer.returnValue({
server_name: keys
for server_name, keys in merged.items()
if keys
})
defer.returnValue(
{server_name: keys for server_name, keys in merged.items() if keys}
)
@defer.inlineCallbacks
def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,
perspective_name,
perspective_keys):
def get_server_verify_key_v2_indirect(
self, server_names_and_key_ids, perspective_name, perspective_keys
):
# TODO(mark): Set the minimum_valid_until_ts to that needed by
# the events being validated or the current time if validating
# an incoming request.
@@ -448,9 +440,7 @@ class Keyring(object):
data={
u"server_keys": {
server_name: {
key_id: {
u"minimum_valid_until_ts": 0
} for key_id in key_ids
key_id: {u"minimum_valid_until_ts": 0} for key_id in key_ids
}
for server_name, key_ids in server_names_and_key_ids
}
@@ -458,21 +448,19 @@ class Keyring(object):
long_retries=True,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(
KeyLookupError("Failed to connect to remote server"), e,
)
raise_from(KeyLookupError("Failed to connect to remote server"), e)
except HttpResponseException as e:
raise_from(
KeyLookupError("Remote server returned an error"), e,
)
raise_from(KeyLookupError("Remote server returned an error"), e)
keys = {}
responses = query_response["server_keys"]
for response in responses:
if (u"signatures" not in response
or perspective_name not in response[u"signatures"]):
if (
u"signatures" not in response
or perspective_name not in response[u"signatures"]
):
raise KeyLookupError(
"Key response not signed by perspective server"
" %r" % (perspective_name,)
@@ -482,9 +470,7 @@ class Keyring(object):
for key_id in response[u"signatures"][perspective_name]:
if key_id in perspective_keys:
verify_signed_json(
response,
perspective_name,
perspective_keys[key_id]
response, perspective_name, perspective_keys[key_id]
)
verified = True
@@ -494,7 +480,7 @@ class Keyring(object):
" known key, signed with: %r, known keys: %r",
perspective_name,
list(response[u"signatures"][perspective_name]),
list(perspective_keys)
list(perspective_keys),
)
raise KeyLookupError(
"Response not signed with a known key for perspective"
@@ -508,18 +494,20 @@ class Keyring(object):
keys.setdefault(server_name, {}).update(processed_response)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store_keys,
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
)
for server_name, response_keys in keys.items()
],
consumeErrors=True
).addErrback(unwrapFirstError))
yield logcontext.make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.store_keys,
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
)
for server_name, response_keys in keys.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
defer.returnValue(keys)
@@ -534,26 +522,26 @@ class Keyring(object):
try:
response = yield self.client.get_json(
destination=server_name,
path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
path="/_matrix/key/v2/server/"
+ urllib.parse.quote(requested_key_id),
ignore_backoff=True,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(
KeyLookupError("Failed to connect to remote server"), e,
)
raise_from(KeyLookupError("Failed to connect to remote server"), e)
except HttpResponseException as e:
raise_from(
KeyLookupError("Remote server returned an error"), e,
)
raise_from(KeyLookupError("Remote server returned an error"), e)
if (u"signatures" not in response
or server_name not in response[u"signatures"]):
if (
u"signatures" not in response
or server_name not in response[u"signatures"]
):
raise KeyLookupError("Key response not signed by remote server")
if response["server_name"] != server_name:
raise KeyLookupError("Expected a response for server %r not %r" % (
server_name, response["server_name"]
))
raise KeyLookupError(
"Expected a response for server %r not %r"
% (server_name, response["server_name"])
)
response_keys = yield self.process_v2_response(
from_server=server_name,
@@ -564,16 +552,12 @@ class Keyring(object):
keys.update(response_keys)
yield self.store_keys(
server_name=server_name,
from_server=server_name,
verify_keys=keys,
server_name=server_name, from_server=server_name, verify_keys=keys
)
defer.returnValue({server_name: keys})
@defer.inlineCallbacks
def process_v2_response(
self, from_server, response_json, requested_ids=[],
):
def process_v2_response(self, from_server, response_json, requested_ids=[]):
"""Parse a 'Server Keys' structure from the result of a /key request
This is used to parse either the entirety of the response from
@@ -627,20 +611,13 @@ class Keyring(object):
for key_id in response_json["signatures"].get(server_name, {}):
if key_id not in response_json["verify_keys"]:
raise KeyLookupError(
"Key response must include verification keys for all"
" signatures"
"Key response must include verification keys for all" " signatures"
)
if key_id in verify_keys:
verify_signed_json(
response_json,
server_name,
verify_keys[key_id]
)
verify_signed_json(response_json, server_name, verify_keys[key_id])
signed_key_json = sign_json(
response_json,
self.config.server_name,
self.config.signing_key[0],
response_json, self.config.server_name, self.config.signing_key[0]
)
signed_key_json_bytes = encode_canonical_json(signed_key_json)
@@ -653,21 +630,23 @@ class Keyring(object):
response_keys.update(verify_keys)
response_keys.update(old_verify_keys)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=from_server,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
for key_id in updated_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError))
yield logcontext.make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=from_server,
ts_now_ms=time_now_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
for key_id in updated_key_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
defer.returnValue(response_keys)
@@ -681,16 +660,21 @@ class Keyring(object):
A deferred that completes when the keys are stored.
"""
# TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.store_server_verify_key,
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError))
return logcontext.make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.store.store_server_verify_key,
server_name,
server_name,
key.time_added,
key,
)
for key_id, key in verify_keys.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
@defer.inlineCallbacks
@@ -713,17 +697,19 @@ def _handle_key_deferred(verify_request):
except KeyLookupError as e:
logger.warn(
"Failed to download keys for %s: %s %s",
server_name, type(e).__name__, str(e),
server_name,
type(e).__name__,
str(e),
)
raise SynapseError(
502,
"Error downloading keys for %s" % (server_name,),
Codes.UNAUTHORIZED,
502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED
)
except Exception as e:
logger.exception(
"Got Exception when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e),
server_name,
type(e).__name__,
str(e),
)
raise SynapseError(
401,
@@ -733,22 +719,24 @@ def _handle_key_deferred(verify_request):
json_object = verify_request.json_object
logger.debug("Got key %s %s:%s for server %s, verifying" % (
key_id, verify_key.alg, verify_key.version, server_name,
))
logger.debug(
"Got key %s %s:%s for server %s, verifying"
% (key_id, verify_key.alg, verify_key.version, server_name)
)
try:
verify_signed_json(json_object, server_name, verify_key)
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
server_name, verify_key.alg, verify_key.version,
server_name,
verify_key.alg,
verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s: %s" % (
server_name, verify_key.alg, verify_key.version, str(e),
),
"Invalid signature for server %s with key %s:%s: %s"
% (server_name, verify_key.alg, verify_key.version, str(e)),
Codes.UNAUTHORIZED,
)

325
synapse/handlers/stats.py Normal file
View File

@@ -0,0 +1,325 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
class StatsHandler(StateDeltasHandler):
"""Handles keeping the *_stats tables updated with a simple time-series of
information about the users, rooms and media on the server, such that admins
have some idea of who is consuming their resources.
Heavily derived from UserDirectoryHandler
"""
def __init__(self, hs):
super(StatsHandler, self).__init__(hs)
self.hs = hs
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.stats_bucket_size = hs.config.stats_bucket_size
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
# We kick this off so that we don't have to wait for a change before
# we start populating stats
self.clock.call_later(0, self.notify_new_event)
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
if not deltas:
return
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
event_processing_positions.labels("stats").set(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""
Called with the state deltas to process
"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
if token is not None and token >= stream_id:
logger.debug(
"Ignoring: %s as earlier than this room's initial ingestion event",
event_id,
)
continue
if event_id is None and prev_event_id is None:
# Errr...
continue
event_content = {}
if event_id is not None:
event_content = (yield self.store.get_event(event_id)).content or {}
# quantise time to the nearest bucket
now = yield self.store.get_received_ts(event_id)
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
if prev_event_id is not None:
prev_event_content = (
yield self.store.get_event(prev_event_id)
).content
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
)
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
)
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
)
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
logger.error(err)
raise ValueError(err)
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
)
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
)
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
)
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
)
else:
err = "%s is not a valid membership" % (repr(membership),)
logger.error(err)
raise ValueError(err)
user_id = state_key
if self.is_mine_id(user_id):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
yield self.store.update_room_state(
room_id,
{
"join_rules": None,
"history_visibility": None,
"encryption": None,
"name": None,
"topic": None,
"avatar": None,
"canonical_alias": None,
},
)
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.RoomHistoryVisibility:
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
room_id, {"encryption": event_content.get("algorithm")}
)
elif typ == EventTypes.Name:
yield self.store.update_room_state(
room_id, {"name": event_content.get("name")}
)
elif typ == EventTypes.Topic:
yield self.store.update_room_state(
room_id, {"topic": event_content.get("topic")}
)
elif typ == EventTypes.RoomAvatar:
yield self.store.update_room_state(
room_id, {"avatar": event_content.get("url")}
)
elif typ == EventTypes.CanonicalAlias:
yield self.store.update_room_state(
room_id, {"canonical_alias": event_content.get("alias")}
)
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
Increment/decrement a user's number of public rooms when a room they are
in changes to/from public visibility.
Args:
ts (int): Timestamp in seconds
room_id (str)
is_public (bool)
"""
# For now, blindly iterate over all local users in the room so that
# we can handle the whole problem of copying buckets over as needed
user_ids = yield self.store.get_users_in_room(room_id)
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
)
@defer.inlineCallbacks
def _is_public_room(self, room_id):
join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
history_visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility
)
if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
(
history_visibility
and history_visibility.content.get("history_visibility")
== "world_readable"
)
):
defer.returnValue(True)
else:
defer.returnValue(False)

View File

@@ -74,14 +74,6 @@ REQUIREMENTS = [
"attrs>=17.4.0",
"netaddr>=0.7.18",
# requests is a transitive dep of treq, and urlib3 is a transitive dep
# of requests, as well as of sentry-sdk.
#
# As of requests 2.21, requests does not yet support urllib3 1.25.
# (If we do not pin it here, pip will give us the latest urllib3
# due to the dep via sentry-sdk.)
"urllib3<1.25",
]
CONDITIONAL_REQUIREMENTS = {

View File

@@ -201,6 +201,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
content = parse_json_object_from_request(request)
# Pull out the relationship early if the client sent us something
# which cannot possibly be processed by us.
if content.get("m.relates_to", "not None") is None:
del content["m.relates_to"]
event_dict = {
"type": event_type,
"content": content,

View File

@@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.user_directory import UserDirectoryHandler
@@ -139,6 +140,7 @@ class HomeServer(object):
'acme_handler',
'auth_handler',
'device_handler',
'stats_handler',
'e2e_keys_handler',
'e2e_room_keys_handler',
'event_handler',
@@ -191,6 +193,7 @@ class HomeServer(object):
REQUIRED_ON_MASTER_STARTUP = [
"user_directory_handler",
"stats_handler"
]
# This is overridden in derived application classes
@@ -474,6 +477,9 @@ class HomeServer(object):
def build_secrets(self):
return Secrets()
def build_stats_handler(self):
return StatsHandler(self)
def build_spam_checker(self):
return SpamChecker(self)

View File

@@ -55,6 +55,7 @@ from .roommember import RoomMemberStore
from .search import SearchStore
from .signatures import SignatureStore
from .state import StateStore
from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
@@ -100,6 +101,7 @@ class DataStore(
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
StatsStore,
RelationsStore,
):
def __init__(self, db_conn, hs):

View File

@@ -611,3 +611,27 @@ class EventsWorkerStore(SQLBaseStore):
return res
return self.runInteraction("get_rejection_reasons", f)
def _get_total_state_event_counts_txn(self, txn, room_id):
"""
See get_state_event_counts.
"""
sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
txn.execute(sql, (room_id,))
row = txn.fetchone()
return row[0] if row else 0
def get_total_state_event_counts(self, room_id):
"""
Gets the total number of state events in a room.
Args:
room_id (str)
Returns:
Deferred[int]
"""
return self.runInteraction(
"get_total_state_event_counts",
self._get_total_state_event_counts_txn, room_id
)

View File

@@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return self.runInteraction("get_room_summary", _get_room_summary_txn)
def _get_user_count_in_room_txn(self, txn, room_id, membership):
"""
See get_user_count_in_room.
"""
sql = (
"SELECT count(*) FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id "
" AND m.room_id = c.room_id "
" AND m.user_id = c.state_key"
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)
txn.execute(sql, (room_id, membership))
row = txn.fetchone()
return row[0]
def get_user_count_in_room(self, room_id, membership):
"""
Get the user count in a room with a particular membership.
Args:
room_id (str)
membership (Membership)
Returns:
Deferred[int]
"""
return self.runInteraction(
"get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
)
@cached()
def get_invited_rooms_for_user(self, user_id):
""" Get all the rooms the user is invited to

View File

@@ -0,0 +1,80 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE stats_stream_pos (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT,
CHECK (Lock='X')
);
INSERT INTO stats_stream_pos (stream_id) VALUES (null);
CREATE TABLE user_stats (
user_id TEXT NOT NULL,
ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL
);
CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
CREATE TABLE room_stats (
room_id TEXT NOT NULL,
ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
current_state_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
state_events INT NOT NULL
);
CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
-- cache of current room state; useful for the publicRooms list
CREATE TABLE room_state (
room_id TEXT NOT NULL,
join_rules TEXT,
history_visibility TEXT,
encryption TEXT,
name TEXT,
topic TEXT,
avatar TEXT,
canonical_alias TEXT
-- get aliases straight from the right table
);
CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
CREATE TABLE room_stats_earliest_token (
room_id TEXT NOT NULL,
token BIGINT NOT NULL
);
CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
-- Set up staging tables
INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_stats_createtables', '{}');
-- Run through each room and update stats
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', 'populate_stats_createtables');
-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_cleanup', '{}', 'populate_stats_process_rooms');

View File

@@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore):
"get_current_state_deltas", get_current_state_deltas_txn
)
def get_max_stream_id_in_current_state_deltas(self):
return self._simple_select_one_onecol(
def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
return self._simple_select_one_onecol_txn(
txn,
table="current_state_delta_stream",
keyvalues={},
retcol="COALESCE(MAX(stream_id), -1)",
desc="get_max_stream_id_in_current_state_deltas",
)
def get_max_stream_id_in_current_state_deltas(self):
return self.runInteraction(
"get_max_stream_id_in_current_state_deltas",
self._get_max_stream_id_in_current_state_deltas_txn,
)

450
synapse/storage/stats.py Normal file
View File

@@ -0,0 +1,450 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
"joined_members",
"invited_members",
"left_members",
"banned_members",
"state_events",
),
"user": ("public_rooms", "private_rooms"),
}
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
TEMP_TABLE = "_temp_populate_stats"
class StatsStore(StateDeltasStore):
def __init__(self, db_conn, hs):
super(StatsStore, self).__init__(db_conn, hs)
self.server_name = hs.hostname
self.clock = self.hs.get_clock()
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
self.register_background_update_handler(
"populate_stats_createtables", self._populate_stats_createtables
)
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
"populate_stats_cleanup", self._populate_stats_cleanup
)
@defer.inlineCallbacks
def _populate_stats_createtables(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_createtables")
defer.returnValue(1)
# Get all the rooms that we want to process.
def _make_staging_area(txn):
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
# Get rooms we want to process from the database
sql = """
SELECT room_id, count(*) FROM current_state_events
GROUP BY room_id
"""
txn.execute(sql)
rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
del rooms
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
self.get_earliest_token_for_room_stats.invalidate_all()
yield self._end_background_update("populate_stats_createtables")
defer.returnValue(1)
@defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size):
"""
Update the user directory stream position, then clean up the old tables.
"""
if not self.stats_enabled:
yield self._end_background_update("populate_stats_cleanup")
defer.returnValue(1)
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
yield self.update_stats_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
yield self._end_background_update("populate_stats_cleanup")
defer.returnValue(1)
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
defer.returnValue(1)
# If we don't have progress filed, delete everything.
if not progress:
yield self.delete_all_stats()
def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """
SELECT room_id, events FROM %s_rooms
ORDER BY events DESC
LIMIT 250
""" % (
TEMP_TABLE,
)
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
if not rooms_to_work_on:
return None
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
"populate_stats_temp_read", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
yield self._end_background_update("populate_stats_process_rooms")
defer.returnValue(1)
logger.info(
"Processing the next %d rooms of %d remaining",
(len(rooms_to_work_on), progress["remaining"]),
)
# Number of state events we've processed by going through each room
processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
current_state_ids = yield self.get_current_state_ids(room_id)
join_rules = yield self.get_event(
current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
)
history_visibility = yield self.get_event(
current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
allow_none=True,
)
encryption = yield self.get_event(
current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
)
name = yield self.get_event(
current_state_ids.get((EventTypes.Name, "")), allow_none=True
)
topic = yield self.get_event(
current_state_ids.get((EventTypes.Topic, "")), allow_none=True
)
avatar = yield self.get_event(
current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
)
canonical_alias = yield self.get_event(
current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
)
def _or_none(x, arg):
if x:
return x.content.get(arg)
return None
yield self.update_room_state(
room_id,
{
"join_rules": _or_none(join_rules, "join_rule"),
"history_visibility": _or_none(
history_visibility, "history_visibility"
),
"encryption": _or_none(encryption, "algorithm"),
"name": _or_none(name, "name"),
"topic": _or_none(topic, "topic"),
"avatar": _or_none(avatar, "url"),
"canonical_alias": _or_none(canonical_alias, "alias"),
},
)
now = self.hs.get_reactor().seconds()
# quantise time to the nearest bucket
now = (now // self.stats_bucket_size) * self.stats_bucket_size
def _fetch_data(txn):
# Get the current token of the room
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
current_state_events = len(current_state_ids)
joined_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.JOIN
)
invited_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.INVITE
)
left_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.LEAVE
)
banned_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.BAN
)
total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
self._update_stats_txn(
txn,
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": joined_members,
"invited_members": invited_members,
"left_members": left_members,
"banned_members": banned_members,
"state_events": total_state_events,
},
)
self._simple_insert_txn(
txn,
"room_stats_earliest_token",
{"room_id": room_id, "token": current_token},
)
yield self.runInteraction("update_room_stats", _fetch_data)
# We've finished a room. Delete it from the table.
yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_stats",
self._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
processed_event_count += event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
defer.returnValue(processed_event_count)
defer.returnValue(processed_event_count)
def delete_all_stats(self):
"""
Delete all statistics records.
"""
def _delete_all_stats_txn(txn):
txn.execute("DELETE FROM room_state")
txn.execute("DELETE FROM room_stats")
txn.execute("DELETE FROM room_stats_earliest_token")
txn.execute("DELETE FROM user_stats")
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
def get_stats_stream_pos(self):
return self._simple_select_one_onecol(
table="stats_stream_pos",
keyvalues={},
retcol="stream_id",
desc="stats_stream_pos",
)
def update_stats_stream_pos(self, stream_id):
return self._simple_update_one(
table="stats_stream_pos",
keyvalues={},
updatevalues={"stream_id": stream_id},
desc="update_stats_stream_pos",
)
def update_room_state(self, room_id, fields):
"""
Args:
room_id (str)
fields (dict[str:Any])
"""
return self._simple_upsert(
table="room_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
def get_deltas_for_room(self, room_id, start, size=100):
"""
Get statistics deltas for a given room.
Args:
room_id (str)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
ABSOLUTE_STATS_FIELDS["room"] and "ts".
"""
return self._simple_select_list_paginate(
"room_stats",
{"room_id": room_id},
"ts",
start,
size,
retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
order_direction="DESC",
)
def get_all_room_state(self):
return self._simple_select_list(
"room_state", None, retcols=("name", "topic", "canonical_alias")
)
@cached()
def get_earliest_token_for_room_stats(self, room_id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
start of the background task and any particular room's stats
being calculated.
Returns:
Deferred[int]
"""
return self._simple_select_one_onecol(
"room_stats_earliest_token",
{"room_id": room_id},
retcol="token",
allow_none=True,
)
def update_stats(self, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
return self._simple_upsert(
table=table,
keyvalues={id_col: stats_id, "ts": ts},
values=fields,
desc="update_stats",
)
def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
return self._simple_upsert_txn(
txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
)
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
def _update_stats_delta(txn):
table, id_col = TYPE_TO_ROOM[stats_type]
sql = (
"SELECT * FROM %s"
" WHERE %s=? and ts=("
" SELECT MAX(ts) FROM %s"
" WHERE %s=?"
")"
) % (table, id_col, table, id_col)
txn.execute(sql, (stats_id, stats_id))
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
# this tries to minimise any race between the initial sync and
# subsequent deltas arriving.
return
current_ts = ts
latest_ts = rows[0]["ts"]
if current_ts < latest_ts:
# This one is in the past, but we're just encountering it now.
# Mark it as part of the current bucket.
current_ts = latest_ts
elif ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
values = {
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
}
values[id_col] = stats_id
values["ts"] = ts
values["bucket_size"] = self.stats_bucket_size
self._simple_insert_txn(txn, table=table, values=values)
# actually update the new value
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
self._simple_update_txn(
txn,
table=table,
keyvalues={id_col: stats_id, "ts": current_ts},
updatevalues={field: value},
)
else:
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
table,
field,
field,
id_col,
)
txn.execute(sql, (value, stats_id, current_ts))
return self.runInteraction("update_stats_delta", _update_stats_delta)

View File

@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
class StatsRoomTests(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
def _add_background_updates(self):
"""
Add the background updates we need to run.
"""
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Stats disabled, shouldn't have done anything
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
def test_initial_earliest_token(self):
"""
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2_token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
u3_token = self.login("u3", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Begin the ingestion by creating the temp tables. This will also store
# the position that the deltas should begin at, once they take over.
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
# Now, before the table is actually ingested, add some more events.
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.store._all_done = False
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
self.reactor.advance(86401)
# Now add some more events, triggering ingestion. Because of the stream
# position being set to before the events sent in the middle, a simpler
# implementation would reprocess those events, and say there were four
# users, not three.
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
# The newest has 3
self.assertEqual(r[0]["joined_members"], 3)
def test_incorrect_state_transition(self):
"""
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
(JOIN, INVITE, LEAVE, BAN), an error is raised.
"""
events = {
"a1": {"membership": Membership.LEAVE},
"a2": {"membership": "not a real thing"},
}
def get_event(event_id):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": "bleb",
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid prev_membership"
)
# And the other way...
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": "bleb",
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)

View File

@@ -127,3 +127,20 @@ class RestHelper(object):
)
return channel.json_body
def send_state(self, room_id, event_type, body, tok, expect_code=200):
path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
if tok:
path = path + "?access_token=%s" % tok
request, channel = make_request(
self.hs.get_reactor(), "PUT", path, json.dumps(body).encode('utf8')
)
render(request, self.resource, self.hs.get_reactor())
assert int(channel.result["code"]) == expect_code, (
"Expected: %d, got: %d, resp: %r"
% (expect_code, int(channel.result["code"]), channel.result["body"])
)
return channel.json_body