1
0

Compare commits

...

29 Commits

Author SHA1 Message Date
Travis Ralston c4f3e97879 Appease the linter 2019-09-05 22:10:57 -06:00
Travis Ralston 170526557f Filter out hidden read receipts from sync too 2019-09-05 21:51:53 -06:00
Travis Ralston 30f0824079 Filter out hidden read receipts for federation workers 2019-09-05 20:56:01 -06:00
Travis Ralston 27eac52813 import the important things in life 2019-09-05 20:41:54 -06:00
Travis Ralston 53cf721ede changelog 2019-09-05 20:41:10 -06:00
Travis Ralston aab3f5d11e Support optionally not sending read receipts to other users/servers 2019-09-05 20:37:58 -06:00
Jorik Schellekens f7c873a643 Trace how long it takes for the send trasaction to complete, including retrys (#5986) 2019-09-05 17:44:55 +01:00
Jorik Schellekens bc604e7f94 Gracefully handle log context slips and missing opentracing import errors. (#5988) 2019-09-05 17:33:29 +01:00
Erik Johnston 1a6ae33309 Merge pull request #5984 from matrix-org/joriks/opentracing_link_send_to_edu_contexts
Link the send loop with the edus contexts
2019-09-05 15:22:24 +01:00
Jorik Schellekens ef20aa52eb use access methods (duh..)
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-09-05 15:07:17 +01:00
Jorik Schellekens 7093790fbc Bugfix phrasing
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-09-05 15:07:00 +01:00
Jorik Schellekens 5ade977d08 Opentracing context cannot be none 2019-09-05 15:06:13 +01:00
Jorik Schellekens 909827b422 Add opentracing to all client servlets (#5983) 2019-09-05 14:46:04 +01:00
Jorik Schellekens 93bc9d73bf newsfile 2019-09-05 14:45:07 +01:00
Jorik Schellekens 1d65292e94 Link the send loop with the edus contexts
The contexts were being filtered too early so  the send loop wasn't
being linked to them unless the destination
was whitelisted.
2019-09-05 14:42:37 +01:00
Andrew Morgan a0d294c306 Switch to using v2 Identity Service APIs other than lookup (MSC 2140) (#5892) 2019-09-05 14:31:22 +01:00
Jorik Schellekens b9cfd3c375 Fix opentracing contexts missing from outbound replication requests (#5982) 2019-09-05 14:22:15 +01:00
Andrew Morgan 90d17a3d28 Add POST /_matrix/client/r0/account/3pid/unbind (MSC2140) (#5980)
Implements `POST /_matrix/client/r0/account/3pid/unbind` from [MSC2140](https://github.com/matrix-org/matrix-doc/blob/dbkr/tos_2/proposals/2140-terms-of-service-2.md#post-_matrixclientr0account3pidunbind).
2019-09-05 14:00:30 +01:00
Andrew Morgan b736c6cd3a Remove bind_email and bind_msisdn (#5964)
Removes the `bind_email` and `bind_msisdn` parameters from the `/register` C/S API endpoint as per [MSC2140: Terms of Service for ISes and IMs](https://github.com/matrix-org/matrix-doc/pull/2140/files#diff-c03a26de5ac40fb532de19cb7fc2aaf7R107).
2019-09-04 18:24:23 +01:00
Andrew Morgan b09d443632 Cleanup event auth type initialisation (#5975)
Very small code cleanup.
2019-09-04 16:16:56 +01:00
Erik Johnston 6e834e94fc Fix and refactor room and user stats (#5971)
Previously the stats were not being correctly populated.
2019-09-04 13:04:27 +01:00
Andrew Morgan ea128a3e8e code cleanups 2019-09-03 21:05:06 +01:00
Travis Ralston 2f416fc997 Ensure the list media admin API is always available (#5966)
* Ensure the list media admin API is always available

This API is required for some external media repo implementations to operate (mostly for doing quarantine operations on a room).

* changelog
2019-09-03 13:35:20 -06:00
Andrew Morgan 6b6086b8bf Fix docstring 2019-09-03 20:00:09 +01:00
Andrew Morgan a98b8583c6 Remove unnecessary variable declaration 2019-09-03 19:58:51 +01:00
Michael Kaye 894c1a5759 Docker packaging should not su-exec or chmod if already running as UID/GID (#5970)
Adjust su-exec to only be used if needed.

If UID == getuid() and GID == getgid() then we do not need to su-exec, and chmod will not work.
2019-09-03 16:36:01 +01:00
Travis Ralston 0eac7077c9 Ensure an auth instance is available to ListMediaInRoom (#5967)
* Ensure an auth instance is available to ListMediaInRoom

Fixes https://github.com/matrix-org/synapse/issues/5737

* Changelog
2019-09-03 09:01:30 -06:00
Matthew Hodgson 8401bcd206 fix typo 2019-09-03 12:44:14 +01:00
Andrew Morgan 2a44782666 Remove double return statements (#5962)
Remove all the "double return" statements which were a result of us removing all the instances of

```
defer.returnValue(...)
return
```

statements when we switched to python3 fully.
2019-09-03 11:42:45 +01:00
65 changed files with 2110 additions and 920 deletions
+1
View File
@@ -0,0 +1 @@
Compatibility with v2 Identity Service APIs other than /lookup.
+1
View File
@@ -0,0 +1 @@
Remove unnecessary return statements in the codebase which were the result of a regex run.
+1
View File
@@ -0,0 +1 @@
Remove `bind_email` and `bind_msisdn` parameters from /register ala MSC2140.
+1
View File
@@ -0,0 +1 @@
Fix admin API for listing media in a room not being available with an external media repo.
+1
View File
@@ -0,0 +1 @@
Fix list media admin API always returning an error.
+1
View File
@@ -0,0 +1 @@
Avoid changing UID/GID if they are already correct.
+1
View File
@@ -0,0 +1 @@
Fix room and user stats tracking.
+1
View File
@@ -0,0 +1 @@
Cleanup event auth type initialisation.
+1
View File
@@ -0,0 +1 @@
Add POST /_matrix/client/r0/account/3pid/unbind endpoint from MSC2140 for unbinding a 3PID from an identity server without removing it from the homeserver user account.
+1
View File
@@ -0,0 +1 @@
Include missing opentracing contexts in outbout replication requests.
+1
View File
@@ -0,0 +1 @@
Add minimum opentracing for client servlets.
+1
View File
@@ -0,0 +1 @@
Fix sending of EDUs when opentracing is enabled with an empty whitelist.
+1
View File
@@ -0,0 +1 @@
Trace replication send times.
+1
View File
@@ -0,0 +1 @@
Fix invalid references to None while opentracing if the log context slips.
+1
View File
@@ -0,0 +1 @@
Support a way for clients to not send read receipts to other users/servers.
+5
View File
@@ -268,6 +268,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_emailrequest(self, args):
# TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -302,6 +303,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_emailvalidate(self, args):
# TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/validate/email/submitToken"
@@ -330,6 +332,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_3pidbind(self, args):
# TODO: Update to use v2 Identity Service API endpoint
url = self._identityServerUrl() + "/_matrix/identity/api/v1/3pid/bind"
json_res = yield self.http_client.do_request(
@@ -398,6 +401,7 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_invite(self, roomid, userstring):
if not userstring.startswith("@") and self._is_on("complete_usernames"):
# TODO: Update to use v2 Identity Service API endpoint
url = self._identityServerUrl() + "/_matrix/identity/api/v1/lookup"
json_res = yield self.http_client.do_request(
@@ -407,6 +411,7 @@ class SynapseCmd(cmd.Cmd):
mxid = None
if "mxid" in json_res and "signatures" in json_res:
# TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/pubkey/ed25519"
+49 -35
View File
@@ -41,8 +41,8 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
config_dir (str): where to put generated config files
config_path (str): where to put the main config file
environ (dict): environment dictionary
ownership (str): "<user>:<group>" string which will be used to set
ownership of the generated configs
ownership (str|None): "<user>:<group>" string which will be used to set
ownership of the generated configs. If None, ownership will not change.
"""
for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
if v not in environ:
@@ -105,24 +105,24 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
log("Generating log config file " + log_config_file)
convert("/conf/log.config", log_config_file, environ)
subprocess.check_output(["chown", "-R", ownership, "/data"])
# Hopefully we already have a signing key, but generate one if not.
subprocess.check_output(
[
"su-exec",
ownership,
"python",
"-m",
"synapse.app.homeserver",
"--config-path",
config_path,
# tell synapse to put generated keys in /data rather than /compiled
"--keys-directory",
config_dir,
"--generate-keys",
]
)
args = [
"python",
"-m",
"synapse.app.homeserver",
"--config-path",
config_path,
# tell synapse to put generated keys in /data rather than /compiled
"--keys-directory",
config_dir,
"--generate-keys",
]
if ownership is not None:
subprocess.check_output(["chown", "-R", ownership, "/data"])
args = ["su-exec", ownership] + args
subprocess.check_output(args)
def run_generate_config(environ, ownership):
@@ -130,7 +130,7 @@ def run_generate_config(environ, ownership):
Args:
environ (dict): env var dict
ownership (str): "userid:groupid" arg for chmod
ownership (str|None): "userid:groupid" arg for chmod. If None, ownership will not change.
Never returns.
"""
@@ -149,9 +149,6 @@ def run_generate_config(environ, ownership):
log("Creating log config %s" % (log_config_file,))
convert("/conf/log.config", log_config_file, environ)
# make sure that synapse has perms to write to the data dir.
subprocess.check_output(["chown", ownership, data_dir])
args = [
"python",
"-m",
@@ -170,12 +167,33 @@ def run_generate_config(environ, ownership):
"--open-private-ports",
]
# log("running %s" % (args, ))
os.execv("/usr/local/bin/python", args)
if ownership is not None:
args = ["su-exec", ownership] + args
os.execv("/sbin/su-exec", args)
# make sure that synapse has perms to write to the data dir.
subprocess.check_output(["chown", ownership, data_dir])
else:
os.execv("/usr/local/bin/python", args)
def main(args, environ):
mode = args[1] if len(args) > 1 else None
ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
desired_uid = int(environ.get("UID", "991"))
desired_gid = int(environ.get("GID", "991"))
if (desired_uid == os.getuid()) and (desired_gid == os.getgid()):
ownership = None
else:
ownership = "{}:{}".format(desired_uid, desired_gid)
log(
"Container running as UserID %s:%s, ENV (or defaults) requests %s:%s"
% (os.getuid(), os.getgid(), desired_uid, desired_gid)
)
if ownership is None:
log("Will not perform chmod/su-exec as UserID already matches request")
# In generate mode, generate a configuration and missing keys, then exit
if mode == "generate":
@@ -227,16 +245,12 @@ def main(args, environ):
log("Starting synapse with config file " + config_path)
args = [
"su-exec",
ownership,
"python",
"-m",
"synapse.app.homeserver",
"--config-path",
config_path,
]
os.execv("/sbin/su-exec", args)
args = ["python", "-m", "synapse.app.homeserver", "--config-path", config_path]
if ownership is not None:
args = ["su-exec", ownership] + args
os.execv("/sbin/su-exec", args)
else:
os.execv("/usr/local/bin/python", args)
if __name__ == "__main__":
+62
View File
@@ -0,0 +1,62 @@
Room and User Statistics
========================
Synapse maintains room and user statistics (as well as a cache of room state),
in various tables. These can be used for administrative purposes but are also
used when generating the public room directory.
# Synapse Developer Documentation
## High-Level Concepts
### Definitions
* **subject**: Something we are tracking stats about currently a room or user.
* **current row**: An entry for a subject in the appropriate current statistics
table. Each subject can have only one.
* **historical row**: An entry for a subject in the appropriate historical
statistics table. Each subject can have any number of these.
### Overview
Stats are maintained as time series. There are two kinds of column:
* absolute columns where the value is correct for the time given by `end_ts`
in the stats row. (Imagine a line graph for these values)
* They can also be thought of as 'gauges' in Prometheus, if you are familiar.
* per-slice columns where the value corresponds to how many of the occurrences
occurred within the time slice given by `(end_ts bucket_size)…end_ts`
or `start_ts…end_ts`. (Imagine a histogram for these values)
Stats are maintained in two tables (for each type): current and historical.
Current stats correspond to the present values. Each subject can only have one
entry.
Historical stats correspond to values in the past. Subjects may have multiple
entries.
## Concepts around the management of stats
### Current rows
Current rows contain the most up-to-date statistics for a room.
They only contain absolute columns
### Historical rows
Historical rows can always be considered to be valid for the time slice and
end time specified.
* historical rows will not exist for every time slice they will be omitted
if there were no changes. In this case, the following assumptions can be
made to interpolate/recreate missing rows:
- absolute fields have the same values as in the preceding row
- per-slice fields are zero (`0`)
* historical rows will not be retained forever rows older than a configurable
time will be purged.
#### Purge
The purging of historical rows is not yet implemented.
-1
View File
@@ -704,7 +704,6 @@ class Auth(object):
and visibility.content["history_visibility"] == "world_readable"
):
return Membership.JOIN, None
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
+2
View File
@@ -262,6 +262,8 @@ class FederationSenderHandler(object):
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
if receipt.data.get("hidden", False):
return # do not send over federation
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,
-3
View File
@@ -107,7 +107,6 @@ class ApplicationServiceApi(SimpleHttpClient):
except CodeMessageException as e:
if e.code == 404:
return False
return
logger.warning("query_user to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("query_user to %s threw exception %s", uri, ex)
@@ -127,7 +126,6 @@ class ApplicationServiceApi(SimpleHttpClient):
logger.warning("query_alias to %s received %s", uri, e.code)
if e.code == 404:
return False
return
except Exception as ex:
logger.warning("query_alias to %s threw exception %s", uri, ex)
return False
@@ -230,7 +228,6 @@ class ApplicationServiceApi(SimpleHttpClient):
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
return True
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
+5 -8
View File
@@ -27,19 +27,16 @@ class StatsConfig(Config):
def read_config(self, config, **kwargs):
self.stats_enabled = True
self.stats_bucket_size = 86400
self.stats_bucket_size = 86400 * 1000
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
self.stats_bucket_size = (
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
self.stats_bucket_size = self.parse_duration(
stats_config.get("bucket_size", "1d")
)
self.stats_retention = (
self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
/ 1000
self.stats_retention = self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
+5 -5
View File
@@ -637,11 +637,11 @@ def auth_types_for_event(event):
if event.type == EventTypes.Create:
return []
auth_types = []
auth_types.append((EventTypes.PowerLevels, ""))
auth_types.append((EventTypes.Member, event.sender))
auth_types.append((EventTypes.Create, ""))
auth_types = [
(EventTypes.PowerLevels, ""),
(EventTypes.Member, event.sender),
(EventTypes.Create, ""),
]
if event.type == EventTypes.Member:
membership = event.content["membership"]
@@ -26,6 +26,7 @@ from synapse.logging.opentracing import (
set_tag,
start_active_span_follows_from,
tags,
whitelisted_homeserver,
)
from synapse.util.metrics import measure_func
@@ -59,9 +60,15 @@ class TransactionManager(object):
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)
span_contexts = (
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
)
span_contexts = []
keep_destination = whitelisted_homeserver(destination)
for edu in pending_edus:
context = edu.get_context()
if context:
span_contexts.append(extract_text_map(json.loads(context)))
if keep_destination:
edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts):
+5 -1
View File
@@ -342,7 +342,11 @@ class BaseFederationServlet(object):
continue
server.register_paths(
method, (pattern,), self._wrap(code), self.__class__.__name__
method,
(pattern,),
self._wrap(code),
self.__class__.__name__,
trace=False,
)
+3
View File
@@ -41,6 +41,9 @@ class Edu(JsonEncodedObject):
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
def strip_context(self):
getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home
-2
View File
@@ -294,12 +294,10 @@ class ApplicationServicesHandler(object):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
return False
return
user_info = yield self.store.get_user_by_id(user_id)
if user_info:
return False
return
# user not found; could be the AS though, so check.
services = self.store.get_app_services()
+1 -4
View File
@@ -25,7 +25,6 @@ from synapse.logging.opentracing import (
log_kv,
set_tag,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -121,9 +120,7 @@ class DeviceMessageHandler(object):
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
"org.matrix.opentracing_context": json.dumps(context),
}
log_kv({"local_messages": local_messages})
-1
View File
@@ -167,7 +167,6 @@ class EventHandler(BaseHandler):
if not event:
return None
return
users = yield self.store.get_users_in_room(event.room_id)
is_peeking = user.to_string() not in users
+122 -43
View File
@@ -61,21 +61,76 @@ class IdentityHandler(BaseHandler):
return False
return True
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
if "id_server" in creds:
id_server = creds["id_server"]
elif "idServer" in creds:
id_server = creds["idServer"]
else:
raise SynapseError(400, "No id_server in creds")
def _extract_items_from_creds_dict(self, creds):
"""
Retrieve entries from a "credentials" dictionary
if "client_secret" in creds:
client_secret = creds["client_secret"]
elif "clientSecret" in creds:
client_secret = creds["clientSecret"]
Args:
creds (dict[str, str]): Dictionary of credentials that contain the following keys:
* client_secret|clientSecret: A unique secret str provided by the client
* id_server|idServer: the domain of the identity server to query
* id_access_token: The access token to authenticate to the identity
server with.
Returns:
tuple(str, str, str|None): A tuple containing the client_secret, the id_server,
and the id_access_token value if available.
"""
client_secret = creds.get("client_secret") or creds.get("clientSecret")
if not client_secret:
raise SynapseError(
400, "No client_secret in creds", errcode=Codes.MISSING_PARAM
)
id_server = creds.get("id_server") or creds.get("idServer")
if not id_server:
raise SynapseError(
400, "No id_server in creds", errcode=Codes.MISSING_PARAM
)
id_access_token = creds.get("id_access_token")
return client_secret, id_server, id_access_token
@defer.inlineCallbacks
def threepid_from_creds(self, creds, use_v2=True):
"""
Retrieve and validate a threepid identitier from a "credentials" dictionary
Args:
creds (dict[str, str]): Dictionary of credentials that contain the following keys:
* client_secret|clientSecret: A unique secret str provided by the client
* id_server|idServer: the domain of the identity server to query
* id_access_token: The access token to authenticate to the identity
server with. Required if use_v2 is true
use_v2 (bool): Whether to use v2 Identity Service API endpoints
Returns:
Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
the /getValidated3pid endpoint of the Identity Service API, or None if the
threepid was not found
"""
client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
creds
)
# If an id_access_token is not supplied, force usage of v1
if id_access_token is None:
use_v2 = False
query_params = {"sid": creds["sid"], "client_secret": client_secret}
# Decide which API endpoint URLs and query parameters to use
if use_v2:
url = "https://%s%s" % (
id_server,
"/_matrix/identity/v2/3pid/getValidated3pid",
)
query_params["id_access_token"] = id_access_token
else:
raise SynapseError(400, "No client_secret in creds")
url = "https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/3pid/getValidated3pid",
)
if not self._should_trust_id_server(id_server):
logger.warn(
@@ -85,43 +140,55 @@ class IdentityHandler(BaseHandler):
return None
try:
data = yield self.http_client.get_json(
"https://%s%s"
% (id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid"),
{"sid": creds["sid"], "client_secret": client_secret},
)
data = yield self.http_client.get_json(url, query_params)
return data if "medium" in data else None
except HttpResponseException as e:
logger.info("getValidated3pid failed with Matrix error: %r", e)
raise e.to_synapse_error()
if e.code != 404 or not use_v2:
# Generic failure
logger.info("getValidated3pid failed with Matrix error: %r", e)
raise e.to_synapse_error()
if "medium" in data:
return data
return None
# This identity server is too old to understand Identity Service API v2
# Attempt v1 endpoint
logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", url)
return (yield self.threepid_from_creds(creds, use_v2=False))
@defer.inlineCallbacks
def bind_threepid(self, creds, mxid):
def bind_threepid(self, creds, mxid, use_v2=True):
"""Bind a 3PID to an identity server
Args:
creds (dict[str, str]): Dictionary of credentials that contain the following keys:
* client_secret|clientSecret: A unique secret str provided by the client
* id_server|idServer: the domain of the identity server to query
* id_access_token: The access token to authenticate to the identity
server with. Required if use_v2 is true
mxid (str): The MXID to bind the 3PID to
use_v2 (bool): Whether to use v2 Identity Service API endpoints
Returns:
Deferred[dict]: The response from the identity server
"""
logger.debug("binding threepid %r to %s", creds, mxid)
data = None
if "id_server" in creds:
id_server = creds["id_server"]
elif "idServer" in creds:
id_server = creds["idServer"]
else:
raise SynapseError(400, "No id_server in creds")
client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
creds
)
if "client_secret" in creds:
client_secret = creds["client_secret"]
elif "clientSecret" in creds:
client_secret = creds["clientSecret"]
# If an id_access_token is not supplied, force usage of v1
if id_access_token is None:
use_v2 = False
# Decide which API endpoint URLs to use
bind_data = {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid}
if use_v2:
bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
bind_data["id_access_token"] = id_access_token
else:
raise SynapseError(400, "No client_secret in creds")
bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
try:
data = yield self.http_client.post_json_get_json(
"https://%s%s" % (id_server, "/_matrix/identity/api/v1/3pid/bind"),
{"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid},
)
data = yield self.http_client.post_json_get_json(bind_url, bind_data)
logger.debug("bound threepid %r to %s", creds, mxid)
# Remember where we bound the threepid
@@ -131,13 +198,23 @@ class IdentityHandler(BaseHandler):
address=data["address"],
id_server=id_server,
)
return data
except HttpResponseException as e:
if e.code != 404 or not use_v2:
logger.error("3PID bind failed with Matrix error: %r", e)
raise e.to_synapse_error()
except CodeMessageException as e:
data = json.loads(e.msg) # XXX WAT?
return data
return data
logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
return (yield self.bind_threepid(creds, mxid, use_v2=False))
@defer.inlineCallbacks
def try_unbind_threepid(self, mxid, threepid):
"""Removes a binding from an identity server
"""Attempt to remove a 3PID from an identity server, or if one is not provided, all
identity servers we're aware the binding is present on
Args:
mxid (str): Matrix user ID of binding to be removed
@@ -188,6 +265,8 @@ class IdentityHandler(BaseHandler):
server doesn't support unbinding
"""
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
content = {
"mxid": mxid,
"threepid": {"medium": threepid["medium"], "address": threepid["address"]},
@@ -199,7 +278,7 @@ class IdentityHandler(BaseHandler):
auth_headers = self.federation_http_client.build_auth_headers(
destination=None,
method="POST",
url_bytes="/_matrix/identity/api/v1/3pid/unbind".encode("ascii"),
url_bytes=url_bytes,
content=content,
destination_is=id_server,
)
+6 -3
View File
@@ -395,7 +395,12 @@ class InitialSyncHandler(BaseHandler):
)
if not receipts:
receipts = []
return receipts
return [
r
for r in receipts
if not r.data.get("hidden", False) or r.user_id == user_id
]
presence, receipts, (messages, token) = yield make_deferred_yieldable(
defer.gatherResults(
@@ -450,7 +455,6 @@ class InitialSyncHandler(BaseHandler):
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
return member_event.membership, member_event.event_id
return
except AuthError:
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
@@ -460,7 +464,6 @@ class InitialSyncHandler(BaseHandler):
and visibility.content["history_visibility"] == "world_readable"
):
return Membership.JOIN, None
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
+1 -1
View File
@@ -255,7 +255,7 @@ class PresenceHandler(object):
self.unpersisted_users_changes = set()
if unpersisted:
logger.info("Persisting %d upersisted presence updates", len(unpersisted))
logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
yield self.store.update_presence(
[self.user_to_current_state[user_id] for user_id in unpersisted]
)
+4 -3
View File
@@ -106,7 +106,7 @@ class ReceiptsHandler(BaseHandler):
return True
@defer.inlineCallbacks
def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
def received_client_receipt(self, room_id, receipt_type, user_id, event_id, hidden):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
@@ -115,14 +115,15 @@ class ReceiptsHandler(BaseHandler):
receipt_type=receipt_type,
user_id=user_id,
event_ids=[event_id],
data={"ts": int(self.clock.time_msec())},
data={"ts": int(self.clock.time_msec()), "hidden": hidden},
)
is_new = yield self._handle_new_receipts([receipt])
if not is_new:
return
yield self.federation.send_read_receipt(receipt)
if not hidden:
yield self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
+6 -46
View File
@@ -397,7 +397,6 @@ class RegistrationHandler(BaseHandler):
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
@@ -544,9 +543,7 @@ class RegistrationHandler(BaseHandler):
return (device_id, access_token)
@defer.inlineCallbacks
def post_registration_actions(
self, user_id, auth_result, access_token, bind_email, bind_msisdn
):
def post_registration_actions(self, user_id, auth_result, access_token):
"""A user has completed registration
Args:
@@ -555,18 +552,10 @@ class RegistrationHandler(BaseHandler):
registered user.
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
bind_email (bool): Whether to bind the email with the identity
server.
bind_msisdn (bool): Whether to bind the msisdn with the identity
server.
"""
if self.hs.config.worker_app:
yield self._post_registration_client(
user_id=user_id,
auth_result=auth_result,
access_token=access_token,
bind_email=bind_email,
bind_msisdn=bind_msisdn,
user_id=user_id, auth_result=auth_result, access_token=access_token
)
return
@@ -579,13 +568,11 @@ class RegistrationHandler(BaseHandler):
):
yield self.store.upsert_monthly_active_user(user_id)
yield self._register_email_threepid(
user_id, threepid, access_token, bind_email
)
yield self._register_email_threepid(user_id, threepid, access_token)
if auth_result and LoginType.MSISDN in auth_result:
threepid = auth_result[LoginType.MSISDN]
yield self._register_msisdn_threepid(user_id, threepid, bind_msisdn)
yield self._register_msisdn_threepid(user_id, threepid)
if auth_result and LoginType.TERMS in auth_result:
yield self._on_user_consented(user_id, self.hs.config.user_consent_version)
@@ -604,14 +591,12 @@ class RegistrationHandler(BaseHandler):
yield self.post_consent_actions(user_id)
@defer.inlineCallbacks
def _register_email_threepid(self, user_id, threepid, token, bind_email):
def _register_email_threepid(self, user_id, threepid, token):
"""Add an email address as a 3pid identifier
Also adds an email pusher for the email address, if configured in the
HS config
Also optionally binds emails to the given user_id on the identity server
Must be called on master.
Args:
@@ -619,8 +604,6 @@ class RegistrationHandler(BaseHandler):
threepid (object): m.login.email.identity auth response
token (str|None): access_token for the user, or None if not logged
in.
bind_email (bool): true if the client requested the email to be
bound at the identity server
Returns:
defer.Deferred:
"""
@@ -662,29 +645,15 @@ class RegistrationHandler(BaseHandler):
data={},
)
if bind_email:
logger.info("bind_email specified: binding")
logger.debug("Binding emails %s to %s" % (threepid, user_id))
yield self.identity_handler.bind_threepid(
threepid["threepid_creds"], user_id
)
else:
logger.info("bind_email not specified: not binding email")
@defer.inlineCallbacks
def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
def _register_msisdn_threepid(self, user_id, threepid):
"""Add a phone number as a 3pid identifier
Also optionally binds msisdn to the given user_id on the identity server
Must be called on master.
Args:
user_id (str): id of user
threepid (object): m.login.msisdn auth response
token (str): access_token for the user
bind_email (bool): true if the client requested the email to be
bound at the identity server
Returns:
defer.Deferred:
"""
@@ -700,12 +669,3 @@ class RegistrationHandler(BaseHandler):
yield self._auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
)
if bind_msisdn:
logger.info("bind_msisdn specified: binding")
logger.debug("Binding msisdn %s to %s", threepid, user_id)
yield self.identity_handler.bind_threepid(
threepid["threepid_creds"], user_id
)
else:
logger.info("bind_msisdn not specified: not binding msisdn")
-1
View File
@@ -852,7 +852,6 @@ class RoomContextHandler(object):
)
if not event:
return None
return
filtered = yield (filter_evts([event]))
if not filtered:
+2 -7
View File
@@ -962,9 +962,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
if complexity:
if complexity["v1"] > max_complexity:
return True
return False
return complexity["v1"] > max_complexity
return None
@defer.inlineCallbacks
@@ -980,10 +978,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
max_complexity = self.hs.config.limit_remote_rooms.complexity
complexity = yield self.store.get_room_complexity(room_id)
if complexity["v1"] > max_complexity:
return True
return False
return complexity["v1"] > max_complexity
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+132 -175
View File
@@ -14,15 +14,14 @@
# limitations under the License.
import logging
from collections import Counter
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import EventTypes, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler):
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
if not self.hs.config.stats_enabled:
if not self.hs.config.stats_enabled or self._is_processing:
return
if self._is_processing:
return
self._is_processing = True
@defer.inlineCallbacks
def process():
@@ -75,39 +73,72 @@ class StatsHandler(StateDeltasHandler):
finally:
self._is_processing = False
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
return None
self.pos = yield self.store.get_stats_positions()
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
if not deltas:
return
deltas = yield self.store.get_current_state_deltas(self.pos)
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
if deltas:
logger.debug("Handling %d state deltas", len(deltas))
room_deltas, user_deltas = yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
max_pos = deltas[-1]["stream_id"]
else:
room_deltas = {}
user_deltas = {}
max_pos = yield self.store.get_room_max_stream_ordering()
event_processing_positions.labels("stats").set(self.pos)
# Then count deltas for total_events and total_event_bytes.
room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
self.pos, max_pos
)
for room_id, fields in room_count.items():
room_deltas.setdefault(room_id, {}).update(fields)
for user_id, fields in user_count.items():
user_deltas.setdefault(user_id, {}).update(fields)
logger.debug("room_deltas: %s", room_deltas)
logger.debug("user_deltas: %s", user_deltas)
# Always call this so that we update the stats position.
yield self.store.bulk_update_stats_delta(
self.clock.time_msec(),
updates={"room": room_deltas, "user": user_deltas},
stream_id=max_pos,
)
event_processing_positions.labels("stats").set(max_pos)
if self.pos == max_pos:
break
self.pos = max_pos
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
Returns:
Deferred[tuple[dict[str, Counter], dict[str, counter]]]
Resovles to two dicts, the room deltas and the user deltas,
mapping from room/user ID to changes in the various fields.
"""
Called with the state deltas to process
"""
room_to_stats_deltas = {}
user_to_stats_deltas = {}
room_to_state_updates = {}
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
@@ -115,11 +146,10 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@@ -131,203 +161,130 @@ class StatsHandler(StateDeltasHandler):
continue
if event_id is None and prev_event_id is None:
# Errr...
logger.error(
"event ID is None and so is the previous event ID. stream_id: %s",
stream_id,
)
continue
event_content = {}
sender = None
if event_id is not None:
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}
sender = event.sender
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
room_state = room_to_state_updates.setdefault(room_id, {})
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] += 1
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif membership == prev_membership:
pass # noop
elif prev_membership == Membership.JOIN:
room_stats_delta["joined_members"] -= 1
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
)
room_stats_delta["invited_members"] -= 1
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
)
room_stats_delta["left_members"] -= 1
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
)
room_stats_delta["banned_members"] -= 1
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
logger.error(err)
raise ValueError(err)
raise ValueError(
"%r is not a valid prev_membership" % (prev_membership,)
)
if membership == prev_membership:
pass # noop
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
)
room_stats_delta["joined_members"] += 1
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
)
room_stats_delta["invited_members"] += 1
if sender and self.is_mine_id(sender):
user_to_stats_deltas.setdefault(sender, Counter())[
"invites_sent"
] += 1
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
)
room_stats_delta["left_members"] += 1
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
)
room_stats_delta["banned_members"] += 1
else:
err = "%s is not a valid membership" % (repr(membership),)
logger.error(err)
raise ValueError(err)
raise ValueError("%r is not a valid membership" % (membership,))
user_id = state_key
if self.is_mine_id(user_id):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
# this accounts for transitions like leave → ban and so on.
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
membership == Membership.JOIN
)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
if has_changed_joinedness:
delta = +1 if membership == Membership.JOIN else -1
user_to_stats_deltas.setdefault(user_id, Counter())[
"joined_rooms"
] += delta
room_stats_delta["local_users_in_room"] += delta
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
yield self.store.update_room_state(
room_id,
{
"join_rules": None,
"history_visibility": None,
"encryption": None,
"name": None,
"topic": None,
"avatar": None,
"canonical_alias": None,
},
)
room_state["is_federatable"] = event_content.get("m.federate", True)
if sender and self.is_mine_id(sender):
user_to_stats_deltas.setdefault(sender, Counter())[
"rooms_created"
] += 1
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
room_state["join_rules"] = event_content.get("join_rule")
elif typ == EventTypes.RoomHistoryVisibility:
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
room_state["history_visibility"] = event_content.get(
"history_visibility"
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
room_id, {"encryption": event_content.get("algorithm")}
)
room_state["encryption"] = event_content.get("algorithm")
elif typ == EventTypes.Name:
yield self.store.update_room_state(
room_id, {"name": event_content.get("name")}
)
room_state["name"] = event_content.get("name")
elif typ == EventTypes.Topic:
yield self.store.update_room_state(
room_id, {"topic": event_content.get("topic")}
)
room_state["topic"] = event_content.get("topic")
elif typ == EventTypes.RoomAvatar:
yield self.store.update_room_state(
room_id, {"avatar": event_content.get("url")}
)
room_state["avatar"] = event_content.get("url")
elif typ == EventTypes.CanonicalAlias:
yield self.store.update_room_state(
room_id, {"canonical_alias": event_content.get("alias")}
)
room_state["canonical_alias"] = event_content.get("alias")
elif typ == EventTypes.GuestAccess:
room_state["guest_access"] = event_content.get("guest_access")
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
Increment/decrement a user's number of public rooms when a room they are
in changes to/from public visibility.
for room_id, state in room_to_state_updates.items():
yield self.store.update_room_state(room_id, state)
Args:
ts (int): Timestamp in seconds
room_id (str)
is_public (bool)
"""
# For now, blindly iterate over all local users in the room so that
# we can handle the whole problem of copying buckets over as needed
user_ids = yield self.store.get_users_in_room(room_id)
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
)
@defer.inlineCallbacks
def _is_public_room(self, room_id):
join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
history_visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility
)
if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
(
history_visibility
and history_visibility.content.get("history_visibility")
== "world_readable"
)
):
return True
else:
return False
return room_to_stats_deltas, user_to_stats_deltas
+28 -2
View File
@@ -334,6 +334,7 @@ class SyncHandler(object):
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"
@@ -376,7 +377,33 @@ class SyncHandler(object):
room_id = event["room_id"]
# exclude room id, as above
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
# filter out receipts the user shouldn't see
content = event_copy.get("content", {})
event_ids = content.keys()
reconstructed = event_copy.copy()
reconstructed["content"] = {} # clear old content
for event_id in event_ids:
m_read = content[event_id].get("m.read", None)
if m_read is None:
# clone it now - it's not something we can process
reconstructed["content"][event_id] = content[event_id]
continue
user_ids = m_read.keys()
for rr_user_id in user_ids:
data = m_read[rr_user_id]
hidden = data.get("hidden", False)
if rr_user_id == user_id or not hidden:
# append the key to the reconstructed receipt
new_content = reconstructed["content"]
if new_content.get(event_id, None) is None:
new_content[event_id] = {"m.read": {}}
ev_content = new_content[event_id]["m.read"]
if ev_content.get(rr_user_id, None) is None:
ev_content[rr_user_id] = {}
ev_content[rr_user_id] = data
if len(reconstructed["content"].keys()) > 0:
ephemeral_by_room.setdefault(room_id, []).append(reconstructed)
return now_token, ephemeral_by_room
@@ -578,7 +605,6 @@ class SyncHandler(object):
if not last_events:
return None
return
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
+49 -34
View File
@@ -46,6 +46,7 @@ from synapse.http import (
redact_uri,
)
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -269,42 +270,56 @@ class SimpleHttpClient(object):
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
body_producer = None
if data is not None:
body_producer = QuieterFileBodyProducer(BytesIO(data))
with start_active_span(
"outgoing-client-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.HTTP_METHOD: method,
tags.HTTP_URL: uri,
},
finish_on_close=True,
):
try:
body_producer = None
if data is not None:
body_producer = QuieterFileBodyProducer(BytesIO(data))
request_deferred = treq.request(
method,
uri,
agent=self.agent,
data=body_producer,
headers=headers,
**self._extra_treq_args
)
request_deferred = timeout_deferred(
request_deferred,
60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
request_deferred = treq.request(
method,
uri,
agent=self.agent,
data=body_producer,
headers=headers,
**self._extra_treq_args
)
request_deferred = timeout_deferred(
request_deferred,
60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s", method, redact_uri(uri), response.code
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
redact_uri(uri),
type(e).__name__,
e.args[0],
)
raise
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method,
redact_uri(uri),
response.code,
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
redact_uri(uri),
type(e).__name__,
e.args[0],
)
set_tag(tags.ERROR, True)
set_tag("error_reason", e.args[0])
raise
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):
-1
View File
@@ -345,7 +345,6 @@ class MatrixFederationHttpClient(object):
else:
query_bytes = b""
# Retreive current span
scope = start_active_span(
"outgoing-federation-request",
tags={
+12 -1
View File
@@ -40,6 +40,7 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.logging.context import preserve_fn
from synapse.logging.opentracing import trace_servlet
from synapse.util.caches import intern_dict
logger = logging.getLogger(__name__)
@@ -257,7 +258,9 @@ class JsonResource(HttpServer, resource.Resource):
self.path_regexs = {}
self.hs = hs
def register_paths(self, method, path_patterns, callback, servlet_classname):
def register_paths(
self, method, path_patterns, callback, servlet_classname, trace=True
):
"""
Registers a request handler against a regular expression. Later request URLs are
checked against these regular expressions in order to identify an appropriate
@@ -273,8 +276,16 @@ class JsonResource(HttpServer, resource.Resource):
servlet_classname (str): The name of the handler to be used in prometheus
and opentracing logs.
trace (bool): Whether we should start a span to trace the servlet.
"""
method = method.encode("utf-8") # method is bytes on py3
if trace:
# We don't extract the context from the servlet because we can't
# trust the sender
callback = trace_servlet(servlet_classname)(callback)
for path_pattern in path_patterns:
logger.debug("Registering for %s %s", method, path_pattern.pattern)
self.path_regexs.setdefault(method, []).append(
+1 -5
View File
@@ -20,7 +20,6 @@ import logging
from canonicaljson import json
from synapse.api.errors import Codes, SynapseError
from synapse.logging.opentracing import trace_servlet
logger = logging.getLogger(__name__)
@@ -298,10 +297,7 @@ class RestServlet(object):
servlet_classname = self.__class__.__name__
method_handler = getattr(self, "on_%s" % (method,))
http_server.register_paths(
method,
patterns,
trace_servlet(servlet_classname)(method_handler),
servlet_classname,
method, patterns, method_handler, servlet_classname
)
else:
+75 -29
View File
@@ -239,8 +239,7 @@ _homeserver_whitelist = None
def only_if_tracing(func):
"""Executes the function only if we're tracing. Otherwise return.
Assumes the function wrapped may return None"""
"""Executes the function only if we're tracing. Otherwise returns None."""
@wraps(func)
def _only_if_tracing_inner(*args, **kwargs):
@@ -252,6 +251,41 @@ def only_if_tracing(func):
return _only_if_tracing_inner
def ensure_active_span(message, ret=None):
"""Executes the operation only if opentracing is enabled and there is an active span.
If there is no active span it logs message at the error level.
Args:
message (str): Message which fills in "There was no active span when trying to %s"
in the error log if there is no active span and opentracing is enabled.
ret (object): return value if opentracing is None or there is no active span.
Returns (object): The result of the func or ret if opentracing is disabled or there
was no active span.
"""
def ensure_active_span_inner_1(func):
@wraps(func)
def ensure_active_span_inner_2(*args, **kwargs):
if not opentracing:
return ret
if not opentracing.tracer.active_span:
logger.error(
"There was no active span when trying to %s."
" Did you forget to start one or did a context slip?",
message,
)
return ret
return func(*args, **kwargs)
return ensure_active_span_inner_2
return ensure_active_span_inner_1
@contextlib.contextmanager
def _noop_context_manager(*args, **kwargs):
"""Does exactly what it says on the tin"""
@@ -319,7 +353,7 @@ def whitelisted_homeserver(destination):
Args:
destination (str)
"""
_homeserver_whitelist
if _homeserver_whitelist:
return _homeserver_whitelist.match(destination)
return False
@@ -349,26 +383,24 @@ def start_active_span(
if opentracing is None:
return _noop_context_manager()
else:
# We need to enter the scope here for the logcontext to become active
return opentracing.tracer.start_active_span(
operation_name,
child_of=child_of,
references=references,
tags=tags,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
)
return opentracing.tracer.start_active_span(
operation_name,
child_of=child_of,
references=references,
tags=tags,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
)
def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
return _noop_context_manager()
else:
references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
return scope
references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
return scope
def start_active_span_from_request(
@@ -465,19 +497,19 @@ def start_active_span_from_edu(
# Opentracing setters for tags, logs, etc
@only_if_tracing
@ensure_active_span("set a tag")
def set_tag(key, value):
"""Sets a tag on the active span"""
opentracing.tracer.active_span.set_tag(key, value)
@only_if_tracing
@ensure_active_span("log")
def log_kv(key_values, timestamp=None):
"""Log to the active span"""
opentracing.tracer.active_span.log_kv(key_values, timestamp)
@only_if_tracing
@ensure_active_span("set the traces operation name")
def set_operation_name(operation_name):
"""Sets the operation name of the active span"""
opentracing.tracer.active_span.set_operation_name(operation_name)
@@ -486,13 +518,18 @@ def set_operation_name(operation_name):
# Injection and extraction
@only_if_tracing
@ensure_active_span("inject the span into a header")
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
"""
Injects a span context into twisted headers in-place
Args:
headers (twisted.web.http_headers.Headers)
destination (str): address of entity receiving the span context. If check_destination
is true the context will only be injected if the destination matches the
opentracing whitelist
check_destination (bool): If false, destination will be ignored and the context
will always be injected.
span (opentracing.Span)
Returns:
@@ -517,7 +554,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
headers.addRawHeaders(key, value)
@only_if_tracing
@ensure_active_span("inject the span into a byte dict")
def inject_active_span_byte_dict(headers, destination, check_destination=True):
"""
Injects a span context into a dict where the headers are encoded as byte
@@ -525,6 +562,11 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
Args:
headers (dict)
destination (str): address of entity receiving the span context. If check_destination
is true the context will only be injected if the destination matches the
opentracing whitelist
check_destination (bool): If false, destination will be ignored and the context
will always be injected.
span (opentracing.Span)
Returns:
@@ -537,7 +579,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
here:
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
if not whitelisted_homeserver(destination):
if check_destination and not whitelisted_homeserver(destination):
return
span = opentracing.tracer.active_span
@@ -549,16 +591,18 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
headers[key.encode()] = [value.encode()]
@only_if_tracing
@ensure_active_span("inject the span into a text map")
def inject_active_span_text_map(carrier, destination, check_destination=True):
"""
Injects a span context into a dict
Args:
carrier (dict)
destination (str): the name of the remote server. The span context
will only be injected if the destination matches the homeserver_whitelist
or destination is None.
destination (str): address of entity receiving the span context. If check_destination
is true the context will only be injected if the destination matches the
opentracing whitelist
check_destination (bool): If false, destination will be ignored and the context
will always be injected.
Returns:
In-place modification of carrier
@@ -579,6 +623,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
)
@ensure_active_span("get the active span context as a dict", ret={})
def get_active_span_text_map(destination=None):
"""
Gets a span context as a dict. This can be used instead of manually
@@ -591,7 +636,7 @@ def get_active_span_text_map(destination=None):
dict: the active span's context if opentracing is enabled, otherwise empty.
"""
if not opentracing or (destination and not whitelisted_homeserver(destination)):
if destination and not whitelisted_homeserver(destination):
return {}
carrier = {}
@@ -602,6 +647,7 @@ def get_active_span_text_map(destination=None):
return carrier
@ensure_active_span("get the span context as a string.", ret={})
def active_span_context_as_string():
"""
Returns:
+11 -10
View File
@@ -22,13 +22,17 @@ from six.moves import urllib
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
RequestSendFailed,
SynapseError,
)
from synapse.logging.opentracing import (
inject_active_span_byte_dict,
trace,
trace_servlet,
)
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
@@ -129,6 +133,7 @@ class ReplicationEndpoint(object):
client = hs.get_simple_http_client()
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
@@ -167,9 +172,7 @@ class ReplicationEndpoint(object):
# the master, and so whether we should clean up or not.
while True:
headers = {}
opentracing.inject_active_span_byte_dict(
headers, None, check_destination=False
)
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
break
@@ -210,13 +213,11 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
handler = trace_servlet(self.__class__.__name__, extract_context=True)(handler)
# We don't let register paths trace this servlet using the default tracing
# options because we wish to extract the context explicitly.
http_server.register_paths(
method,
[pattern],
opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
handler
),
self.__class__.__name__,
method, [pattern], handler, self.__class__.__name__, trace=False
)
def _cached_handler(self, request, txn_id, **kwargs):
+3 -18
View File
@@ -106,7 +106,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
self.registration_handler = hs.get_registration_handler()
@staticmethod
def _serialize_payload(user_id, auth_result, access_token, bind_email, bind_msisdn):
def _serialize_payload(user_id, auth_result, access_token):
"""
Args:
user_id (str): The user ID that consented
@@ -114,17 +114,8 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
registered user.
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
bind_email (bool): Whether to bind the email with the identity
server
bind_msisdn (bool): Whether to bind the msisdn with the identity
server
"""
return {
"auth_result": auth_result,
"access_token": access_token,
"bind_email": bind_email,
"bind_msisdn": bind_msisdn,
}
return {"auth_result": auth_result, "access_token": access_token}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
@@ -132,15 +123,9 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
auth_result = content["auth_result"]
access_token = content["access_token"]
bind_email = content["bind_email"]
bind_msisdn = content["bind_msisdn"]
yield self.registration_handler.post_registration_actions(
user_id=user_id,
auth_result=auth_result,
access_token=access_token,
bind_email=bind_email,
bind_msisdn=bind_msisdn,
user_id=user_id, auth_result=auth_result, access_token=access_token
)
return 200, {}
+5 -2
View File
@@ -41,7 +41,7 @@ from synapse.rest.admin._base import (
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.users import UserAdminServlet
@@ -761,9 +761,12 @@ def register_servlets_for_client_rest_resource(hs, http_server):
DeleteGroupAdminRestServlet(hs).register(http_server)
AccountValidityRenewServlet(hs).register(http_server)
# Load the media repo ones if we're using them.
# Load the media repo ones if we're using them. Otherwise load the servlets which
# don't need a media repo (typically readonly admin APIs).
if hs.config.can_load_media_repo:
register_servlets_for_media_repo(hs, http_server)
else:
ListMediaInRoom(hs).register(http_server)
# don't add more things here: new servlets should only be exposed on
# /_synapse/admin so should not go here. Instead register them in AdminRestResource.
+1
View File
@@ -60,6 +60,7 @@ class ListMediaInRoom(RestServlet):
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
-1
View File
@@ -703,7 +703,6 @@ class RoomMembershipRestServlet(TransactionRestServlet):
txn_id,
)
return 200, {}
return
target = requester.user
if membership_action in ["invite", "ban", "unban", "kick"]:
+40 -6
View File
@@ -542,15 +542,16 @@ class ThreepidRestServlet(RestServlet):
def on_POST(self, request):
body = parse_json_object_from_request(request)
threePidCreds = body.get("threePidCreds")
threePidCreds = body.get("three_pid_creds", threePidCreds)
if threePidCreds is None:
raise SynapseError(400, "Missing param", Codes.MISSING_PARAM)
threepid_creds = body.get("threePidCreds") or body.get("three_pid_creds")
if threepid_creds is None:
raise SynapseError(
400, "Missing param three_pid_creds", Codes.MISSING_PARAM
)
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
threepid = yield self.identity_handler.threepid_from_creds(threePidCreds)
threepid = yield self.identity_handler.threepid_from_creds(threepid_creds)
if not threepid:
raise SynapseError(400, "Failed to auth 3pid", Codes.THREEPID_AUTH_FAILED)
@@ -566,11 +567,43 @@ class ThreepidRestServlet(RestServlet):
if "bind" in body and body["bind"]:
logger.debug("Binding threepid %s to %s", threepid, user_id)
yield self.identity_handler.bind_threepid(threePidCreds, user_id)
yield self.identity_handler.bind_threepid(threepid_creds, user_id)
return 200, {}
class ThreepidUnbindRestServlet(RestServlet):
PATTERNS = client_patterns("/account/3pid/unbind$")
def __init__(self, hs):
super(ThreepidUnbindRestServlet, self).__init__()
self.hs = hs
self.identity_handler = hs.get_handlers().identity_handler
self.auth = hs.get_auth()
self.datastore = self.hs.get_datastore()
@defer.inlineCallbacks
def on_POST(self, request):
"""Unbind the given 3pid from a specific identity server, or identity servers that are
known to have this 3pid bound
"""
requester = yield self.auth.get_user_by_req(request)
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ["medium", "address"])
medium = body.get("medium")
address = body.get("address")
id_server = body.get("id_server")
# Attempt to unbind the threepid from an identity server. If id_server is None, try to
# unbind from all identity servers this threepid has been added to in the past
result = yield self.identity_handler.try_unbind_threepid(
requester.user.to_string(),
{"address": address, "medium": medium, "id_server": id_server},
)
return 200, {"id_server_unbind_result": "success" if result else "no-support"}
class ThreepidDeleteRestServlet(RestServlet):
PATTERNS = client_patterns("/account/3pid/delete$")
@@ -629,5 +662,6 @@ def register_servlets(hs, http_server):
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
ThreepidUnbindRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
@@ -49,6 +49,7 @@ class ReadMarkerRestServlet(RestServlet):
"m.read",
user_id=requester.user.to_string(),
event_id=read_event_id,
hidden=body.get("m.hidden", False),
)
read_marker_event_id = body.get("m.fully_read", None)
+8 -2
View File
@@ -18,7 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from ._base import client_patterns
@@ -46,10 +46,16 @@ class ReceiptRestServlet(RestServlet):
if receipt_type != "m.read":
raise SynapseError(400, "Receipt type must be 'm.read'")
body = parse_json_object_from_request(request)
yield self.presence_handler.bump_presence_active_time(requester.user)
yield self.receipts_handler.received_client_receipt(
room_id, receipt_type, user_id=requester.user.to_string(), event_id=event_id
room_id,
receipt_type,
user_id=requester.user.to_string(),
event_id=event_id,
hidden=body.get("m.hidden", False),
)
return 200, {}
-4
View File
@@ -230,7 +230,6 @@ class RegisterRestServlet(RestServlet):
if kind == b"guest":
ret = yield self._do_guest_registration(body, address=client_addr)
return ret
return
elif kind != b"user":
raise UnrecognizedRequestError(
"Do not understand membership kind: %s" % (kind,)
@@ -280,7 +279,6 @@ class RegisterRestServlet(RestServlet):
desired_username, access_token, body
)
return 200, result # we throw for non 200 responses
return
# for regular registration, downcase the provided username before
# attempting to register it. This should mean
@@ -483,8 +481,6 @@ class RegisterRestServlet(RestServlet):
user_id=registered_user_id,
auth_result=auth_result,
access_token=return_dict.get("access_token"),
bind_email=params.get("bind_email"),
bind_msisdn=params.get("bind_msisdn"),
)
return 200, return_dict
@@ -183,7 +183,6 @@ class PreviewUrlResource(DirectServeResource):
if isinstance(og, six.text_type):
og = og.encode("utf8")
return og
return
media_info = yield self._download_url(url, user)
-1
View File
@@ -136,7 +136,6 @@ class StateHandler(object):
if event_id:
event = yield self.store.get_event(event_id, allow_none=True)
return event
return
state_map = yield self.store.get_events(
list(state.values()), get_prev_content=False
-1
View File
@@ -165,7 +165,6 @@ class ApplicationServiceTransactionWorkerStore(
)
if result:
return result.get("state")
return
return None
def set_appservice_state(self, service, state):
+1 -1
View File
@@ -856,7 +856,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"ts": now,
"opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
else "{}",
}
for destination in hosts
for device_id in device_ids
-2
View File
@@ -47,7 +47,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not room_id:
return None
return
servers = yield self._simple_select_onecol(
"room_alias_servers",
@@ -58,7 +57,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not servers:
return None
return
return RoomAliasMapping(room_id, room_alias.to_string(), servers)
+3 -2
View File
@@ -2270,8 +2270,9 @@ class EventsStore(
"room_aliases",
"room_depth",
"room_memberships",
"room_state",
"room_stats",
"room_stats_state",
"room_stats_current",
"room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
-1
View File
@@ -35,7 +35,6 @@ class ProfileWorkerStore(SQLBaseStore):
if e.code == 404:
# no match
return ProfileInfo(None, None)
return
else:
raise
+12
View File
@@ -869,6 +869,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
# have any state deltas before now (as it is a new user), but still,
# we include it for completeness.
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
self._update_stats_delta_txn(
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@@ -1140,6 +1151,7 @@ class RegistrationStore(
deferred str|None: A str representing a link to redirect the user
to if there is one.
"""
# Insert everything into a transaction in order to run atomically
def validate_threepid_session_txn(txn):
row = self._simple_select_one_txn(
+23 -21
View File
@@ -112,29 +112,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT state_key FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
"""
else:
sql = """
SELECT state_key FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""
return self.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
)
txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]
def get_users_in_room_txn(self, txn, room_id):
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT state_key FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
"""
else:
sql = """
SELECT state_key FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""
return self.runInteraction("get_users_in_room", f)
txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]
@cached(max_entries=100000)
def get_room_summary(self, room_id):
@@ -0,0 +1,152 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS room_state;
DROP TABLE IF EXISTS room_stats_state;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_process_users',
'populate_stats_cleanup'
);
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', '');
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
DROP TABLE IF EXISTS stats_incremental_position;
CREATE TABLE stats_incremental_position (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);
-- insert a null row and make sure it is the only one.
INSERT INTO stats_incremental_position (
stream_id
) SELECT COALESCE(MAX(stream_ordering), 0) from events;
-- represents PRESENT room statistics for a room
-- only holds absolute fields
DROP TABLE IF EXISTS room_stats_current;
CREATE TABLE room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
-- These are absolute counts
current_state_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
local_users_in_room INT NOT NULL,
-- The maximum delta stream position that this row takes into account.
completed_delta_stream_id BIGINT NOT NULL
);
-- represents HISTORICAL room statistics for a room
DROP TABLE IF EXISTS room_stats_historical;
CREATE TABLE room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
-- Note that end_ts is quantised.
end_ts BIGINT NOT NULL,
bucket_size BIGINT NOT NULL,
-- These stats are absolute counts
current_state_events BIGINT NOT NULL,
joined_members BIGINT NOT NULL,
invited_members BIGINT NOT NULL,
left_members BIGINT NOT NULL,
banned_members BIGINT NOT NULL,
local_users_in_room BIGINT NOT NULL,
-- These stats are per time slice
total_events BIGINT NOT NULL,
total_event_bytes BIGINT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- represents PRESENT statistics for a user
-- only holds absolute fields
DROP TABLE IF EXISTS user_stats_current;
CREATE TABLE user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
joined_rooms BIGINT NOT NULL,
-- The maximum delta stream position that this row takes into account.
completed_delta_stream_id BIGINT NOT NULL
);
-- represents HISTORICAL statistics for a user
DROP TABLE IF EXISTS user_stats_historical;
CREATE TABLE user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size BIGINT NOT NULL,
joined_rooms BIGINT NOT NULL,
invites_sent BIGINT NOT NULL,
rooms_created BIGINT NOT NULL,
total_events BIGINT NOT NULL,
total_event_bytes BIGINT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts);
CREATE TABLE room_stats_state (
room_id TEXT NOT NULL,
name TEXT,
canonical_alias TEXT,
join_rules TEXT,
history_visibility TEXT,
encryption TEXT,
avatar TEXT,
guest_access TEXT,
is_federatable BOOLEAN,
topic TEXT
);
CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id);
+713 -321
View File
File diff suppressed because it is too large Load Diff
+531 -110
View File
@@ -13,16 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse import storage
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
# The expected number of state events in a fresh public room.
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5
# The expected number of state events in a fresh private room.
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6
class StatsRoomTests(unittest.HomeserverTestCase):
@@ -33,7 +34,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
@@ -47,7 +47,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
@@ -56,7 +56,17 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
"depends_on": "populate_stats_prepare",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
@@ -66,16 +76,46 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
"depends_on": "populate_stats_process_users",
},
)
)
def get_all_room_state(self):
return self.store._simple_select_list(
"room_stats_state", None, retcols=("name", "topic", "canonical_alias")
)
def _get_current_stats(self, stats_type, stat_id):
table, id_col = storage.stats.TYPE_TO_TABLE[stats_type]
cols = list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type]) + list(
storage.stats.PER_SLICE_FIELDS[stats_type]
)
end_ts = self.store.quantise_stats_time(self.reactor.seconds() * 1000)
return self.get_success(
self.store._simple_select_one(
table + "_historical",
{id_col: stat_id, end_ts: end_ts},
cols,
allow_none=True,
)
)
def _perform_background_initial_update(self):
# Do the initial population of the stats via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
r = self.get_success(self.store.get_all_room_state())
r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
@@ -91,7 +131,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
)
# Stats disabled, shouldn't have done anything
r = self.get_success(self.store.get_all_room_state())
r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
@@ -104,7 +144,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r = self.get_success(self.store.get_all_room_state())
r = self.get_success(self.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
@@ -114,6 +154,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
@@ -138,12 +179,18 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(
self.store._simple_update_one(
table="stats_incremental_position",
keyvalues={},
updatevalues={"stream_id": 0},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
@@ -154,6 +201,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# orig_delta_processor = self.store.
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
@@ -185,8 +234,15 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# self.handler.notify_new_event()
# We need to let the delta processor advance…
self.pump(10 * 60)
# Get the slices! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
self.assertEqual(len(r), 2)
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
@@ -194,111 +250,476 @@ class StatsRoomTests(unittest.HomeserverTestCase):
# The newest has 3
self.assertEqual(r[0]["joined_members"], 3)
def test_incorrect_state_transition(self):
def test_create_user(self):
"""
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
(JOIN, INVITE, LEAVE, BAN), an error is raised.
When we create a user, it should have statistics already ready.
"""
events = {
"a1": {"membership": Membership.LEAVE},
"a2": {"membership": "not a real thing"},
}
def get_event(event_id, allow_none=True):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": 60,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid prev_membership"
)
# And the other way...
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)
def test_redacted_prev_event(self):
"""
If the prev_event does not exist, then it is assumed to be a LEAVE.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
u1stats = self._get_current_stats("user", u1)
# Do the initial population of the user directory via the background update
self._add_background_updates()
self.assertIsNotNone(u1stats)
# not in any rooms by default
self.assertEqual(u1stats["joined_rooms"], 0)
def test_create_room(self):
"""
When we create a room, it should have statistics already ready.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
r1stats = self._get_current_stats("room", r1)
r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
r2stats = self._get_current_stats("room", r2)
self.assertIsNotNone(r1stats)
self.assertIsNotNone(r2stats)
# contains the default things you'd expect in a fresh room
self.assertEqual(
r1stats["total_events"],
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM,
"Wrong number of total_events in new room's stats!"
" You may need to update this if more state events are added to"
" the room creation process.",
)
self.assertEqual(
r2stats["total_events"],
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
"Wrong number of total_events in new room's stats!"
" You may need to update this if more state events are added to"
" the room creation process.",
)
self.assertEqual(
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(
r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM
)
self.assertEqual(r1stats["joined_members"], 1)
self.assertEqual(r1stats["invited_members"], 0)
self.assertEqual(r1stats["banned_members"], 0)
self.assertEqual(r2stats["joined_members"], 1)
self.assertEqual(r2stats["invited_members"], 0)
self.assertEqual(r2stats["banned_members"], 0)
def test_send_message_increments_total_events(self):
"""
When we send a message, it increments total_events.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send(r1, "hiss", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
def test_send_state_event_nonoverwriting(self):
"""
When we send a non-overwriting state event, it increments total_events AND current_state_events
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
self.helper.send_state(
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send_state(
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy"
)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
def test_send_state_event_overwriting(self):
"""
When we send an overwriting state event, it increments total_events ONLY
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
self.helper.send_state(
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.send_state(
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby"
)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
def test_join_first_time(self):
"""
When a user joins a room for the first time, total_events, current_state_events and
joined_members should increase by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1
)
def test_join_after_leave(self):
"""
When a user joins a room after being previously left, total_events and
joined_members should increase by exactly 1.
current_state_events should not increase.
left_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
self.helper.leave(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
)
self.assertEqual(
r1stats_post["left_members"] - r1stats_ante["left_members"], -1
)
def test_invited(self):
"""
When a user invites another user, current_state_events, total_events and
invited_members should increase by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
r1stats_ante = self._get_current_stats("room", r1)
self.helper.invite(r1, u1, u2, tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
1,
)
self.assertEqual(
r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1
)
def test_join_after_invite(self):
"""
When a user joins a room after being invited, total_events and
joined_members should increase by exactly 1.
current_state_events should not increase.
invited_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.invite(r1, u1, u2, tok=u1token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.join(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
)
self.assertEqual(
r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1
)
def test_left(self):
"""
When a user leaves a room after joining, total_events and
left_members should increase by exactly 1.
current_state_events should not increase.
joined_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.leave(r1, u2, tok=u2token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["left_members"] - r1stats_ante["left_members"], +1
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
)
def test_banned(self):
"""
When a user is banned from a room after joining, total_events and
left_members should increase by exactly 1.
current_state_events should not increase.
banned_members should decrease by exactly 1.
"""
self._perform_background_initial_update()
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
self.helper.join(r1, u2, tok=u2token)
r1stats_ante = self._get_current_stats("room", r1)
self.helper.change_membership(r1, u1, u2, "ban", tok=u1token)
r1stats_post = self._get_current_stats("room", r1)
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
self.assertEqual(
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
0,
)
self.assertEqual(
r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1
)
self.assertEqual(
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
)
def test_initial_background_update(self):
"""
Test that statistics can be generated by the initial background update
handler.
This test also tests that stats rows are not created for new subjects
when stats are disabled. However, it may be desirable to change this
behaviour eventually to still keep current rows.
"""
self.hs.config.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token)
# test that these subjects, which were created during a time of disabled
# stats, do not have stats.
self.assertIsNone(self._get_current_stats("room", r1))
self.assertIsNone(self._get_current_stats("user", u1))
self.hs.config.stats_enabled = True
self._perform_background_initial_update()
r1stats = self._get_current_stats("room", r1)
u1stats = self._get_current_stats("user", u1)
self.assertEqual(r1stats["joined_members"], 1)
self.assertEqual(
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
)
self.assertEqual(u1stats["joined_rooms"], 1)
def test_incomplete_stats(self):
"""
This tests that we track incomplete statistics.
We first test that incomplete stats are incrementally generated,
following the preparation of a background regen.
We then test that these incomplete rows are completed by the background
regen.
"""
u1 = self.register_user("u1", "pass")
u1token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
# preparation stage of the initial background update
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_delete(
"room_stats_current", {"1": 1}, "test_delete_stats"
)
)
self.get_success(
self.store._simple_delete(
"user_stats_current", {"1": 1}, "test_delete_stats"
)
)
self.helper.invite(r1, u1, u2, tok=u1token)
self.helper.join(r1, u2, tok=u2token)
self.helper.invite(r1, u1, u3, tok=u1token)
self.helper.send(r1, "thou shalt yield", tok=u1token)
# now do the background updates
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_prepare",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_users",
},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
events = {"a1": None, "a2": {"membership": Membership.JOIN}}
r1stats_complete = self._get_current_stats("room", r1)
u1stats_complete = self._get_current_stats("user", u1)
u2stats_complete = self._get_current_stats("user", u2)
def get_event(event_id, allow_none=True):
if events.get(event_id):
m = Mock()
m.content = events[event_id]
else:
m = None
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
# now we make our assertions
def get_received_ts(event_id):
return defer.succeed(1)
# check that _complete rows are complete and correct
self.assertEqual(r1stats_complete["joined_members"], 2)
self.assertEqual(r1stats_complete["invited_members"], 1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
self.assertEqual(
r1stats_complete["current_state_events"],
2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
)
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user:test",
"room_id": room_1,
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
self.get_success(self.handler._handle_deltas(deltas))
# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)
self.assertEqual(u1stats_complete["joined_rooms"], 1)
self.assertEqual(u2stats_complete["joined_rooms"], 1)
+6 -2
View File
@@ -128,8 +128,12 @@ class RestHelper(object):
return channel.json_body
def send_state(self, room_id, event_type, body, tok, expect_code=200):
path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""):
path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
room_id,
event_type,
state_key,
)
if tok:
path = path + "?access_token=%s" % tok