Compare commits
10 Commits
release-v0 ... travis/nul

| Author | SHA1 | Date |
|---|---|---|
| | 177f2b838c | |
| | f9d7d3aa89 | |
| | 1a94de60e8 | |
| | 73f1de31d1 | |
| | 62388a1e44 | |
| | ae5521be9c | |
| | 66b75e2d81 | |
| | 2dfbeea66f | |
| | b898a5600a | |
| | 4a30e4acb4 | |
12 CHANGES.md
```diff
@@ -1,16 +1,8 @@
Synapse 0.99.5.2 (2019-05-30)
=============================

Bugfixes
--------

- Fix bug where we leaked extremities when we soft failed events, leading to performance degradation. ([\#5274](https://github.com/matrix-org/synapse/issues/5274), [\#5278](https://github.com/matrix-org/synapse/issues/5278), [\#5291](https://github.com/matrix-org/synapse/issues/5291))


Synapse 0.99.5.1 (2019-05-22)
=============================

0.99.5.1 supersedes 0.99.5 due to malformed debian changelog - no functional changes.
No significant changes.


Synapse 0.99.5 (2019-05-22)
===========================
```
1 changelog.d/4338.feature Normal file
```diff
@@ -0,0 +1 @@
Synapse now more efficiently collates room statistics.
```
1 changelog.d/5200.bugfix Normal file
```diff
@@ -0,0 +1 @@
Fix worker registration bug caused by ClientReaderSlavedStore being unable to see get_profileinfo.
```
1 changelog.d/5230.misc Normal file
```diff
@@ -0,0 +1 @@
Remove urllib3 pin as requests 2.22.0 has been released supporting urllib3 1.25.2.
```
1 changelog.d/5232.misc Normal file
```diff
@@ -0,0 +1 @@
Run black on synapse.crypto.keyring.
```
1 changelog.d/5239.bugfix Normal file
```diff
@@ -0,0 +1 @@
Fix 500 Internal Server Error when sending an event with `m.relates_to: null`.
```
6 debian/changelog vendored
```diff
@@ -1,9 +1,3 @@
-matrix-synapse-py3 (0.99.5.2) stable; urgency=medium
-
-  * New synapse release 0.99.5.2.
-
- -- Synapse Packaging team <packages@matrix.org>  Thu, 30 May 2019 16:28:07 +0100
-
 matrix-synapse-py3 (0.99.5.1) stable; urgency=medium
 
   * New synapse release 0.99.5.1.
```
````diff
@@ -161,7 +161,7 @@ specify values for `SYNAPSE_CONFIG_PATH`, `SYNAPSE_SERVER_NAME` and
 example:
 
 ```
-docker run -it --rm
+docker run -it --rm \
     --mount type=volume,src=synapse-data,dst=/data \
     -e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
     -e SYNAPSE_SERVER_NAME=my.matrix.host \
````
```diff
@@ -1153,6 +1153,22 @@ password_config:
 #
 
+
+# Local statistics collection. Used in populating the room directory.
+#
+# 'bucket_size' controls how large each statistics timeslice is. It can
+# be defined in a human readable short form -- e.g. "1d", "1y".
+#
+# 'retention' controls how long historical statistics will be kept for.
+# It can be defined in a human readable short form -- e.g. "1d", "1y".
+#
+#
+#stats:
+#  enabled: true
+#  bucket_size: 1d
+#  retention: 1y
+
 
 # Server Notices room configuration
 #
 # Uncomment this section to enable a room which can be used to send notices
```
```diff
@@ -27,4 +27,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "0.99.5.2"
+__version__ = "0.99.5.1"
```
```diff
@@ -79,6 +79,7 @@ class EventTypes(object):
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
     Encryption = "m.room.encryption"
     RoomAvatar = "m.room.avatar"
     RoomEncryption = "m.room.encryption"
     GuestAccess = "m.room.guest_access"
```
```diff
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
@@ -81,6 +82,7 @@ class ClientReaderSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedTransactionStore,
+    SlavedProfileStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
```
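The fix for the worker registration bug is visible above: `SlavedProfileStore` joins the mixin list, so `get_profileinfo` becomes reachable through the MRO. A minimal, self-contained sketch of the pattern with hypothetical stub classes (not Synapse's real stores):

```python
# Hedged sketch: hypothetical stubs standing in for the real slaved stores.
class BaseSlavedStore:
    pass

class SlavedProfileStore(BaseSlavedStore):
    def get_profileinfo(self, user_id):
        # Stub: the real store reads from the profiles table.
        return {"displayname": None, "avatar_url": None}

class ClientReaderSlavedStore(SlavedProfileStore, BaseSlavedStore):
    pass

store = ClientReaderSlavedStore()
print(store.get_profileinfo("@alice:example.org"))  # resolves via the MRO
```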
```diff
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 from .api import ApiConfig
 from .appservice import AppServiceConfig
 from .captcha import CaptchaConfig
@@ -36,20 +37,41 @@ from .saml2_config import SAML2Config
 from .server import ServerConfig
 from .server_notices_config import ServerNoticesConfig
 from .spam_checker import SpamCheckerConfig
+from .stats import StatsConfig
 from .tls import TlsConfig
 from .user_directory import UserDirectoryConfig
 from .voip import VoipConfig
 from .workers import WorkerConfig
 
 
-class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig,
-                       RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
-                       VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
-                       AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, PasswordConfig, EmailConfig,
-                       WorkerConfig, PasswordAuthProviderConfig, PushConfig,
-                       SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,
-                       ConsentConfig,
-                       ServerNoticesConfig, RoomDirectoryConfig,
-                       ):
+class HomeServerConfig(
+    ServerConfig,
+    TlsConfig,
+    DatabaseConfig,
+    LoggingConfig,
+    RatelimitConfig,
+    ContentRepositoryConfig,
+    CaptchaConfig,
+    VoipConfig,
+    RegistrationConfig,
+    MetricsConfig,
+    ApiConfig,
+    AppServiceConfig,
+    KeyConfig,
+    SAML2Config,
+    CasConfig,
+    JWTConfig,
+    PasswordConfig,
+    EmailConfig,
+    WorkerConfig,
+    PasswordAuthProviderConfig,
+    PushConfig,
+    SpamCheckerConfig,
+    GroupsConfig,
+    UserDirectoryConfig,
+    ConsentConfig,
+    StatsConfig,
+    ServerNoticesConfig,
+    RoomDirectoryConfig,
+):
+    pass
```
60 synapse/config/stats.py Normal file
```diff
@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division

import sys

from ._base import Config


class StatsConfig(Config):
    """Stats Configuration
    Configuration for the behaviour of synapse's stats engine
    """

    def read_config(self, config):
        self.stats_enabled = True
        self.stats_bucket_size = 86400
        self.stats_retention = sys.maxsize
        stats_config = config.get("stats", None)
        if stats_config:
            self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
            self.stats_bucket_size = (
                self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
            )
            self.stats_retention = (
                self.parse_duration(
                    stats_config.get("retention", "%ds" % (sys.maxsize,))
                )
                / 1000
            )

    def default_config(self, config_dir_path, server_name, **kwargs):
        return """
        # Local statistics collection. Used in populating the room directory.
        #
        # 'bucket_size' controls how large each statistics timeslice is. It can
        # be defined in a human readable short form -- e.g. "1d", "1y".
        #
        # 'retention' controls how long historical statistics will be kept for.
        # It can be defined in a human readable short form -- e.g. "1d", "1y".
        #
        #
        #stats:
        #  enabled: true
        #  bucket_size: 1d
        #  retention: 1y
        """
```
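A runnable sketch of how `read_config` above turns the YAML block into attribute values. `parse_duration` is a Synapse `Config` helper assumed here to return milliseconds (consistent with the `/ 1000` in the code); the stand-in below only supports the short forms the comments mention:

```python
import sys

def parse_duration(value):
    # Stand-in for Config.parse_duration: returns milliseconds for "Ns"/"Nd"/"Ny".
    units = {"s": 1000, "d": 86400 * 1000, "y": 365 * 86400 * 1000}
    return int(value[:-1]) * units[value[-1]]

config = {"stats": {"enabled": True, "bucket_size": "1d", "retention": "1y"}}

stats_bucket_size = 86400          # default: one day, in seconds
stats_retention = sys.maxsize      # default: keep forever
stats_config = config.get("stats", None)
if stats_config:
    stats_bucket_size = parse_duration(stats_config.get("bucket_size", "1d")) / 1000
    stats_retention = parse_duration(
        stats_config.get("retention", "%ds" % (sys.maxsize,))
    ) / 1000

print(stats_bucket_size, stats_retention)  # 86400.0 31536000.0
```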
```diff
@@ -56,9 +56,9 @@ from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)


VerifyKeyRequest = namedtuple("VerifyRequest", (
    "server_name", "key_ids", "json_object", "deferred"
))
VerifyKeyRequest = namedtuple(
    "VerifyRequest", ("server_name", "key_ids", "json_object", "deferred")
)
"""
A request for a verify key to verify a JSON object.
```
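For orientation, `VerifyKeyRequest` is a plain namedtuple; note the typename `"VerifyRequest"` differs from the variable name, which the black reformat preserves. A hedged sketch with hypothetical field values:

```python
from collections import namedtuple

VerifyKeyRequest = namedtuple(
    "VerifyRequest", ("server_name", "key_ids", "json_object", "deferred")
)

# Hypothetical request; in the real code `deferred` is a twisted Deferred.
request = VerifyKeyRequest("example.org", ["ed25519:a_key"], {"sig": "..."}, None)
print(request.server_name, request.key_ids)  # example.org ['ed25519:a_key']
```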
```diff
@@ -96,9 +96,7 @@ class Keyring(object):

    def verify_json_for_server(self, server_name, json_object):
        return logcontext.make_deferred_yieldable(
            self.verify_json_objects_for_server(
                [(server_name, json_object)]
            )[0]
            self.verify_json_objects_for_server([(server_name, json_object)])[0]
        )

    def verify_json_objects_for_server(self, server_and_json):
@@ -130,18 +128,15 @@ class Keyring(object):
            if not key_ids:
                return defer.fail(
                    SynapseError(
                        400,
                        "Not signed by %s" % (server_name,),
                        Codes.UNAUTHORIZED,
                        400, "Not signed by %s" % (server_name,), Codes.UNAUTHORIZED
                    )
                )

            logger.debug("Verifying for %s with key_ids %s",
                         server_name, key_ids)
            logger.debug("Verifying for %s with key_ids %s", server_name, key_ids)

            # add the key request to the queue, but don't start it off yet.
            verify_request = VerifyKeyRequest(
                server_name, key_ids, json_object, defer.Deferred(),
                server_name, key_ids, json_object, defer.Deferred()
            )
            verify_requests.append(verify_request)
@@ -179,15 +174,13 @@ class Keyring(object):
            # any other lookups until we have finished.
            # The deferreds are called with no logcontext.
            server_to_deferred = {
                rq.server_name: defer.Deferred()
                for rq in verify_requests
                rq.server_name: defer.Deferred() for rq in verify_requests
            }

            # We want to wait for any previous lookups to complete before
            # proceeding.
            yield self.wait_for_previous_lookups(
                [rq.server_name for rq in verify_requests],
                server_to_deferred,
                [rq.server_name for rq in verify_requests], server_to_deferred
            )

            # Actually start fetching keys.
@@ -216,9 +209,7 @@ class Keyring(object):
                return res

            for verify_request in verify_requests:
                verify_request.deferred.addBoth(
                    remove_deferreds, verify_request,
                )
                verify_request.deferred.addBoth(remove_deferreds, verify_request)
        except Exception:
            logger.exception("Error starting key lookups")
@@ -248,7 +239,8 @@ class Keyring(object):
                break
            logger.info(
                "Waiting for existing lookups for %s to complete [loop %i]",
                [w[0] for w in wait_on], loop_count,
                [w[0] for w in wait_on],
                loop_count,
            )
            with PreserveLoggingContext():
                yield defer.DeferredList((w[1] for w in wait_on))
@@ -335,13 +327,14 @@ class Keyring(object):

        with PreserveLoggingContext():
            for verify_request in requests_missing_keys:
                verify_request.deferred.errback(SynapseError(
                    401,
                    "No key for %s with id %s" % (
                        verify_request.server_name, verify_request.key_ids,
                    ),
                    Codes.UNAUTHORIZED,
                ))
                verify_request.deferred.errback(
                    SynapseError(
                        401,
                        "No key for %s with id %s"
                        % (verify_request.server_name, verify_request.key_ids),
                        Codes.UNAUTHORIZED,
                    )
                )

        def on_err(err):
            with PreserveLoggingContext():
@@ -383,25 +376,26 @@ class Keyring(object):
                )
                defer.returnValue(result)
            except KeyLookupError as e:
                logger.warning(
                    "Key lookup failed from %r: %s", perspective_name, e,
                )
                logger.warning("Key lookup failed from %r: %s", perspective_name, e)
            except Exception as e:
                logger.exception(
                    "Unable to get key from %r: %s %s",
                    perspective_name,
                    type(e).__name__, str(e),
                    type(e).__name__,
                    str(e),
                )

            defer.returnValue({})

        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(get_key, p_name, p_keys)
                for p_name, p_keys in self.perspective_servers.items()
            ],
            consumeErrors=True,
        ).addErrback(unwrapFirstError))
        results = yield logcontext.make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(get_key, p_name, p_keys)
                    for p_name, p_keys in self.perspective_servers.items()
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )

        union_of_keys = {}
        for result in results:
@@ -412,32 +406,30 @@ class Keyring(object):

    @defer.inlineCallbacks
    def get_keys_from_server(self, server_name_and_key_ids):
        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(
                    self.get_server_verify_key_v2_direct,
                    server_name,
                    key_ids,
                )
                for server_name, key_ids in server_name_and_key_ids
            ],
            consumeErrors=True,
        ).addErrback(unwrapFirstError))
        results = yield logcontext.make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(
                        self.get_server_verify_key_v2_direct, server_name, key_ids
                    )
                    for server_name, key_ids in server_name_and_key_ids
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )

        merged = {}
        for result in results:
            merged.update(result)

        defer.returnValue({
            server_name: keys
            for server_name, keys in merged.items()
            if keys
        })
        defer.returnValue(
            {server_name: keys for server_name, keys in merged.items() if keys}
        )

    @defer.inlineCallbacks
    def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,
                                          perspective_name,
                                          perspective_keys):
    def get_server_verify_key_v2_indirect(
        self, server_names_and_key_ids, perspective_name, perspective_keys
    ):
        # TODO(mark): Set the minimum_valid_until_ts to that needed by
        # the events being validated or the current time if validating
        # an incoming request.
@@ -448,9 +440,7 @@ class Keyring(object):
                data={
                    u"server_keys": {
                        server_name: {
                            key_id: {
                                u"minimum_valid_until_ts": 0
                            } for key_id in key_ids
                            key_id: {u"minimum_valid_until_ts": 0} for key_id in key_ids
                        }
                        for server_name, key_ids in server_names_and_key_ids
                    }
@@ -458,21 +448,19 @@ class Keyring(object):
                long_retries=True,
            )
        except (NotRetryingDestination, RequestSendFailed) as e:
            raise_from(
                KeyLookupError("Failed to connect to remote server"), e,
            )
            raise_from(KeyLookupError("Failed to connect to remote server"), e)
        except HttpResponseException as e:
            raise_from(
                KeyLookupError("Remote server returned an error"), e,
            )
            raise_from(KeyLookupError("Remote server returned an error"), e)

        keys = {}

        responses = query_response["server_keys"]

        for response in responses:
            if (u"signatures" not in response
                    or perspective_name not in response[u"signatures"]):
            if (
                u"signatures" not in response
                or perspective_name not in response[u"signatures"]
            ):
                raise KeyLookupError(
                    "Key response not signed by perspective server"
                    " %r" % (perspective_name,)
@@ -482,9 +470,7 @@ class Keyring(object):
            for key_id in response[u"signatures"][perspective_name]:
                if key_id in perspective_keys:
                    verify_signed_json(
                        response,
                        perspective_name,
                        perspective_keys[key_id]
                        response, perspective_name, perspective_keys[key_id]
                    )
                    verified = True
@@ -494,7 +480,7 @@ class Keyring(object):
                    " known key, signed with: %r, known keys: %r",
                    perspective_name,
                    list(response[u"signatures"][perspective_name]),
                    list(perspective_keys)
                    list(perspective_keys),
                )
                raise KeyLookupError(
                    "Response not signed with a known key for perspective"
@@ -508,18 +494,20 @@ class Keyring(object):

            keys.setdefault(server_name, {}).update(processed_response)

        yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(
                    self.store_keys,
                    server_name=server_name,
                    from_server=perspective_name,
                    verify_keys=response_keys,
                )
                for server_name, response_keys in keys.items()
            ],
            consumeErrors=True
        ).addErrback(unwrapFirstError))
        yield logcontext.make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(
                        self.store_keys,
                        server_name=server_name,
                        from_server=perspective_name,
                        verify_keys=response_keys,
                    )
                    for server_name, response_keys in keys.items()
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )

        defer.returnValue(keys)
@@ -534,26 +522,26 @@ class Keyring(object):
        try:
            response = yield self.client.get_json(
                destination=server_name,
                path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
                path="/_matrix/key/v2/server/"
                + urllib.parse.quote(requested_key_id),
                ignore_backoff=True,
            )
        except (NotRetryingDestination, RequestSendFailed) as e:
            raise_from(
                KeyLookupError("Failed to connect to remote server"), e,
            )
            raise_from(KeyLookupError("Failed to connect to remote server"), e)
        except HttpResponseException as e:
            raise_from(
                KeyLookupError("Remote server returned an error"), e,
            )
            raise_from(KeyLookupError("Remote server returned an error"), e)

        if (u"signatures" not in response
                or server_name not in response[u"signatures"]):
        if (
            u"signatures" not in response
            or server_name not in response[u"signatures"]
        ):
            raise KeyLookupError("Key response not signed by remote server")

        if response["server_name"] != server_name:
            raise KeyLookupError("Expected a response for server %r not %r" % (
                server_name, response["server_name"]
            ))
            raise KeyLookupError(
                "Expected a response for server %r not %r"
                % (server_name, response["server_name"])
            )

        response_keys = yield self.process_v2_response(
            from_server=server_name,
@@ -564,16 +552,12 @@ class Keyring(object):
        keys.update(response_keys)

        yield self.store_keys(
            server_name=server_name,
            from_server=server_name,
            verify_keys=keys,
            server_name=server_name, from_server=server_name, verify_keys=keys
        )
        defer.returnValue({server_name: keys})

    @defer.inlineCallbacks
    def process_v2_response(
        self, from_server, response_json, requested_ids=[],
    ):
    def process_v2_response(self, from_server, response_json, requested_ids=[]):
        """Parse a 'Server Keys' structure from the result of a /key request

        This is used to parse either the entirety of the response from
@@ -627,20 +611,13 @@ class Keyring(object):
        for key_id in response_json["signatures"].get(server_name, {}):
            if key_id not in response_json["verify_keys"]:
                raise KeyLookupError(
                    "Key response must include verification keys for all"
                    " signatures"
                    "Key response must include verification keys for all" " signatures"
                )
            if key_id in verify_keys:
                verify_signed_json(
                    response_json,
                    server_name,
                    verify_keys[key_id]
                )
                verify_signed_json(response_json, server_name, verify_keys[key_id])

        signed_key_json = sign_json(
            response_json,
            self.config.server_name,
            self.config.signing_key[0],
            response_json, self.config.server_name, self.config.signing_key[0]
        )

        signed_key_json_bytes = encode_canonical_json(signed_key_json)
@@ -653,21 +630,23 @@ class Keyring(object):
        response_keys.update(verify_keys)
        response_keys.update(old_verify_keys)

        yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(
                    self.store.store_server_keys_json,
                    server_name=server_name,
                    key_id=key_id,
                    from_server=from_server,
                    ts_now_ms=time_now_ms,
                    ts_expires_ms=ts_valid_until_ms,
                    key_json_bytes=signed_key_json_bytes,
                )
                for key_id in updated_key_ids
            ],
            consumeErrors=True,
        ).addErrback(unwrapFirstError))
        yield logcontext.make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(
                        self.store.store_server_keys_json,
                        server_name=server_name,
                        key_id=key_id,
                        from_server=from_server,
                        ts_now_ms=time_now_ms,
                        ts_expires_ms=ts_valid_until_ms,
                        key_json_bytes=signed_key_json_bytes,
                    )
                    for key_id in updated_key_ids
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )

        defer.returnValue(response_keys)
@@ -681,16 +660,21 @@ class Keyring(object):
            A deferred that completes when the keys are stored.
        """
        # TODO(markjh): Store whether the keys have expired.
        return logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(
                    self.store.store_server_verify_key,
                    server_name, server_name, key.time_added, key
                )
                for key_id, key in verify_keys.items()
            ],
            consumeErrors=True,
        ).addErrback(unwrapFirstError))
        return logcontext.make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(
                        self.store.store_server_verify_key,
                        server_name,
                        server_name,
                        key.time_added,
                        key,
                    )
                    for key_id, key in verify_keys.items()
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )


@defer.inlineCallbacks
@@ -713,17 +697,19 @@ def _handle_key_deferred(verify_request):
    except KeyLookupError as e:
        logger.warn(
            "Failed to download keys for %s: %s %s",
            server_name, type(e).__name__, str(e),
            server_name,
            type(e).__name__,
            str(e),
        )
        raise SynapseError(
            502,
            "Error downloading keys for %s" % (server_name,),
            Codes.UNAUTHORIZED,
            502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED
        )
    except Exception as e:
        logger.exception(
            "Got Exception when downloading keys for %s: %s %s",
            server_name, type(e).__name__, str(e),
            server_name,
            type(e).__name__,
            str(e),
        )
        raise SynapseError(
            401,
@@ -733,22 +719,24 @@ def _handle_key_deferred(verify_request):

    json_object = verify_request.json_object

    logger.debug("Got key %s %s:%s for server %s, verifying" % (
        key_id, verify_key.alg, verify_key.version, server_name,
    ))
    logger.debug(
        "Got key %s %s:%s for server %s, verifying"
        % (key_id, verify_key.alg, verify_key.version, server_name)
    )
    try:
        verify_signed_json(json_object, server_name, verify_key)
    except SignatureVerifyException as e:
        logger.debug(
            "Error verifying signature for %s:%s:%s with key %s: %s",
            server_name, verify_key.alg, verify_key.version,
            server_name,
            verify_key.alg,
            verify_key.version,
            encode_verify_key_base64(verify_key),
            str(e),
        )
        raise SynapseError(
            401,
            "Invalid signature for server %s with key %s:%s: %s" % (
                server_name, verify_key.alg, verify_key.version, str(e),
            ),
            "Invalid signature for server %s with key %s:%s: %s"
            % (server_name, verify_key.alg, verify_key.version, str(e)),
            Codes.UNAUTHORIZED,
        )
```
325 synapse/handlers/stats.py Normal file
```diff
@@ -0,0 +1,325 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from twisted.internet import defer

from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)


class StatsHandler(StateDeltasHandler):
    """Handles keeping the *_stats tables updated with a simple time-series of
    information about the users, rooms and media on the server, such that admins
    have some idea of who is consuming their resources.

    Heavily derived from UserDirectoryHandler
    """

    def __init__(self, hs):
        super(StatsHandler, self).__init__(hs)
        self.hs = hs
        self.store = hs.get_datastore()
        self.state = hs.get_state_handler()
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()
        self.is_mine_id = hs.is_mine_id
        self.stats_bucket_size = hs.config.stats_bucket_size

        # The current position in the current_state_delta stream
        self.pos = None

        # Guard to ensure we only process deltas one at a time
        self._is_processing = False

        if hs.config.stats_enabled:
            self.notifier.add_replication_callback(self.notify_new_event)

            # We kick this off so that we don't have to wait for a change before
            # we start populating stats
            self.clock.call_later(0, self.notify_new_event)

    def notify_new_event(self):
        """Called when there may be more deltas to process
        """
        if not self.hs.config.stats_enabled:
            return

        if self._is_processing:
            return

        @defer.inlineCallbacks
        def process():
            try:
                yield self._unsafe_process()
            finally:
                self._is_processing = False

        self._is_processing = True
        run_as_background_process("stats.notify_new_event", process)

    @defer.inlineCallbacks
    def _unsafe_process(self):
        # If self.pos is None then means we haven't fetched it from DB
        if self.pos is None:
            self.pos = yield self.store.get_stats_stream_pos()

        # If still None then the initial background update hasn't happened yet
        if self.pos is None:
            defer.returnValue(None)

        # Loop round handling deltas until we're up to date
        while True:
            with Measure(self.clock, "stats_delta"):
                deltas = yield self.store.get_current_state_deltas(self.pos)
                if not deltas:
                    return

                logger.info("Handling %d state deltas", len(deltas))
                yield self._handle_deltas(deltas)

                self.pos = deltas[-1]["stream_id"]
                yield self.store.update_stats_stream_pos(self.pos)

                event_processing_positions.labels("stats").set(self.pos)

    @defer.inlineCallbacks
    def _handle_deltas(self, deltas):
        """
        Called with the state deltas to process
        """
        for delta in deltas:
            typ = delta["type"]
            state_key = delta["state_key"]
            room_id = delta["room_id"]
            event_id = delta["event_id"]
            stream_id = delta["stream_id"]
            prev_event_id = delta["prev_event_id"]

            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

            token = yield self.store.get_earliest_token_for_room_stats(room_id)

            # If the earliest token to begin from is larger than our current
            # stream ID, skip processing this delta.
            if token is not None and token >= stream_id:
                logger.debug(
                    "Ignoring: %s as earlier than this room's initial ingestion event",
                    event_id,
                )
                continue

            if event_id is None and prev_event_id is None:
                # Errr...
                continue

            event_content = {}

            if event_id is not None:
                event_content = (yield self.store.get_event(event_id)).content or {}

            # quantise time to the nearest bucket
            now = yield self.store.get_received_ts(event_id)
            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size

            if typ == EventTypes.Member:
                # we could use _get_key_change here but it's a bit inefficient
                # given we're not testing for a specific result; might as well
                # just grab the prev_membership and membership strings and
                # compare them.
                prev_event_content = {}
                if prev_event_id is not None:
                    prev_event_content = (
                        yield self.store.get_event(prev_event_id)
                    ).content

                membership = event_content.get("membership", Membership.LEAVE)
                prev_membership = prev_event_content.get("membership", Membership.LEAVE)

                if prev_membership == membership:
                    continue

                if prev_membership == Membership.JOIN:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "joined_members", -1
                    )
                elif prev_membership == Membership.INVITE:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "invited_members", -1
                    )
                elif prev_membership == Membership.LEAVE:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "left_members", -1
                    )
                elif prev_membership == Membership.BAN:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "banned_members", -1
                    )
                else:
                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
                    logger.error(err)
                    raise ValueError(err)

                if membership == Membership.JOIN:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "joined_members", +1
                    )
                elif membership == Membership.INVITE:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "invited_members", +1
                    )
                elif membership == Membership.LEAVE:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "left_members", +1
                    )
                elif membership == Membership.BAN:
                    yield self.store.update_stats_delta(
                        now, "room", room_id, "banned_members", +1
                    )
                else:
                    err = "%s is not a valid membership" % (repr(membership),)
                    logger.error(err)
                    raise ValueError(err)

                user_id = state_key
                if self.is_mine_id(user_id):
                    # update user_stats as it's one of our users
                    public = yield self._is_public_room(room_id)

                    if membership == Membership.LEAVE:
                        yield self.store.update_stats_delta(
                            now,
                            "user",
                            user_id,
                            "public_rooms" if public else "private_rooms",
                            -1,
                        )
                    elif membership == Membership.JOIN:
                        yield self.store.update_stats_delta(
                            now,
                            "user",
                            user_id,
                            "public_rooms" if public else "private_rooms",
                            +1,
                        )

            elif typ == EventTypes.Create:
                # Newly created room. Add it with all blank portions.
                yield self.store.update_room_state(
                    room_id,
                    {
                        "join_rules": None,
                        "history_visibility": None,
                        "encryption": None,
                        "name": None,
                        "topic": None,
                        "avatar": None,
                        "canonical_alias": None,
                    },
                )

            elif typ == EventTypes.JoinRules:
                yield self.store.update_room_state(
                    room_id, {"join_rules": event_content.get("join_rule")}
                )

                is_public = yield self._get_key_change(
                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
                )
                if is_public is not None:
                    yield self.update_public_room_stats(now, room_id, is_public)

            elif typ == EventTypes.RoomHistoryVisibility:
                yield self.store.update_room_state(
                    room_id,
                    {"history_visibility": event_content.get("history_visibility")},
                )

                is_public = yield self._get_key_change(
                    prev_event_id, event_id, "history_visibility", "world_readable"
                )
                if is_public is not None:
                    yield self.update_public_room_stats(now, room_id, is_public)

            elif typ == EventTypes.Encryption:
                yield self.store.update_room_state(
                    room_id, {"encryption": event_content.get("algorithm")}
                )
            elif typ == EventTypes.Name:
                yield self.store.update_room_state(
                    room_id, {"name": event_content.get("name")}
                )
            elif typ == EventTypes.Topic:
                yield self.store.update_room_state(
                    room_id, {"topic": event_content.get("topic")}
                )
            elif typ == EventTypes.RoomAvatar:
                yield self.store.update_room_state(
                    room_id, {"avatar": event_content.get("url")}
                )
            elif typ == EventTypes.CanonicalAlias:
                yield self.store.update_room_state(
                    room_id, {"canonical_alias": event_content.get("alias")}
                )

    @defer.inlineCallbacks
    def update_public_room_stats(self, ts, room_id, is_public):
        """
        Increment/decrement a user's number of public rooms when a room they are
        in changes to/from public visibility.

        Args:
            ts (int): Timestamp in seconds
            room_id (str)
            is_public (bool)
        """
        # For now, blindly iterate over all local users in the room so that
        # we can handle the whole problem of copying buckets over as needed
        user_ids = yield self.store.get_users_in_room(room_id)

        for user_id in user_ids:
            if self.hs.is_mine(UserID.from_string(user_id)):
                yield self.store.update_stats_delta(
                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
                )
                yield self.store.update_stats_delta(
                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
                )

    @defer.inlineCallbacks
    def _is_public_room(self, room_id):
        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
        history_visibility = yield self.state.get_current_state(
            room_id, EventTypes.RoomHistoryVisibility
        )

        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
            (
                history_visibility
                and history_visibility.content.get("history_visibility")
                == "world_readable"
            )
        ):
            defer.returnValue(True)
        else:
            defer.returnValue(False)
```
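The quantisation step in `_handle_deltas` above (`now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size`) floors a millisecond timestamp to the start of its statistics bucket. A small worked example with the default one-day bucket (the timestamp is hypothetical):

```python
stats_bucket_size = 86400               # seconds per bucket (default "1d")

received_ts_ms = 1_559_224_087_000      # hypothetical event receive time, in ms
now = received_ts_ms // 1000            # -> seconds since the epoch
now = (now // stats_bucket_size) * stats_bucket_size

print(now)  # 1559174400, i.e. 00:00 UTC on the day the event arrived
```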
```diff
@@ -74,14 +74,6 @@ REQUIREMENTS = [
     "attrs>=17.4.0",
 
     "netaddr>=0.7.18",
-
-    # requests is a transitive dep of treq, and urlib3 is a transitive dep
-    # of requests, as well as of sentry-sdk.
-    #
-    # As of requests 2.21, requests does not yet support urllib3 1.25.
-    # (If we do not pin it here, pip will give us the latest urllib3
-    # due to the dep via sentry-sdk.)
-    "urllib3<1.25",
 ]
 
 CONDITIONAL_REQUIREMENTS = {
```
```diff
@@ -201,6 +201,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         content = parse_json_object_from_request(request)
 
+        # Pull out the relationship early if the client sent us something
+        # which cannot possibly be processed by us.
+        if content.get("m.relates_to", "not None") is None:
+            del content["m.relates_to"]
+
         event_dict = {
             "type": event_type,
             "content": content,
```
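This is the fix for the `m.relates_to: null` 500 noted in `changelog.d/5239.bugfix`: the sentinel default `"not None"` distinguishes "key absent" from "key present but null". A self-contained sketch with a hypothetical payload:

```python
import json

# Hypothetical request body that previously triggered the 500.
content = json.loads('{"body": "hello", "m.relates_to": null}')

if content.get("m.relates_to", "not None") is None:
    del content["m.relates_to"]  # drop the unprocessable null relation

print(content)  # {'body': 'hello'}
```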
```diff
@@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
@@ -139,6 +140,7 @@ class HomeServer(object):
         'acme_handler',
         'auth_handler',
         'device_handler',
+        'stats_handler',
         'e2e_keys_handler',
         'e2e_room_keys_handler',
         'event_handler',
@@ -191,6 +193,7 @@ class HomeServer(object):
 
     REQUIRED_ON_MASTER_STARTUP = [
         "user_directory_handler",
+        "stats_handler"
     ]
 
     # This is overridden in derived application classes
@@ -474,6 +477,9 @@ class HomeServer(object):
     def build_secrets(self):
         return Secrets()
 
+    def build_stats_handler(self):
+        return StatsHandler(self)
+
     def build_spam_checker(self):
         return SpamChecker(self)
```
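`build_stats_handler` plugs into the HomeServer's build-and-cache dependency pattern: each name in the dependency list gets a cached `get_<name>()` backed by `build_<name>()`. A hedged sketch of that idiom with hypothetical classes (Synapse generates the accessor rather than writing it by hand):

```python
class StatsHandler:
    def __init__(self, hs):
        self.hs = hs

class HomeServer:
    def __init__(self):
        self._cache = {}

    def get_stats_handler(self):
        # In Synapse this accessor is generated from the dependency list.
        if "stats_handler" not in self._cache:
            self._cache["stats_handler"] = self.build_stats_handler()
        return self._cache["stats_handler"]

    def build_stats_handler(self):
        return StatsHandler(self)

hs = HomeServer()
assert hs.get_stats_handler() is hs.get_stats_handler()  # built once, then cached
```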
```diff
@@ -36,7 +36,6 @@ from .engines import PostgresEngine
 from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events import EventsStore
-from .events_bg_updates import EventsBackgroundUpdatesStore
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
@@ -56,6 +55,7 @@ from .roommember import RoomMemberStore
 from .search import SearchStore
 from .signatures import SignatureStore
 from .state import StateStore
+from .stats import StatsStore
 from .stream import StreamStore
 from .tags import TagsStore
 from .transactions import TransactionStore
@@ -66,7 +66,6 @@ logger = logging.getLogger(__name__)
 
 
 class DataStore(
-    EventsBackgroundUpdatesStore,
     RoomMemberStore,
     RoomStore,
     RegistrationStore,
@@ -102,6 +101,7 @@ class DataStore(
     GroupServerStore,
     UserErasureStore,
     MonthlyActiveUsersStore,
+    StatsStore,
     RelationsStore,
 ):
     def __init__(self, db_conn, hs):
```
```diff
@@ -1261,8 +1261,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k,) for k in keyvalues),
         )
 
-        txn.execute(sql, list(keyvalues.values()))
-        return txn.rowcount
+        return txn.execute(sql, list(keyvalues.values()))
 
     def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
@@ -1281,12 +1280,9 @@ class SQLBaseStore(object):
             column : column name to test for inclusion against `iterable`
             iterable : list
             keyvalues : dict of column names and values to select the rows with
-
-        Returns:
-            int: Number rows deleted
         """
         if not iterable:
-            return 0
+            return
 
         sql = "DELETE FROM %s" % table
 
@@ -1301,9 +1297,7 @@ class SQLBaseStore(object):
 
         if clauses:
             sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
-        txn.execute(sql, values)
-
-        return txn.rowcount
+        return txn.execute(sql, values)
 
     def _get_cache_dict(
         self, db_conn, table, entity_column, stream_column, max_value, limit=100000
```
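For context, `_simple_delete_many` builds a parameterised `IN` clause and appends one `column = ?` test per `keyvalues` entry. A hedged sketch of the resulting SQL for a hypothetical table — only the fragments quoted in the hunks above are verbatim; the `IN`-clause construction is an assumption about the surrounding code:

```python
table = "event_json"                     # hypothetical
column = "event_id"
iterable = ["$a", "$b", "$c"]
keyvalues = {"room_id": "!room:example.org"}

clauses = ["%s IN (%s)" % (column, ",".join("?" for _ in iterable))]
values = list(iterable)
for key, value in keyvalues.items():
    clauses.append("%s = ?" % (key,))
    values.append(value)

sql = "DELETE FROM %s" % table
if clauses:
    sql = "%s WHERE %s" % (sql, " AND ".join(clauses))

print(sql)     # DELETE FROM event_json WHERE event_id IN (?,?,?) AND room_id = ?
print(values)  # ['$a', '$b', '$c', '!room:example.org']
```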
```diff
@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018-2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -220,11 +219,41 @@ class EventsStore(
    EventsWorkerStore,
    BackgroundUpdateStore,
):
    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"

    def __init__(self, db_conn, hs):
        super(EventsStore, self).__init__(db_conn, hs)
        self.register_background_update_handler(
            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
        )
        self.register_background_update_handler(
            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
            self._background_reindex_fields_sender,
        )

        self.register_background_index_update(
            "event_contains_url_index",
            index_name="event_contains_url_index",
            table="events",
            columns=["room_id", "topological_ordering", "stream_ordering"],
            where_clause="contains_url = true AND outlier = false",
        )

        # an event_id index on event_search is useful for the purge_history
        # api. Plus it means we get to enforce some integrity with a UNIQUE
        # clause
        self.register_background_index_update(
            "event_search_event_id_idx",
            index_name="event_search_event_id_idx",
            table="event_search",
            columns=["event_id"],
            unique=True,
            psql_only=True,
        )

        self._event_persist_queue = _EventPeristenceQueue()

        self._state_resolution_handler = hs.get_state_resolution_handler()

    @defer.inlineCallbacks
@@ -525,18 +554,10 @@ class EventsStore(
             e_id for event in new_events for e_id in event.prev_event_ids()
         )
 
-        # Remove any events which are prev_events of any existing events.
+        # Finally, remove any events which are prev_events of any existing events.
         existing_prevs = yield self._get_events_which_are_prevs(result)
         result.difference_update(existing_prevs)
 
-        # Finally handle the case where the new events have soft-failed prev
-        # events. If they do we need to remove them and their prev events,
-        # otherwise we end up with dangling extremities.
-        existing_prevs = yield self._get_prevs_before_rejected(
-            e_id for event in new_events for e_id in event.prev_event_ids()
-        )
-        result.difference_update(existing_prevs)
-
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -552,7 +573,7 @@ class EventsStore(
         """
         results = []
 
-        def _get_events_which_are_prevs_txn(txn, batch):
+        def _get_events(txn, batch):
             sql = """
                 SELECT prev_event_id, internal_metadata
                 FROM event_edges
@@ -575,78 +596,10 @@ class EventsStore(
            )

        for chunk in batch_iter(event_ids, 100):
            yield self.runInteraction(
                "_get_events_which_are_prevs",
                _get_events_which_are_prevs_txn,
                chunk,
            )
            yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)

        defer.returnValue(results)

    @defer.inlineCallbacks
    def _get_prevs_before_rejected(self, event_ids):
        """Get soft-failed ancestors to remove from the extremities.

        Given a set of events, find all those that have been soft-failed or
        rejected. Returns those soft failed/rejected events and their prev
        events (whether soft-failed/rejected or not), and recurses up the
        prev-event graph until it finds no more soft-failed/rejected events.

        This is used to find extremities that are ancestors of new events, but
        are separated by soft failed events.

        Args:
            event_ids (Iterable[str]): Events to find prev events for. Note
                that these must have already been persisted.

        Returns:
            Deferred[set[str]]
        """

        # The set of event_ids to return. This includes all soft-failed events
        # and their prev events.
        existing_prevs = set()

        def _get_prevs_before_rejected_txn(txn, batch):
            to_recursively_check = batch

            while to_recursively_check:
                sql = """
                    SELECT
                        event_id, prev_event_id, internal_metadata,
                        rejections.event_id IS NOT NULL
                    FROM event_edges
                        INNER JOIN events USING (event_id)
                        LEFT JOIN rejections USING (event_id)
                        LEFT JOIN event_json USING (event_id)
                    WHERE
                        event_id IN (%s)
                        AND NOT events.outlier
                """ % (
                    ",".join("?" for _ in to_recursively_check),
                )

                txn.execute(sql, to_recursively_check)
                to_recursively_check = []

                for event_id, prev_event_id, metadata, rejected in txn:
                    if prev_event_id in existing_prevs:
                        continue

                    soft_failed = json.loads(metadata).get("soft_failed")
                    if soft_failed or rejected:
                        to_recursively_check.append(prev_event_id)
                        existing_prevs.add(prev_event_id)

        for chunk in batch_iter(event_ids, 100):
            yield self.runInteraction(
                "_get_prevs_before_rejected",
                _get_prevs_before_rejected_txn,
                chunk,
            )

        defer.returnValue(existing_prevs)

    @defer.inlineCallbacks
    def _get_new_state_after_events(
        self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
```
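A storage-free sketch of the recursion that `_get_prevs_before_rejected` performs (the function whose removal the `@@ -575,78 +596,10 @@` hunk above records): walk prev-events through soft-failed or rejected events, collecting every prev encountered, and stop recursing at the first sound ancestor. The toy graph is hypothetical:

```python
# Toy graph: event_id -> (prev_event_ids, soft_failed_or_rejected)
GRAPH = {
    "D": (["C"], True),    # soft-failed
    "C": (["B"], True),    # soft-failed
    "B": (["A"], False),   # sound: recursion stops here
    "A": ([], False),
}

def prevs_before_rejected(event_ids):
    existing_prevs = set()
    to_check = list(event_ids)
    while to_check:
        next_round = []
        for event_id in to_check:
            prevs, bad = GRAPH[event_id]
            if not bad:
                continue  # don't recurse past a non-soft-failed event
            for prev in prevs:
                if prev not in existing_prevs:
                    existing_prevs.add(prev)
                    next_round.append(prev)
        to_check = next_round
    return existing_prevs

print(prevs_before_rejected(["D"]))  # {'C', 'B'}
```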
```diff
@@ -1550,6 +1503,153 @@ class EventsStore(
        ret = yield self.runInteraction("count_daily_active_rooms", _count)
        defer.returnValue(ret)

    @defer.inlineCallbacks
    def _background_reindex_fields_sender(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        INSERT_CLUMP_SIZE = 1000

        def reindex_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id, json FROM events"
                " INNER JOIN event_json USING (event_id)"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            )

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            rows = txn.fetchall()
            if not rows:
                return 0

            min_stream_id = rows[-1][0]

            update_rows = []
            for row in rows:
                try:
                    event_id = row[1]
                    event_json = json.loads(row[2])
                    sender = event_json["sender"]
                    content = event_json["content"]

                    contains_url = "url" in content
                    if contains_url:
                        contains_url &= isinstance(content["url"], text_type)
                except (KeyError, AttributeError):
                    # If the event is missing a necessary field then
                    # skip over it.
                    continue

                update_rows.append((sender, contains_url, event_id))

            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"

            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
                txn.executemany(sql, clump)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows),
            }

            self._background_update_progress_txn(
                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
            )

            return len(rows)

        result = yield self.runInteraction(
            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)

        defer.returnValue(result)

    @defer.inlineCallbacks
    def _background_reindex_origin_server_ts(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        INSERT_CLUMP_SIZE = 1000

        def reindex_search_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id FROM events"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            )

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            rows = txn.fetchall()
            if not rows:
                return 0

            min_stream_id = rows[-1][0]
            event_ids = [row[1] for row in rows]

            rows_to_update = []

            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
            for chunk in chunks:
                ev_rows = self._simple_select_many_txn(
                    txn,
                    table="event_json",
                    column="event_id",
                    iterable=chunk,
                    retcols=["event_id", "json"],
                    keyvalues={},
                )

                for row in ev_rows:
                    event_id = row["event_id"]
                    event_json = json.loads(row["json"])
                    try:
                        origin_server_ts = event_json["origin_server_ts"]
                    except (KeyError, AttributeError):
                        # If the event is missing a necessary field then
                        # skip over it.
                        continue

                    rows_to_update.append((origin_server_ts, event_id))

            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"

            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
                txn.executemany(sql, clump)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows_to_update),
            }

            self._background_update_progress_txn(
                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
            )

            return len(rows_to_update)

        result = yield self.runInteraction(
            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)

        defer.returnValue(result)

    def get_current_backfill_token(self):
        """The current minimum token that backfilled events have reached"""
        return -self._backfill_id_gen.get_current_token()
```
@@ -1,401 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from six import text_type
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
||||
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
|
||||
self._background_reindex_fields_sender,
|
||||
)
|
||||
|
||||
self.register_background_index_update(
|
||||
"event_contains_url_index",
|
||||
index_name="event_contains_url_index",
|
||||
table="events",
|
||||
columns=["room_id", "topological_ordering", "stream_ordering"],
|
||||
where_clause="contains_url = true AND outlier = false",
|
||||
)
|
||||
|
||||
# an event_id index on event_search is useful for the purge_history
|
||||
# api. Plus it means we get to enforce some integrity with a UNIQUE
|
||||
# clause
|
||||
self.register_background_index_update(
|
||||
"event_search_event_id_idx",
|
||||
index_name="event_search_event_id_idx",
|
||||
table="event_search",
|
||||
columns=["event_id"],
|
||||
unique=True,
|
||||
psql_only=True,
|
||||
)
|
||||
|
||||
self.register_background_update_handler(
|
||||
self.DELETE_SOFT_FAILED_EXTREMITIES,
|
||||
self._cleanup_extremities_bg_update,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_fields_sender(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        INSERT_CLUMP_SIZE = 1000

        def reindex_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id, json FROM events"
                " INNER JOIN event_json USING (event_id)"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            )

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            rows = txn.fetchall()
            if not rows:
                return 0

            min_stream_id = rows[-1][0]

            update_rows = []
            for row in rows:
                try:
                    event_id = row[1]
                    event_json = json.loads(row[2])
                    sender = event_json["sender"]
                    content = event_json["content"]

                    contains_url = "url" in content
                    if contains_url:
                        contains_url &= isinstance(content["url"], text_type)
                except (KeyError, AttributeError):
                    # If the event is missing a necessary field then
                    # skip over it.
                    continue

                update_rows.append((sender, contains_url, event_id))

            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"

            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
                txn.executemany(sql, clump)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows),
            }

            self._background_update_progress_txn(
                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
            )

            return len(rows)

        result = yield self.runInteraction(
            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)

        defer.returnValue(result)

    @defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
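        """Backfills the `origin_server_ts` column on the `events` table,
        reading the timestamp out of each event's JSON in `event_json`.
        """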
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        INSERT_CLUMP_SIZE = 1000

        def reindex_search_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id FROM events"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            )

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            rows = txn.fetchall()
            if not rows:
                return 0

            min_stream_id = rows[-1][0]
            event_ids = [row[1] for row in rows]

            rows_to_update = []

            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
            for chunk in chunks:
                ev_rows = self._simple_select_many_txn(
                    txn,
                    table="event_json",
                    column="event_id",
                    iterable=chunk,
                    retcols=["event_id", "json"],
                    keyvalues={},
                )

                for row in ev_rows:
                    event_id = row["event_id"]
                    event_json = json.loads(row["json"])
                    try:
                        origin_server_ts = event_json["origin_server_ts"]
                    except (KeyError, AttributeError):
                        # If the event is missing a necessary field then
                        # skip over it.
                        continue

                    rows_to_update.append((origin_server_ts, event_id))

            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"

            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
                txn.executemany(sql, clump)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows_to_update),
            }

            self._background_update_progress_txn(
                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
            )

            return len(rows_to_update)

        result = yield self.runInteraction(
            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)

        defer.returnValue(result)

    @defer.inlineCallbacks
    def _cleanup_extremities_bg_update(self, progress, batch_size):
        """Background update to clean out extremities that should have been
        deleted previously.

        Mainly used to deal with the aftermath of #5269.
        """

        # This works by first copying all existing forward extremities into
        # the `_extremities_to_check` table at startup, and then checking each
        # event in that table to see whether it has any descendants that are
        # not soft-failed/rejected. If it does, we delete that event from the
        # forward extremities table.
        #
        # For efficiency, we do this in batches by recursively pulling out all
        # descendants of a batch until we find the non soft-failed/rejected
        # events, i.e. the set of descendants whose chain of prev events back
        # to the batch of extremities are all soft-failed or rejected.
        # Typically, we won't find any such events, as extremities rarely
        # have any descendants; but if they do then we should delete those
# extremities.
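        #
        # For example, given an extremity A whose only descendants are
        # A <- SF1 <- B, where SF1 is soft-failed and B is accepted, the
        # walk finds B, recurses back up through SF1 to A, and A (being in
        # the original set) is deleted from the forward extremities table.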

        def _cleanup_extremities_bg_update_txn(txn):
            # The set of extremity event IDs that we're checking this round
            original_set = set()

            # A dict[str, set[str]] mapping each event ID to its prev events.
            graph = {}

            # The set of descendants of the original set that are not rejected
            # nor soft-failed. Ancestors of these events should be removed
            # from the forward extremities table.
            non_rejected_leaves = set()

            # Set of event IDs that have been soft failed, and for which we
            # should check if they have descendants which haven't been soft
            # failed.
            soft_failed_events_to_lookup = set()

            # First, we get `batch_size` events from the table, pulling out
            # their successor events, if any, and the successor events'
            # rejection status.
            txn.execute(
                """SELECT prev_event_id, event_id, internal_metadata,
                    rejections.event_id IS NOT NULL, events.outlier
                FROM (
                    SELECT event_id AS prev_event_id
                    FROM _extremities_to_check
                    LIMIT ?
                ) AS f
                LEFT JOIN event_edges USING (prev_event_id)
                LEFT JOIN events USING (event_id)
                LEFT JOIN event_json USING (event_id)
                LEFT JOIN rejections USING (event_id)
                """, (batch_size,)
            )

            for prev_event_id, event_id, metadata, rejected, outlier in txn:
                original_set.add(prev_event_id)

                if not event_id or outlier:
                    # Common case where the forward extremity doesn't have any
                    # descendants.
                    continue

                graph.setdefault(event_id, set()).add(prev_event_id)

                soft_failed = False
                if metadata:
                    soft_failed = json.loads(metadata).get("soft_failed")

                if soft_failed or rejected:
                    soft_failed_events_to_lookup.add(event_id)
                else:
                    non_rejected_leaves.add(event_id)

            # Now we recursively check all the soft-failed descendants we
            # found above in the same way, until we have nothing left to
            # check.
            while soft_failed_events_to_lookup:
                # We only want to do 100 at a time, so we split the given list
                # in two.
                batch = list(soft_failed_events_to_lookup)
                to_check, to_defer = batch[:100], batch[100:]
                soft_failed_events_to_lookup = set(to_defer)

                sql = """SELECT prev_event_id, event_id, internal_metadata,
                    rejections.event_id IS NOT NULL
                    FROM event_edges
                    INNER JOIN events USING (event_id)
                    INNER JOIN event_json USING (event_id)
                    LEFT JOIN rejections USING (event_id)
                    WHERE
                        prev_event_id IN (%s)
                        AND NOT events.outlier
                """ % (
                    ",".join("?" for _ in to_check),
                )
                txn.execute(sql, to_check)

                for prev_event_id, event_id, metadata, rejected in txn:
                    if event_id in graph:
                        # Already handled this event previously, but we still
                        # want to record the edge.
                        graph[event_id].add(prev_event_id)
                        continue

                    graph[event_id] = {prev_event_id}

                    soft_failed = json.loads(metadata).get("soft_failed")
                    if soft_failed or rejected:
                        soft_failed_events_to_lookup.add(event_id)
                    else:
                        non_rejected_leaves.add(event_id)

            # We have a set of non-soft-failed descendants, so we recurse up
            # the graph to find all ancestors and add them to the set of event
            # IDs that we can delete from the forward extremities table.
            to_delete = set()
            while non_rejected_leaves:
                event_id = non_rejected_leaves.pop()
                prev_event_ids = graph.get(event_id, set())
                non_rejected_leaves.update(prev_event_ids)
                to_delete.update(prev_event_ids)

            to_delete.intersection_update(original_set)

            deleted = self._simple_delete_many_txn(
                txn=txn,
                table="event_forward_extremities",
                column="event_id",
                iterable=to_delete,
                keyvalues={},
            )

            logger.info(
                "Deleted %d forward extremities of %d checked, to clean up #5269",
                deleted,
                len(original_set),
            )

            if deleted:
                # We now need to invalidate the caches of these rooms
                rows = self._simple_select_many_txn(
                    txn,
                    table="events",
                    column="event_id",
                    iterable=to_delete,
                    keyvalues={},
                    retcols=("room_id",)
                )
                room_ids = set(row["room_id"] for row in rows)
                for room_id in room_ids:
                    txn.call_after(
                        self.get_latest_event_ids_in_room.invalidate,
                        (room_id,)
                    )

            self._simple_delete_many_txn(
                txn=txn,
                table="_extremities_to_check",
                column="event_id",
                iterable=original_set,
                keyvalues={},
            )

            return len(original_set)

        num_handled = yield self.runInteraction(
            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
        )

        if not num_handled:
            yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)

            def _drop_table_txn(txn):
                txn.execute("DROP TABLE _extremities_to_check")

            yield self.runInteraction(
                "_cleanup_extremities_bg_update_drop_table",
                _drop_table_txn,
            )

        defer.returnValue(num_handled)

@@ -611,3 +611,27 @@ class EventsWorkerStore(SQLBaseStore):
            return res

        return self.runInteraction("get_rejection_reasons", f)

    def _get_total_state_event_counts_txn(self, txn, room_id):
        """
        See get_state_event_counts.
        """
        sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
        txn.execute(sql, (room_id,))
        row = txn.fetchone()
        return row[0] if row else 0

    def get_total_state_event_counts(self, room_id):
        """
        Gets the total number of state events in a room.

        Args:
            room_id (str)

        Returns:
            Deferred[int]
        """
        return self.runInteraction(
            "get_total_state_event_counts",
            self._get_total_state_event_counts_txn, room_id
        )

@@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore):

        return self.runInteraction("get_room_summary", _get_room_summary_txn)

    def _get_user_count_in_room_txn(self, txn, room_id, membership):
        """
        See get_user_count_in_room.
        """
        sql = (
            "SELECT count(*) FROM room_memberships as m"
            " INNER JOIN current_state_events as c"
            " ON m.event_id = c.event_id "
            " AND m.room_id = c.room_id "
            " AND m.user_id = c.state_key"
            " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
        )

        txn.execute(sql, (room_id, membership))
        row = txn.fetchone()
        return row[0]

    def get_user_count_in_room(self, room_id, membership):
        """
        Get the user count in a room with a particular membership.

        Args:
            room_id (str)
            membership (Membership)

        Returns:
            Deferred[int]
        """
        return self.runInteraction(
            "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
        )

    @cached()
    def get_invited_rooms_for_user(self, user_id):
        """ Get all the rooms the user is invited to

@@ -1,23 +0,0 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Start a background job to clean up extremities that were incorrectly added
-- by bug #5269.
INSERT INTO background_updates (update_name, progress_json) VALUES
  ('delete_soft_failed_extremities', '{}');

DROP TABLE IF EXISTS _extremities_to_check;  -- To make this delta schema file idempotent.
CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities;
CREATE INDEX _extremities_to_check_id ON _extremities_to_check(event_id);
80
synapse/storage/schema/delta/54/stats.sql
Normal file
@@ -0,0 +1,80 @@
/* Copyright 2018 New Vector Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

CREATE TABLE stats_stream_pos (
    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
    stream_id BIGINT,
    CHECK (Lock='X')
);

INSERT INTO stats_stream_pos (stream_id) VALUES (null);
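
-- The single row above is subsequently updated in place; illustratively:
--   UPDATE stats_stream_pos SET stream_id = 1234;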

CREATE TABLE user_stats (
    user_id TEXT NOT NULL,
    ts BIGINT NOT NULL,
    bucket_size INT NOT NULL,
    public_rooms INT NOT NULL,
    private_rooms INT NOT NULL
);

CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);

CREATE TABLE room_stats (
    room_id TEXT NOT NULL,
    ts BIGINT NOT NULL,
    bucket_size INT NOT NULL,
    current_state_events INT NOT NULL,
    joined_members INT NOT NULL,
    invited_members INT NOT NULL,
    left_members INT NOT NULL,
    banned_members INT NOT NULL,
    state_events INT NOT NULL
);

CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);

-- cache of current room state; useful for the publicRooms list
CREATE TABLE room_state (
    room_id TEXT NOT NULL,
    join_rules TEXT,
    history_visibility TEXT,
    encryption TEXT,
    name TEXT,
    topic TEXT,
    avatar TEXT,
    canonical_alias TEXT
    -- get aliases straight from the right table
);

CREATE UNIQUE INDEX room_state_room ON room_state(room_id);

CREATE TABLE room_stats_earliest_token (
    room_id TEXT NOT NULL,
    token BIGINT NOT NULL
);

CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);

-- Set up staging tables
INSERT INTO background_updates (update_name, progress_json) VALUES
  ('populate_stats_createtables', '{}');

-- Run through each room and update stats
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
  ('populate_stats_process_rooms', '{}', 'populate_stats_createtables');

-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_cleanup', '{}', 'populate_stats_process_rooms');
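
-- 'depends_on' chains the three jobs above so they run strictly in order:
-- createtables, then process_rooms, then cleanup.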

@@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore):
            "get_current_state_deltas", get_current_state_deltas_txn
        )

    def get_max_stream_id_in_current_state_deltas(self):
        return self._simple_select_one_onecol(
    def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
        return self._simple_select_one_onecol_txn(
            txn,
            table="current_state_delta_stream",
            keyvalues={},
            retcol="COALESCE(MAX(stream_id), -1)",
            desc="get_max_stream_id_in_current_state_deltas",
        )

    def get_max_stream_id_in_current_state_deltas(self):
        return self.runInteraction(
            "get_max_stream_id_in_current_state_deltas",
            self._get_max_stream_id_in_current_state_deltas_txn,
        )

450
synapse/storage/stats.py
Normal file
@@ -0,0 +1,450 @@

# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached

logger = logging.getLogger(__name__)

# these fields track absolutes (e.g. total number of rooms on the server)
ABSOLUTE_STATS_FIELDS = {
    "room": (
        "current_state_events",
        "joined_members",
        "invited_members",
        "left_members",
        "banned_members",
        "state_events",
    ),
    "user": ("public_rooms", "private_rooms"),
}
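# Absolute fields carry a running total forward into each new timeslice and
# are then adjusted in place; e.g. a "joined_members" of 3 is copied into the
# next bucket before that bucket's +1/-1 membership deltas are applied.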

TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}

TEMP_TABLE = "_temp_populate_stats"
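# Staging tables built during the initial population are named with this
# prefix, i.e. "_temp_populate_stats_rooms" and "_temp_populate_stats_position".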


class StatsStore(StateDeltasStore):
    def __init__(self, db_conn, hs):
        super(StatsStore, self).__init__(db_conn, hs)

        self.server_name = hs.hostname
        self.clock = self.hs.get_clock()
        self.stats_enabled = hs.config.stats_enabled
        self.stats_bucket_size = hs.config.stats_bucket_size

        self.register_background_update_handler(
            "populate_stats_createtables", self._populate_stats_createtables
        )
        self.register_background_update_handler(
            "populate_stats_process_rooms", self._populate_stats_process_rooms
        )
        self.register_background_update_handler(
            "populate_stats_cleanup", self._populate_stats_cleanup
        )

    @defer.inlineCallbacks
    def _populate_stats_createtables(self, progress, batch_size):

        if not self.stats_enabled:
            yield self._end_background_update("populate_stats_createtables")
            defer.returnValue(1)

        # Get all the rooms that we want to process.
        def _make_staging_area(txn):
            sql = (
                "CREATE TABLE IF NOT EXISTS "
                + TEMP_TABLE
                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
            )
            txn.execute(sql)

            sql = (
                "CREATE TABLE IF NOT EXISTS "
                + TEMP_TABLE
                + "_position(position TEXT NOT NULL)"
            )
            txn.execute(sql)

            # Get rooms we want to process from the database
            sql = """
                SELECT room_id, count(*) FROM current_state_events
                GROUP BY room_id
            """
            txn.execute(sql)
            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
            del rooms

        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
        self.get_earliest_token_for_room_stats.invalidate_all()

        yield self._end_background_update("populate_stats_createtables")
        defer.returnValue(1)

    @defer.inlineCallbacks
    def _populate_stats_cleanup(self, progress, batch_size):
        """
        Update the stats stream position, then clean up the old staging tables.
        """
        if not self.stats_enabled:
            yield self._end_background_update("populate_stats_cleanup")
            defer.returnValue(1)

        position = yield self._simple_select_one_onecol(
            TEMP_TABLE + "_position", None, "position"
        )
        yield self.update_stats_stream_pos(position)

        def _delete_staging_area(txn):
            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")

        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)

        yield self._end_background_update("populate_stats_cleanup")
        defer.returnValue(1)

    @defer.inlineCallbacks
    def _populate_stats_process_rooms(self, progress, batch_size):

        if not self.stats_enabled:
            yield self._end_background_update("populate_stats_process_rooms")
            defer.returnValue(1)

        # If we don't have any progress recorded, delete everything.
        if not progress:
            yield self.delete_all_stats()

        def _get_next_batch(txn):
            # Only fetch 250 rooms, so we don't fetch too many at once, even
            # if those 250 rooms have fewer than batch_size state events.
            sql = """
                SELECT room_id, events FROM %s_rooms
                ORDER BY events DESC
                LIMIT 250
            """ % (
                TEMP_TABLE,
            )
            txn.execute(sql)
            rooms_to_work_on = txn.fetchall()

            if not rooms_to_work_on:
                return None

            # Get how many are left to process, so we can give status on how
            # far we are in processing
            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
            progress["remaining"] = txn.fetchone()[0]

            return rooms_to_work_on

        rooms_to_work_on = yield self.runInteraction(
            "populate_stats_temp_read", _get_next_batch
        )

        # No more rooms -- complete the background update.
        if not rooms_to_work_on:
            yield self._end_background_update("populate_stats_process_rooms")
            defer.returnValue(1)

        logger.info(
            "Processing the next %d rooms of %d remaining",
            len(rooms_to_work_on),
            progress["remaining"],
        )

        # Number of state events we've processed by going through each room
        processed_event_count = 0

        for room_id, event_count in rooms_to_work_on:

            current_state_ids = yield self.get_current_state_ids(room_id)

            join_rules = yield self.get_event(
                current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
            )
            history_visibility = yield self.get_event(
                current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
                allow_none=True,
            )
            encryption = yield self.get_event(
                current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
            )
            name = yield self.get_event(
                current_state_ids.get((EventTypes.Name, "")), allow_none=True
            )
            topic = yield self.get_event(
                current_state_ids.get((EventTypes.Topic, "")), allow_none=True
            )
            avatar = yield self.get_event(
                current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
            )
            canonical_alias = yield self.get_event(
                current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
            )

            def _or_none(x, arg):
                if x:
                    return x.content.get(arg)
                return None

            yield self.update_room_state(
                room_id,
                {
                    "join_rules": _or_none(join_rules, "join_rule"),
                    "history_visibility": _or_none(
                        history_visibility, "history_visibility"
                    ),
                    "encryption": _or_none(encryption, "algorithm"),
                    "name": _or_none(name, "name"),
                    "topic": _or_none(topic, "topic"),
                    "avatar": _or_none(avatar, "url"),
                    "canonical_alias": _or_none(canonical_alias, "alias"),
                },
            )

            now = self.hs.get_reactor().seconds()

# quantise time to the nearest bucket
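            # e.g. with the default one-day bucket (bucket_size = 86400s),
            # an illustrative now of 90000 quantises down to 86400.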
            now = (now // self.stats_bucket_size) * self.stats_bucket_size

            def _fetch_data(txn):

                # Get the current token of the room
                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)

                current_state_events = len(current_state_ids)
                joined_members = self._get_user_count_in_room_txn(
                    txn, room_id, Membership.JOIN
                )
                invited_members = self._get_user_count_in_room_txn(
                    txn, room_id, Membership.INVITE
                )
                left_members = self._get_user_count_in_room_txn(
                    txn, room_id, Membership.LEAVE
                )
                banned_members = self._get_user_count_in_room_txn(
                    txn, room_id, Membership.BAN
                )
                total_state_events = self._get_total_state_event_counts_txn(
                    txn, room_id
                )

                self._update_stats_txn(
                    txn,
                    "room",
                    room_id,
                    now,
                    {
                        "bucket_size": self.stats_bucket_size,
                        "current_state_events": current_state_events,
                        "joined_members": joined_members,
                        "invited_members": invited_members,
                        "left_members": left_members,
                        "banned_members": banned_members,
                        "state_events": total_state_events,
                    },
                )
                self._simple_insert_txn(
                    txn,
                    "room_stats_earliest_token",
                    {"room_id": room_id, "token": current_token},
                )

            yield self.runInteraction("update_room_stats", _fetch_data)

            # We've finished a room. Delete it from the table.
            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
            # Update the remaining counter.
            progress["remaining"] -= 1
            yield self.runInteraction(
                "populate_stats",
                self._background_update_progress_txn,
                "populate_stats_process_rooms",
                progress,
            )

            processed_event_count += event_count

            if processed_event_count > batch_size:
                # Don't process any more rooms, we've hit our batch size.
                defer.returnValue(processed_event_count)

        defer.returnValue(processed_event_count)

    def delete_all_stats(self):
        """
        Delete all statistics records.
        """

        def _delete_all_stats_txn(txn):
            txn.execute("DELETE FROM room_state")
            txn.execute("DELETE FROM room_stats")
            txn.execute("DELETE FROM room_stats_earliest_token")
            txn.execute("DELETE FROM user_stats")

        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)

def get_stats_stream_pos(self):
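        """Fetch the stream position the stats processor has caught up to,
        from the single-row `stats_stream_pos` table.

        Returns:
            Deferred[int|None]
        """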
        return self._simple_select_one_onecol(
            table="stats_stream_pos",
            keyvalues={},
            retcol="stream_id",
            desc="stats_stream_pos",
        )

    def update_stats_stream_pos(self, stream_id):
        return self._simple_update_one(
            table="stats_stream_pos",
            keyvalues={},
            updatevalues={"stream_id": stream_id},
            desc="update_stats_stream_pos",
        )

    def update_room_state(self, room_id, fields):
        """
        Args:
            room_id (str)
            fields (dict[str, Any])
        """
        return self._simple_upsert(
            table="room_state",
            keyvalues={"room_id": room_id},
            values=fields,
            desc="update_room_state",
        )

    def get_deltas_for_room(self, room_id, start, size=100):
        """
        Get statistics deltas for a given room.

        Args:
            room_id (str)
            start (int): Pagination start. Number of entries, not timestamp.
            size (int): How many entries to return.

        Returns:
            Deferred[list[dict]], where the dict has the keys of
            ABSOLUTE_STATS_FIELDS["room"] and "ts".
        """
        return self._simple_select_list_paginate(
            "room_stats",
            {"room_id": room_id},
            "ts",
            start,
            size,
            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
            order_direction="DESC",
        )

    def get_all_room_state(self):
        return self._simple_select_list(
            "room_state", None, retcols=("name", "topic", "canonical_alias")
        )

    @cached()
    def get_earliest_token_for_room_stats(self, room_id):
        """
        Fetch the "earliest token". This is used by the room stats delta
        processor to ignore deltas that have been processed between the
        start of the background task and any particular room's stats
        being calculated.

        Returns:
            Deferred[int]
        """
        return self._simple_select_one_onecol(
            "room_stats_earliest_token",
            {"room_id": room_id},
            retcol="token",
            allow_none=True,
        )

    def update_stats(self, stats_type, stats_id, ts, fields):
        table, id_col = TYPE_TO_ROOM[stats_type]
        return self._simple_upsert(
            table=table,
            keyvalues={id_col: stats_id, "ts": ts},
            values=fields,
            desc="update_stats",
        )

    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
        table, id_col = TYPE_TO_ROOM[stats_type]
        return self._simple_upsert_txn(
            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
        )

def update_stats_delta(self, ts, stats_type, stats_id, field, value):
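        """Applies a delta to the most recent stats row for `stats_id`,
        starting a new timeslice (and carrying the absolute counters over)
        if `ts` has moved into a new bucket.

        Args:
            ts (int): Quantised timestamp the delta applies at.
            stats_type (str): "room" or "user".
            stats_id (str): The room ID or user ID the delta is for.
            field (str): The statistic to update, e.g. "joined_members".
            value (int): The amount to adjust the statistic by.
        """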
        def _update_stats_delta(txn):
            table, id_col = TYPE_TO_ROOM[stats_type]

            sql = (
                "SELECT * FROM %s"
                " WHERE %s=? and ts=("
                " SELECT MAX(ts) FROM %s"
                " WHERE %s=?"
                ")"
            ) % (table, id_col, table, id_col)
            txn.execute(sql, (stats_id, stats_id))
            rows = self.cursor_to_dict(txn)
            if len(rows) == 0:
                # silently skip as we don't have anything to apply a delta to yet.
                # this tries to minimise any race between the initial sync and
                # subsequent deltas arriving.
                return

            current_ts = ts
            latest_ts = rows[0]["ts"]
            if current_ts < latest_ts:
                # This one is in the past, but we're just encountering it now.
                # Mark it as part of the current bucket.
                current_ts = latest_ts
            elif ts != latest_ts:
                # we have to copy our absolute counters over to the new entry.
                values = {
                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
                }
                values[id_col] = stats_id
                values["ts"] = ts
                values["bucket_size"] = self.stats_bucket_size

                self._simple_insert_txn(txn, table=table, values=values)

# actually update the new value
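            # (note that stats_type is never a member of its own field tuple,
            # so in practice the increment branch below is always taken)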
            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
                self._simple_update_txn(
                    txn,
                    table=table,
                    keyvalues={id_col: stats_id, "ts": current_ts},
                    updatevalues={field: value},
                )
            else:
                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
                    table,
                    field,
                    field,
                    id_col,
                )
                txn.execute(sql, (value, stats_id, current_ts))

        return self.runInteraction("update_stats_delta", _update_stats_delta)

251
tests/handlers/test_stats.py
Normal file
@@ -0,0 +1,251 @@

# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from mock import Mock

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room

from tests import unittest


class StatsRoomTests(unittest.HomeserverTestCase):

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor, clock, hs):

        self.store = hs.get_datastore()
        self.handler = self.hs.get_stats_handler()

    def _add_background_updates(self):
        """
        Add the background updates we need to run.
        """
        # Ugh, have to reset this flag
        self.store._all_done = False

        self.get_success(
            self.store._simple_insert(
                "background_updates",
                {"update_name": "populate_stats_createtables", "progress_json": "{}"},
            )
        )
        self.get_success(
            self.store._simple_insert(
                "background_updates",
                {
                    "update_name": "populate_stats_process_rooms",
                    "progress_json": "{}",
                    "depends_on": "populate_stats_createtables",
                },
            )
        )
        self.get_success(
            self.store._simple_insert(
                "background_updates",
                {
                    "update_name": "populate_stats_cleanup",
                    "progress_json": "{}",
                    "depends_on": "populate_stats_process_rooms",
                },
            )
        )

    def test_initial_room(self):
        """
        The background updates will build the table from scratch.
        """
        r = self.get_success(self.store.get_all_room_state())
        self.assertEqual(len(r), 0)

        # Disable stats
        self.hs.config.stats_enabled = False
        self.handler.stats_enabled = False

        u1 = self.register_user("u1", "pass")
        u1_token = self.login("u1", "pass")

        room_1 = self.helper.create_room_as(u1, tok=u1_token)
        self.helper.send_state(
            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
        )

        # Stats disabled, shouldn't have done anything
        r = self.get_success(self.store.get_all_room_state())
        self.assertEqual(len(r), 0)

        # Enable stats
        self.hs.config.stats_enabled = True
        self.handler.stats_enabled = True

        # Do the initial population of the stats tables via the background update
        self._add_background_updates()

        while not self.get_success(self.store.has_completed_background_updates()):
            self.get_success(self.store.do_next_background_update(100), by=0.1)

        r = self.get_success(self.store.get_all_room_state())

        self.assertEqual(len(r), 1)
        self.assertEqual(r[0]["topic"], "foo")

    def test_initial_earliest_token(self):
        """
        Ingestion via notify_new_event will ignore tokens that the background
        update has already processed.
"""
        self.reactor.advance(86401)

        self.hs.config.stats_enabled = False
        self.handler.stats_enabled = False

        u1 = self.register_user("u1", "pass")
        u1_token = self.login("u1", "pass")

        u2 = self.register_user("u2", "pass")
        u2_token = self.login("u2", "pass")

        u3 = self.register_user("u3", "pass")
        u3_token = self.login("u3", "pass")

        room_1 = self.helper.create_room_as(u1, tok=u1_token)
        self.helper.send_state(
            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
        )

        # Begin the ingestion by creating the temp tables. This will also store
        # the position that the deltas should begin at, once they take over.
        self.hs.config.stats_enabled = True
        self.handler.stats_enabled = True
        self.store._all_done = False
        self.get_success(self.store.update_stats_stream_pos(None))

        self.get_success(
            self.store._simple_insert(
                "background_updates",
                {"update_name": "populate_stats_createtables", "progress_json": "{}"},
            )
        )

        while not self.get_success(self.store.has_completed_background_updates()):
            self.get_success(self.store.do_next_background_update(100), by=0.1)

        # Now, before the table is actually ingested, add some more events.
        self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
        self.helper.join(room=room_1, user=u2, tok=u2_token)

        # Now do the initial ingestion.
        self.get_success(
            self.store._simple_insert(
                "background_updates",
                {"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
            )
        )
        self.get_success(
            self.store._simple_insert(
                "background_updates",
                {
                    "update_name": "populate_stats_cleanup",
                    "progress_json": "{}",
                    "depends_on": "populate_stats_process_rooms",
                },
            )
        )

        self.store._all_done = False
        while not self.get_success(self.store.has_completed_background_updates()):
            self.get_success(self.store.do_next_background_update(100), by=0.1)

        self.reactor.advance(86401)

        # Now add some more events, triggering ingestion. Because the stream
        # position was set to before the events sent in the middle, a simpler
        # implementation would reprocess those events and say there were four
        # users, not three.
        self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
        self.helper.join(room=room_1, user=u3, tok=u3_token)

        # Get the deltas! There should be two -- day 1, and day 2.
        r = self.get_success(self.store.get_deltas_for_room(room_1, 0))

        # The oldest has 2 joined members
        self.assertEqual(r[-1]["joined_members"], 2)

        # The newest has 3
        self.assertEqual(r[0]["joined_members"], 3)

    def test_incorrect_state_transition(self):
        """
        If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
        (JOIN, INVITE, LEAVE, BAN), an error is raised.
        """
        events = {
            "a1": {"membership": Membership.LEAVE},
            "a2": {"membership": "not a real thing"},
        }

        def get_event(event_id):
            m = Mock()
            m.content = events[event_id]
            d = defer.Deferred()
            self.reactor.callLater(0.0, d.callback, m)
            return d

        def get_received_ts(event_id):
            return defer.succeed(1)

        self.store.get_received_ts = get_received_ts
        self.store.get_event = get_event

        deltas = [
            {
                "type": EventTypes.Member,
                "state_key": "some_user",
                "room_id": "room",
                "event_id": "a1",
                "prev_event_id": "a2",
                "stream_id": "bleb",
            }
        ]

        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
        self.assertEqual(
            f.value.args[0], "'not a real thing' is not a valid prev_membership"
        )

        # And the other way...
        deltas = [
            {
                "type": EventTypes.Member,
                "state_key": "some_user",
                "room_id": "room",
                "event_id": "a2",
                "prev_event_id": "a1",
                "stream_id": "bleb",
            }
        ]

        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
        self.assertEqual(
            f.value.args[0], "'not a real thing' is not a valid membership"
        )

@@ -127,3 +127,20 @@ class RestHelper(object):
        )

        return channel.json_body

    def send_state(self, room_id, event_type, body, tok, expect_code=200):
        path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
        if tok:
            path = path + "?access_token=%s" % tok

        request, channel = make_request(
            self.hs.get_reactor(), "PUT", path, json.dumps(body).encode('utf8')
        )
        render(request, self.resource, self.hs.get_reactor())

        assert int(channel.result["code"]) == expect_code, (
            "Expected: %d, got: %d, resp: %r"
            % (expect_code, int(channel.result["code"]), channel.result["body"])
        )

        return channel.json_body

@@ -1,248 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path

from synapse.api.constants import EventTypes
from synapse.storage import prepare_database
from synapse.types import Requester, UserID

from tests.unittest import HomeserverTestCase


class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
    """Test the background update that cleans the forward extremities table.
    """

    def prepare(self, reactor, clock, homeserver):
        self.store = homeserver.get_datastore()
        self.event_creator = homeserver.get_event_creation_handler()
        self.room_creator = homeserver.get_room_creation_handler()

        # Create a test user and room
        self.user = UserID("alice", "test")
        self.requester = Requester(self.user, None, False, None, None)
        info = self.get_success(self.room_creator.create_room(self.requester, {}))
        self.room_id = info["room_id"]

    def create_and_send_event(self, soft_failed=False, prev_event_ids=None):
        """Create and send an event.

        Args:
            soft_failed (bool): Whether to create a soft failed event or not
            prev_event_ids (list[str]|None): Explicitly set the prev events,
                or if None just use the default

        Returns:
            str: The new event's ID.
        """
        prev_events_and_hashes = None
        if prev_event_ids:
            prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]

        event, context = self.get_success(
            self.event_creator.create_event(
                self.requester,
                {
                    "type": EventTypes.Message,
                    "room_id": self.room_id,
                    "sender": self.user.to_string(),
                    "content": {"body": "", "msgtype": "m.text"},
                },
                prev_events_and_hashes=prev_events_and_hashes,
            )
        )

        if soft_failed:
            event.internal_metadata.soft_failed = True

        self.get_success(
            self.event_creator.send_nonmember_event(self.requester, event, context)
        )

        return event.event_id

    def add_extremity(self, event_id):
        """Add the given event as an extremity to the room.
        """
        self.get_success(
            self.store._simple_insert(
                table="event_forward_extremities",
                values={"room_id": self.room_id, "event_id": event_id},
                desc="test_add_extremity",
            )
        )

        self.store.get_latest_event_ids_in_room.invalidate((self.room_id,))

    def run_background_update(self):
        """Re-run the background update to clean up the extremities.
        """
        # Make sure we don't clash with in progress updates.
        self.assertTrue(self.store._all_done, "Background updates are still ongoing")

        schema_path = os.path.join(
            prepare_database.dir_path,
            "schema",
            "delta",
            "54",
            "delete_forward_extremities.sql",
        )

        def run_delta_file(txn):
            prepare_database.executescript(txn, schema_path)

        self.get_success(
            self.store.runInteraction("test_delete_forward_extremities", run_delta_file)
        )

        # Ugh, have to reset this flag
        self.store._all_done = False

        while not self.get_success(self.store.has_completed_background_updates()):
            self.get_success(self.store.do_next_background_update(100), by=0.1)

    def test_soft_failed_extremities_handled_correctly(self):
        """Test that extremities are correctly calculated in the presence of
        soft failed events.

        Tests a graph like:

            A <- SF1 <- SF2 <- B

        Where SF* are soft failed.
        """
        # Create the room graph
        event_id_1 = self.create_and_send_event()
        event_id_2 = self.create_and_send_event(True, [event_id_1])
        event_id_3 = self.create_and_send_event(True, [event_id_2])
        event_id_4 = self.create_and_send_event(False, [event_id_3])

        # Check the latest events are as expected
        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )

        self.assertEqual(latest_event_ids, [event_id_4])

    def test_basic_cleanup(self):
        """Test that extremities are correctly calculated in the presence of
        soft failed events.

        Tests a graph like:

            A <- SF1 <- B

        Where SF* are soft failed, and with extremities of A and B.
        """
        # Create the room graph
        event_id_a = self.create_and_send_event()
        event_id_sf1 = self.create_and_send_event(True, [event_id_a])
        event_id_b = self.create_and_send_event(False, [event_id_sf1])

        # Add the new extremity and check the latest events are as expected
        self.add_extremity(event_id_a)

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))

        # Run the background update and check it did the right thing
        self.run_background_update()

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(latest_event_ids, [event_id_b])

    def test_chain_of_fail_cleanup(self):
        """Test that extremities are correctly calculated in the presence of
        soft failed events.

        Tests a graph like:

            A <- SF1 <- SF2 <- B

        Where SF* are soft failed, and with extremities of A and B.
        """
        # Create the room graph
        event_id_a = self.create_and_send_event()
        event_id_sf1 = self.create_and_send_event(True, [event_id_a])
        event_id_sf2 = self.create_and_send_event(True, [event_id_sf1])
        event_id_b = self.create_and_send_event(False, [event_id_sf2])

        # Add the new extremity and check the latest events are as expected
        self.add_extremity(event_id_a)

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))

        # Run the background update and check it did the right thing
        self.run_background_update()

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(latest_event_ids, [event_id_b])

    def test_forked_graph_cleanup(self):
        r"""Test that extremities are correctly calculated in the presence of
        soft failed events.

        Tests a graph like the following, where time flows down the page:

                A     B
               / \   /
              /   \ /
            SF1   SF2
             |     |
            SF3    |
           /   \   |
          |     \  |
          C     SF4

        Where SF* are soft failed, and with them A, B and C marked as
        extremities. This should resolve to B and C being marked as extremities.
        """
        # Create the room graph
        event_id_a = self.create_and_send_event()
        event_id_b = self.create_and_send_event()
        event_id_sf1 = self.create_and_send_event(True, [event_id_a])
        event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b])
        event_id_sf3 = self.create_and_send_event(True, [event_id_sf1])
        self.create_and_send_event(True, [event_id_sf2, event_id_sf3])  # SF4
        event_id_c = self.create_and_send_event(False, [event_id_sf3])

        # Add the new extremity and check the latest events are as expected
        self.add_extremity(event_id_a)

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(
            set(latest_event_ids), set((event_id_a, event_id_b, event_id_c))
        )

        # Run the background update and check it did the right thing
        self.run_background_update()

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))