Compare commits
34 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6e827507f7 | |||
| 0e99412f4c | |||
| 7fd0c90234 | |||
| ebd2cd84d5 | |||
| c497e13734 | |||
| d514dac0b2 | |||
| bdd201ea7f | |||
| 74fb729213 | |||
| 412c6e21a8 | |||
| 8a5f6ed130 | |||
| c188bd2c12 | |||
| 20402aa128 | |||
| 6d86df73f1 | |||
| 87fa26006b | |||
| ebba15ee7f | |||
| e132ba79ae | |||
| b13cac896d | |||
| ce5f1cb98c | |||
| 6382914587 | |||
| fb5acd7039 | |||
| 748aa38378 | |||
| 8cf7fbbce0 | |||
| 7809f0c022 | |||
| baee288fb4 | |||
| c058aeb88d | |||
| 81b8080acd | |||
| b7f7cc7ace | |||
| d6de55bce9 | |||
| 3ad24ab386 | |||
| 1b63ccd848 | |||
| 09f6152a11 | |||
| aedfec3ad7 | |||
| 17e1e80726 | |||
| 1016f303e5 |
+25
@@ -1,3 +1,28 @@
|
||||
Synapse 1.3.1 (2019-08-17)
|
||||
==========================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Drop hard dependency on `sdnotify` python package. ([\#5871](https://github.com/matrix-org/synapse/issues/5871))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix startup issue (hang on ACME provisioning) due to ordering of Twisted reactor startup. Thanks to @chrismoos for supplying the fix. ([\#5867](https://github.com/matrix-org/synapse/issues/5867))
|
||||
|
||||
|
||||
Synapse 1.3.0 (2019-08-15)
|
||||
==========================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix 500 Internal Server Error on `publicRooms` when the public room list was
|
||||
cached. ([\#5851](https://github.com/matrix-org/synapse/issues/5851))
|
||||
|
||||
|
||||
Synapse 1.3.0rc1 (2019-08-13)
|
||||
==========================
|
||||
|
||||
|
||||
+4
-5
@@ -419,12 +419,11 @@ If Synapse is not configured with an SMTP server, password reset via email will
|
||||
|
||||
## Registering a user
|
||||
|
||||
You will need at least one user on your server in order to use a Matrix
|
||||
client. Users can be registered either via a Matrix client, or via a
|
||||
commandline script.
|
||||
The easiest way to create a new user is to do so from a client like [Riot](https://riot.im).
|
||||
|
||||
To get started, it is easiest to use the command line to register new
|
||||
users. This can be done as follows:
|
||||
Alternatively you can do so from the command line if you have installed via pip.
|
||||
|
||||
This can be done as follows:
|
||||
|
||||
```
|
||||
$ source ~/synapse/env/bin/activate
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Don't create broken room when power_level_content_override.users does not contain creator_id.
|
||||
@@ -0,0 +1 @@
|
||||
Retry well-known lookup before the cache expires, giving a grace period where the remote well-known can be down but we still use the old result.
|
||||
@@ -1,2 +0,0 @@
|
||||
Rework room and user statistics to separate current & historical rows, as well
|
||||
as track stats correctly.
|
||||
@@ -0,0 +1 @@
|
||||
Add a tag recording a request's authenticated entity and corresponding servlet in opentracing.
|
||||
@@ -0,0 +1 @@
|
||||
Fix database index so that different backup versions can have the same sessions.
|
||||
@@ -0,0 +1 @@
|
||||
Remove log line for debugging issue #5407.
|
||||
@@ -0,0 +1 @@
|
||||
Fix Synapse looking for config options `password_reset_failure_template` and `password_reset_success_template`, when they are actually `password_reset_template_failure_html`, `password_reset_template_success_html`.
|
||||
@@ -0,0 +1 @@
|
||||
Add admin API endpoint for setting whether or not a user is a server administrator.
|
||||
Vendored
+13
-3
@@ -1,8 +1,18 @@
|
||||
matrix-synapse-py3 (1.2.1) stable; urgency=medium
|
||||
matrix-synapse-py3 (1.3.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.2.1.
|
||||
* New synapse release 1.3.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Fri, 26 Jul 2019 11:32:47 +0100
|
||||
-- Synapse Packaging team <packages@matrix.org> Sat, 17 Aug 2019 09:15:49 +0100
|
||||
|
||||
matrix-synapse-py3 (1.3.0) stable; urgency=medium
|
||||
|
||||
[ Andrew Morgan ]
|
||||
* Remove libsqlite3-dev from required build dependencies.
|
||||
|
||||
[ Synapse Packaging team ]
|
||||
* New synapse release 1.3.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 15 Aug 2019 12:04:23 +0100
|
||||
|
||||
matrix-synapse-py3 (1.2.0) stable; urgency=medium
|
||||
|
||||
|
||||
Vendored
-1
@@ -15,7 +15,6 @@ Build-Depends:
|
||||
python3-setuptools,
|
||||
python3-pip,
|
||||
python3-venv,
|
||||
libsqlite3-dev,
|
||||
tar,
|
||||
Standards-Version: 3.9.8
|
||||
Homepage: https://github.com/matrix-org/synapse
|
||||
|
||||
@@ -84,3 +84,23 @@ with a body of:
|
||||
}
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
|
||||
Change whether a user is a server administrator or not
|
||||
======================================================
|
||||
|
||||
Note that you cannot demote yourself.
|
||||
|
||||
The api is::
|
||||
|
||||
PUT /_synapse/admin/v1/users/<user_id>/admin
|
||||
|
||||
with a body of:
|
||||
|
||||
.. code:: json
|
||||
|
||||
{
|
||||
"admin": true
|
||||
}
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
@@ -1,146 +0,0 @@
|
||||
Room and User Statistics
|
||||
========================
|
||||
|
||||
Synapse maintains room and user statistics (as well as a cache of room state),
|
||||
in various tables.
|
||||
|
||||
These can be used for administrative purposes but are also used when generating
|
||||
the public room directory. If these tables get stale or out of sync (possibly
|
||||
after database corruption), you may wish to regenerate them.
|
||||
|
||||
|
||||
# Synapse Administrator Documentation
|
||||
|
||||
## Various SQL scripts that you may find useful
|
||||
|
||||
### Delete stats, including historical stats
|
||||
|
||||
```sql
|
||||
DELETE FROM room_stats_current;
|
||||
DELETE FROM room_stats_historical;
|
||||
DELETE FROM user_stats_current;
|
||||
DELETE FROM user_stats_historical;
|
||||
```
|
||||
|
||||
### Regenerate stats (all subjects)
|
||||
|
||||
```sql
|
||||
BEGIN;
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
|
||||
COMMIT;
|
||||
|
||||
DELETE FROM room_stats_current;
|
||||
DELETE FROM user_stats_current;
|
||||
```
|
||||
|
||||
then follow the steps below for **'Regenerate stats (missing subjects only)'**
|
||||
|
||||
### Regenerate stats (missing subjects only)
|
||||
|
||||
```sql
|
||||
-- Set up staging tables
|
||||
-- we depend on current_state_events_membership because this is used
|
||||
-- in our counting.
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('populate_stats_prepare', '{}', 'current_state_events_membership');
|
||||
|
||||
-- Run through each room and update stats
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
|
||||
|
||||
-- Run through each user and update stats.
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
|
||||
|
||||
-- Clean up staging tables
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_cleanup', '{}', 'populate_stats_process_users');
|
||||
```
|
||||
|
||||
then **restart Synapse**.
|
||||
|
||||
|
||||
# Synapse Developer Documentation
|
||||
|
||||
## High-Level Concepts
|
||||
|
||||
### Definitions
|
||||
|
||||
* **subject**: Something we are tracking stats about – currently a room or user.
|
||||
* **current row**: An entry for a subject in the appropriate current statistics
|
||||
table. Each subject can have only one.
|
||||
* **historical row**: An entry for a subject in the appropriate historical
|
||||
statistics table. Each subject can have any number of these.
|
||||
|
||||
### Overview
|
||||
|
||||
Stats are maintained as time series. There are two kinds of column:
|
||||
|
||||
* absolute columns – where the value is correct for the time given by `end_ts`
|
||||
in the stats row. (Imagine a line graph for these values)
|
||||
* per-slice columns – where the value corresponds to how many of the occurrences
|
||||
occurred within the time slice given by `(end_ts − bucket_size)…end_ts`
|
||||
or `start_ts…end_ts`. (Imagine a histogram for these values)
|
||||
|
||||
Currently, only absolute columns are in use.
|
||||
|
||||
Stats are maintained in two tables (for each type): current and historical.
|
||||
|
||||
Current stats correspond to the present values. Each subject can only have one
|
||||
entry.
|
||||
|
||||
Historical stats correspond to values in the past. Subjects may have multiple
|
||||
entries.
|
||||
|
||||
## Concepts around the management of stats
|
||||
|
||||
### current rows
|
||||
|
||||
#### dirty current rows
|
||||
|
||||
Current rows can be **dirty**, which means that they have changed since the
|
||||
latest historical row for the same subject.
|
||||
**Dirty** current rows possess an end timestamp, `end_ts`.
|
||||
|
||||
#### old current rows and old collection
|
||||
|
||||
When a (necessarily dirty) current row has an `end_ts` in the past, it is said
|
||||
to be **old**.
|
||||
Old current rows must be copied into a historical row, and cleared of their dirty
|
||||
status, before further statistics can be tracked for that subject.
|
||||
The process which does this is referred to as **old collection**.
|
||||
|
||||
#### incomplete current rows
|
||||
|
||||
There are also **incomplete** current rows, which are current rows that do not
|
||||
contain a full count yet – this is because they are waiting for the regeneration
|
||||
process to give them an initial count. Incomplete current rows DO NOT contain
|
||||
correct and up-to-date values. As such, *incomplete rows are not old-collected*.
|
||||
Instead, old incomplete rows will be extended so they are no longer old.
|
||||
|
||||
### historical rows
|
||||
|
||||
Historical rows can always be considered to be valid for the time slice and
|
||||
end time specified. (This, of course, assumes a lack of defects in the code
|
||||
to track the statistics, and assumes integrity of the database).
|
||||
|
||||
Even still, there are two considerations that we may need to bear in mind:
|
||||
|
||||
* historical rows will not exist for every time slice – they will be omitted
|
||||
if there were no changes. In this case, the following assumptions can be
|
||||
made to interpolate/recreate missing rows:
|
||||
- absolute fields have the same values as in the preceding row
|
||||
- per-slice fields are zero (`0`)
|
||||
* historical rows will not be retained forever – rows older than a configurable
|
||||
time will be purged.
|
||||
|
||||
#### purge
|
||||
|
||||
The purging of historical rows is not yet implemented.
|
||||
|
||||
+1
-1
@@ -35,4 +35,4 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.3.0rc1"
|
||||
__version__ = "1.3.1"
|
||||
|
||||
@@ -22,6 +22,7 @@ from netaddr import IPAddress
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
import synapse.types
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
@@ -178,6 +179,7 @@ class Auth(object):
|
||||
def get_public_keys(self, invite_event):
|
||||
return event_auth.get_public_keys(invite_event)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_user_by_req(
|
||||
self, request, allow_guest=False, rights="access", allow_expired=False
|
||||
@@ -209,6 +211,7 @@ class Auth(object):
|
||||
user_id, app_service = yield self._get_appservice_user_id(request)
|
||||
if user_id:
|
||||
request.authenticated_entity = user_id
|
||||
opentracing.set_tag("authenticated_entity", user_id)
|
||||
|
||||
if ip_addr and self.hs.config.track_appservice_user_ips:
|
||||
yield self.store.insert_client_ip(
|
||||
@@ -259,6 +262,7 @@ class Auth(object):
|
||||
)
|
||||
|
||||
request.authenticated_entity = user.to_string()
|
||||
opentracing.set_tag("authenticated_entity", user.to_string())
|
||||
|
||||
return synapse.types.create_requester(
|
||||
user, token_id, is_guest, device_id, app_service=app_service
|
||||
|
||||
+37
-10
@@ -17,10 +17,10 @@ import gc
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import sdnotify
|
||||
from daemonize import Daemonize
|
||||
|
||||
from twisted.internet import defer, error, reactor
|
||||
@@ -246,13 +246,12 @@ def start(hs, listeners=None):
|
||||
def handle_sighup(*args, **kwargs):
|
||||
# Tell systemd our state, if we're using it. This will silently fail if
|
||||
# we're not using systemd.
|
||||
sd_channel = sdnotify.SystemdNotifier()
|
||||
sd_channel.notify("RELOADING=1")
|
||||
sdnotify(b"RELOADING=1")
|
||||
|
||||
for i in _sighup_callbacks:
|
||||
i(hs)
|
||||
|
||||
sd_channel.notify("READY=1")
|
||||
sdnotify(b"READY=1")
|
||||
|
||||
signal.signal(signal.SIGHUP, handle_sighup)
|
||||
|
||||
@@ -308,16 +307,12 @@ def setup_sdnotify(hs):
|
||||
|
||||
# Tell systemd our state, if we're using it. This will silently fail if
|
||||
# we're not using systemd.
|
||||
sd_channel = sdnotify.SystemdNotifier()
|
||||
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
"after",
|
||||
"startup",
|
||||
lambda: sd_channel.notify("READY=1\nMAINPID=%s" % (os.getpid())),
|
||||
"after", "startup", sdnotify, b"READY=1\nMAINPID=%i" % (os.getpid(),)
|
||||
)
|
||||
|
||||
hs.get_reactor().addSystemEventTrigger(
|
||||
"before", "shutdown", lambda: sd_channel.notify("STOPPING=1")
|
||||
"before", "shutdown", sdnotify, b"STOPPING=1"
|
||||
)
|
||||
|
||||
|
||||
@@ -414,3 +409,35 @@ class _DeferredResolutionReceiver(object):
|
||||
def resolutionComplete(self):
|
||||
self._deferred.callback(())
|
||||
self._receiver.resolutionComplete()
|
||||
|
||||
|
||||
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
|
||||
|
||||
|
||||
def sdnotify(state):
|
||||
"""
|
||||
Send a notification to systemd, if the NOTIFY_SOCKET env var is set.
|
||||
|
||||
This function is based on the sdnotify python package, but since it's only a few
|
||||
lines of code, it's easier to duplicate it here than to add a dependency on a
|
||||
package which many OSes don't include as a matter of principle.
|
||||
|
||||
Args:
|
||||
state (bytes): notification to send
|
||||
"""
|
||||
if not isinstance(state, bytes):
|
||||
raise TypeError("sdnotify should be called with a bytes")
|
||||
if not sdnotify_sockaddr:
|
||||
return
|
||||
addr = sdnotify_sockaddr
|
||||
if addr[0] == "@":
|
||||
addr = "\0" + addr[1:]
|
||||
|
||||
try:
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.connect(addr)
|
||||
sock.sendall(state)
|
||||
except Exception as e:
|
||||
# this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
|
||||
# unless systemd is expecting us to notify it.
|
||||
logger.warning("Unable to send notification to systemd: %s", e)
|
||||
|
||||
@@ -447,7 +447,7 @@ def setup(config_options):
|
||||
reactor.stop()
|
||||
sys.exit(1)
|
||||
|
||||
reactor.addSystemEventTrigger("before", "startup", start)
|
||||
reactor.callWhenRunning(start)
|
||||
|
||||
return hs
|
||||
|
||||
|
||||
@@ -132,21 +132,21 @@ class EmailConfig(Config):
|
||||
self.email_password_reset_template_text = email_config.get(
|
||||
"password_reset_template_text", "password_reset.txt"
|
||||
)
|
||||
self.email_password_reset_failure_template = email_config.get(
|
||||
"password_reset_failure_template", "password_reset_failure.html"
|
||||
self.email_password_reset_template_failure_html = email_config.get(
|
||||
"password_reset_template_failure_html", "password_reset_failure.html"
|
||||
)
|
||||
# This template does not support any replaceable variables, so we will
|
||||
# read it from the disk once during setup
|
||||
email_password_reset_success_template = email_config.get(
|
||||
"password_reset_success_template", "password_reset_success.html"
|
||||
email_password_reset_template_success_html = email_config.get(
|
||||
"password_reset_template_success_html", "password_reset_success.html"
|
||||
)
|
||||
|
||||
# Check templates exist
|
||||
for f in [
|
||||
self.email_password_reset_template_html,
|
||||
self.email_password_reset_template_text,
|
||||
self.email_password_reset_failure_template,
|
||||
email_password_reset_success_template,
|
||||
self.email_password_reset_template_failure_html,
|
||||
email_password_reset_template_success_html,
|
||||
]:
|
||||
p = os.path.join(self.email_template_dir, f)
|
||||
if not os.path.isfile(p):
|
||||
@@ -154,9 +154,9 @@ class EmailConfig(Config):
|
||||
|
||||
# Retrieve content of web templates
|
||||
filepath = os.path.join(
|
||||
self.email_template_dir, email_password_reset_success_template
|
||||
self.email_template_dir, email_password_reset_template_success_html
|
||||
)
|
||||
self.email_password_reset_success_html_content = self.read_file(
|
||||
self.email_password_reset_template_success_html_content = self.read_file(
|
||||
filepath, "email.password_reset_template_success_html"
|
||||
)
|
||||
|
||||
|
||||
@@ -19,8 +19,9 @@ import functools
|
||||
import logging
|
||||
import re
|
||||
|
||||
from twisted.internet.defer import maybeDeferred
|
||||
|
||||
import synapse
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.api.urls import (
|
||||
@@ -37,6 +38,7 @@ from synapse.http.servlet import (
|
||||
parse_string_from_args,
|
||||
)
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.opentracing import start_active_span_from_context, tags
|
||||
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
@@ -287,16 +289,17 @@ class BaseFederationServlet(object):
|
||||
raise
|
||||
|
||||
# Start an opentracing span
|
||||
with opentracing.start_active_span_from_context(
|
||||
with start_active_span_from_context(
|
||||
request.requestHeaders,
|
||||
"incoming-federation-request",
|
||||
tags={
|
||||
"request_id": request.get_request_id(),
|
||||
opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER,
|
||||
opentracing.tags.HTTP_METHOD: request.get_method(),
|
||||
opentracing.tags.HTTP_URL: request.get_redacted_uri(),
|
||||
opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: request.get_method(),
|
||||
tags.HTTP_URL: request.get_redacted_uri(),
|
||||
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"authenticated_entity": origin,
|
||||
"servlet_name": request.request_metrics.name,
|
||||
},
|
||||
):
|
||||
if origin:
|
||||
@@ -745,8 +748,12 @@ class PublicRoomList(BaseFederationServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID(None, None)
|
||||
|
||||
data = await self.handler.get_local_public_room_list(
|
||||
limit, since_token, network_tuple=network_tuple, from_federation=True
|
||||
data = await maybeDeferred(
|
||||
self.handler.get_local_public_room_list,
|
||||
limit,
|
||||
since_token,
|
||||
network_tuple=network_tuple,
|
||||
from_federation=True,
|
||||
)
|
||||
return 200, data
|
||||
|
||||
|
||||
@@ -94,6 +94,16 @@ class AdminHandler(BaseHandler):
|
||||
|
||||
return ret
|
||||
|
||||
def set_user_server_admin(self, user, admin):
|
||||
"""
|
||||
Set the admin bit on a user.
|
||||
|
||||
Args:
|
||||
user_id (UserID): the (necessarily local) user to manipulate
|
||||
admin (bool): whether or not the user should be an admin of this server
|
||||
"""
|
||||
return self.store.set_server_admin(user, admin)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def export_user_data(self, user_id, writer):
|
||||
"""Write all data we have on the user to the given writer.
|
||||
|
||||
@@ -560,6 +560,18 @@ class RoomCreationHandler(BaseHandler):
|
||||
|
||||
yield self.event_creation_handler.assert_accepted_privacy_policy(requester)
|
||||
|
||||
power_level_content_override = config.get("power_level_content_override")
|
||||
if (
|
||||
power_level_content_override
|
||||
and "users" in power_level_content_override
|
||||
and user_id not in power_level_content_override["users"]
|
||||
):
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Not a valid power_level_content_override: 'users' did not contain %s"
|
||||
% (user_id,),
|
||||
)
|
||||
|
||||
invite_3pid_list = config.get("invite_3pid", [])
|
||||
|
||||
visibility = config.get("visibility", None)
|
||||
@@ -604,7 +616,7 @@ class RoomCreationHandler(BaseHandler):
|
||||
initial_state=initial_state,
|
||||
creation_content=creation_content,
|
||||
room_alias=room_alias,
|
||||
power_level_content_override=config.get("power_level_content_override"),
|
||||
power_level_content_override=power_level_content_override,
|
||||
creator_join_profile=creator_join_profile,
|
||||
)
|
||||
|
||||
|
||||
+73
-125
@@ -49,6 +49,9 @@ class StatsHandler(StateDeltasHandler):
|
||||
# The current position in the current_state_delta stream
|
||||
self.pos = None
|
||||
|
||||
# Guard to ensure we only process deltas one at a time
|
||||
self._is_processing = False
|
||||
|
||||
if hs.config.stats_enabled:
|
||||
self.notifier.add_replication_callback(self.notify_new_event)
|
||||
|
||||
@@ -62,58 +65,43 @@ class StatsHandler(StateDeltasHandler):
|
||||
if not self.hs.config.stats_enabled:
|
||||
return
|
||||
|
||||
lock = self.store.stats_delta_processing_lock
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process():
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
finally:
|
||||
lock.release()
|
||||
self._is_processing = False
|
||||
|
||||
if lock.acquire(blocking=False):
|
||||
# we only want to run this process one-at-a-time,
|
||||
# and also, if the initial background updater wants us to keep out,
|
||||
# we should respect that.
|
||||
try:
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
except: # noqa: E722 – re-raised so fine
|
||||
lock.release()
|
||||
raise
|
||||
self._is_processing = True
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
if self.pos is None or None in self.pos.values():
|
||||
self.pos = yield self.store.get_stats_positions()
|
||||
if self.pos is None:
|
||||
self.pos = yield self.store.get_stats_stream_pos()
|
||||
|
||||
# If still None then the initial background update hasn't started yet
|
||||
if self.pos is None or None in self.pos.values():
|
||||
# If still None then the initial background update hasn't happened yet
|
||||
if self.pos is None:
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
while True:
|
||||
deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
while True:
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
if not deltas:
|
||||
break
|
||||
return
|
||||
|
||||
logger.info("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
|
||||
self.pos = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_stream_pos(self.pos)
|
||||
|
||||
event_processing_positions.labels("stats").set(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
if self.pos is not None:
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
|
||||
with Measure(self.clock, "stats_total_events"):
|
||||
self.pos = yield self.store.incremental_update_total_events(self.pos)
|
||||
event_processing_positions.labels("stats").set(self.pos)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
@@ -131,7 +119,7 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||
|
||||
token = yield self.store.get_earliest_token_for_stats("room", room_id)
|
||||
token = yield self.store.get_earliest_token_for_room_stats(room_id)
|
||||
|
||||
# If the earliest token to begin from is larger than our current
|
||||
# stream ID, skip processing this delta.
|
||||
@@ -156,56 +144,44 @@ class StatsHandler(StateDeltasHandler):
|
||||
# We use stream_pos here rather than fetch by event_id as event_id
|
||||
# may be None
|
||||
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
|
||||
now = int(now) // 1000
|
||||
|
||||
room_stats_delta = {}
|
||||
room_stats_complete = False
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
room_stats_delta["current_state_events"] = (
|
||||
room_stats_delta.get("current_state_events", 0) + 1
|
||||
)
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
prev_event_content = {}
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif prev_membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = (
|
||||
room_stats_delta.get("joined_members", 0) - 1
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
elif prev_membership == Membership.INVITE:
|
||||
room_stats_delta["invited_members"] = (
|
||||
room_stats_delta.get("invited_members", 0) - 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", -1
|
||||
)
|
||||
elif prev_membership == Membership.LEAVE:
|
||||
room_stats_delta["left_members"] = (
|
||||
room_stats_delta.get("left_members", 0) - 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", -1
|
||||
)
|
||||
elif prev_membership == Membership.BAN:
|
||||
room_stats_delta["banned_members"] = (
|
||||
room_stats_delta.get("banned_members", 0) - 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", -1
|
||||
)
|
||||
else:
|
||||
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
|
||||
@@ -213,20 +189,20 @@ class StatsHandler(StateDeltasHandler):
|
||||
raise ValueError(err)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = (
|
||||
room_stats_delta.get("joined_members", 0) + 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", +1
|
||||
)
|
||||
elif membership == Membership.INVITE:
|
||||
room_stats_delta["invited_members"] = (
|
||||
room_stats_delta.get("invited_members", 0) + 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", +1
|
||||
)
|
||||
elif membership == Membership.LEAVE:
|
||||
room_stats_delta["left_members"] = (
|
||||
room_stats_delta.get("left_members", 0) + 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", +1
|
||||
)
|
||||
elif membership == Membership.BAN:
|
||||
room_stats_delta["banned_members"] = (
|
||||
room_stats_delta.get("banned_members", 0) + 1
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", +1
|
||||
)
|
||||
else:
|
||||
err = "%s is not a valid membership" % (repr(membership),)
|
||||
@@ -234,19 +210,26 @@ class StatsHandler(StateDeltasHandler):
|
||||
raise ValueError(err)
|
||||
|
||||
user_id = state_key
|
||||
if self.is_mine_id(user_id) and membership in (
|
||||
Membership.JOIN,
|
||||
Membership.LEAVE,
|
||||
):
|
||||
if self.is_mine_id(user_id):
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
|
||||
field = "public_rooms" if public else "private_rooms"
|
||||
delta = +1 if membership == Membership.JOIN else -1
|
||||
|
||||
yield self.store.update_stats_delta(
|
||||
now, "user", user_id, {field: delta}
|
||||
)
|
||||
if membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
-1,
|
||||
)
|
||||
elif membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
+1,
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
# Newly created room. Add it with all blank portions.
|
||||
@@ -263,46 +246,28 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
room_stats_complete = True
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
)
|
||||
|
||||
# whether the room would be public anyway,
|
||||
# because of history_visibility
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["history_visibility"] == "world_readable"
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
elif typ == EventTypes.RoomHistoryVisibility:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id,
|
||||
{"history_visibility": event_content.get("history_visibility")},
|
||||
)
|
||||
|
||||
# whether the room would be public anyway,
|
||||
# because of join_rule
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["join_rules"] == JoinRules.PUBLIC
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
elif typ == EventTypes.Encryption:
|
||||
yield self.store.update_room_state(
|
||||
@@ -325,20 +290,6 @@ class StatsHandler(StateDeltasHandler):
|
||||
room_id, {"canonical_alias": event_content.get("alias")}
|
||||
)
|
||||
|
||||
if room_stats_complete:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_public_room_stats(self, ts, room_id, is_public):
|
||||
"""
|
||||
@@ -357,13 +308,10 @@ class StatsHandler(StateDeltasHandler):
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine(UserID.from_string(user_id)):
|
||||
yield self.store.update_stats_delta(
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
ts, "user", user_id, "public_rooms", +1 if is_public else -1
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "private_rooms", -1 if is_public else +1
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -786,9 +786,8 @@ class SyncHandler(object):
|
||||
batch.events[0].event_id, state_filter=state_filter
|
||||
)
|
||||
else:
|
||||
# Its not clear how we get here, but empirically we do
|
||||
# (#5407). Logging has been added elsewhere to try and
|
||||
# figure out where this state comes from.
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_start = yield self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
)
|
||||
@@ -1771,20 +1770,9 @@ class SyncHandler(object):
|
||||
newly_joined_room=newly_joined,
|
||||
)
|
||||
|
||||
if not batch and batch.limited:
|
||||
# This resulted in #5407, which is weird, so lets log! We do it
|
||||
# here as we have the maximum amount of information.
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
logger.info(
|
||||
"Issue #5407: Found limited batch with no events. user %s, room %s,"
|
||||
" sync_config %s, newly_joined %s, events %s, batch %s.",
|
||||
user_id,
|
||||
room_id,
|
||||
sync_config,
|
||||
newly_joined,
|
||||
events,
|
||||
batch,
|
||||
)
|
||||
# Note: `batch` can be both empty and limited here in the case where
|
||||
# `_load_filtered_recents` can't find any events the user should see
|
||||
# (e.g. due to having ignored the sender of the last 50 events).
|
||||
|
||||
if newly_joined:
|
||||
# debug for https://github.com/matrix-org/synapse/issues/4422
|
||||
|
||||
@@ -44,6 +44,12 @@ WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
|
||||
# lower bound for .well-known cache period
|
||||
WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
|
||||
|
||||
# Attempt to refetch a cached well-known N% of the TTL before it expires.
|
||||
# e.g. if set to 0.2 and we have a cached entry with a TTL of 5mins, then
|
||||
# we'll start trying to refetch 1 minute before it expires.
|
||||
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -80,15 +86,38 @@ class WellKnownResolver(object):
|
||||
Deferred[WellKnownLookupResult]: The result of the lookup
|
||||
"""
|
||||
try:
|
||||
result = self._well_known_cache[server_name]
|
||||
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
|
||||
server_name
|
||||
)
|
||||
|
||||
now = self._clock.time()
|
||||
if now < expiry - WELL_KNOWN_GRACE_PERIOD_FACTOR * ttl:
|
||||
return WellKnownLookupResult(delegated_server=prev_result)
|
||||
except KeyError:
|
||||
# TODO: should we linearise so that we don't end up doing two .well-known
|
||||
# requests for the same server in parallel?
|
||||
prev_result = None
|
||||
|
||||
# TODO: should we linearise so that we don't end up doing two .well-known
|
||||
# requests for the same server in parallel?
|
||||
try:
|
||||
with Measure(self._clock, "get_well_known"):
|
||||
result, cache_period = yield self._do_get_well_known(server_name)
|
||||
|
||||
if cache_period > 0:
|
||||
self._well_known_cache.set(server_name, result, cache_period)
|
||||
except _FetchWellKnownFailure as e:
|
||||
if prev_result and e.temporary:
|
||||
# This is a temporary failure and we have a still valid cached
|
||||
# result, so lets return that. Hopefully the next time we ask
|
||||
# the remote will be back up again.
|
||||
return WellKnownLookupResult(delegated_server=prev_result)
|
||||
|
||||
result = None
|
||||
|
||||
# add some randomness to the TTL to avoid a stampeding herd every hour
|
||||
# after startup
|
||||
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
|
||||
if cache_period > 0:
|
||||
self._well_known_cache.set(server_name, result, cache_period)
|
||||
|
||||
return WellKnownLookupResult(delegated_server=result)
|
||||
|
||||
@@ -99,40 +128,42 @@ class WellKnownResolver(object):
|
||||
Args:
|
||||
server_name (bytes): name of the server, from the requested url
|
||||
|
||||
Raises:
|
||||
_FetchWellKnownFailure if we fail to lookup a result
|
||||
|
||||
Returns:
|
||||
Deferred[Tuple[bytes|None|object],int]:
|
||||
result, cache period, where result is one of:
|
||||
- the new server name from the .well-known (as a `bytes`)
|
||||
- None if there was no .well-known file.
|
||||
- INVALID_WELL_KNOWN if the .well-known was invalid
|
||||
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
|
||||
"""
|
||||
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
|
||||
uri_str = uri.decode("ascii")
|
||||
logger.info("Fetching %s", uri_str)
|
||||
|
||||
# We do this in two steps to differentiate between possibly transient
|
||||
# errors (e.g. can't connect to host, 503 response) and more permenant
|
||||
# errors (such as getting a 404 response).
|
||||
try:
|
||||
response = yield make_deferred_yieldable(
|
||||
self._well_known_agent.request(b"GET", uri)
|
||||
)
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 500 <= response.code < 600:
|
||||
raise Exception("Non-200 response %s" % (response.code,))
|
||||
except Exception as e:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
raise _FetchWellKnownFailure(temporary=True)
|
||||
|
||||
try:
|
||||
if response.code != 200:
|
||||
raise Exception("Non-200 response %s" % (response.code,))
|
||||
|
||||
parsed_body = json.loads(body.decode("utf-8"))
|
||||
logger.info("Response from .well-known: %s", parsed_body)
|
||||
if not isinstance(parsed_body, dict):
|
||||
raise Exception("not a dict")
|
||||
if "m.server" not in parsed_body:
|
||||
raise Exception("Missing key 'm.server'")
|
||||
|
||||
result = parsed_body["m.server"].encode("ascii")
|
||||
except Exception as e:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
|
||||
# add some randomness to the TTL to avoid a stampeding herd every hour
|
||||
# after startup
|
||||
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
return (None, cache_period)
|
||||
|
||||
result = parsed_body["m.server"].encode("ascii")
|
||||
raise _FetchWellKnownFailure(temporary=False)
|
||||
|
||||
cache_period = _cache_period_from_headers(
|
||||
response.headers, time_now=self._reactor.seconds
|
||||
@@ -185,3 +216,10 @@ def _parse_cache_control(headers):
|
||||
v = splits[1] if len(splits) > 1 else None
|
||||
cache_controls[k] = v
|
||||
return cache_controls
|
||||
|
||||
|
||||
@attr.s()
|
||||
class _FetchWellKnownFailure(Exception):
|
||||
# True if we didn't get a non-5xx HTTP response, i.e. this may or may not be
|
||||
# a temporary failure.
|
||||
temporary = attr.ib()
|
||||
|
||||
@@ -36,7 +36,6 @@ from twisted.internet.task import _EPSILON, Cooperator
|
||||
from twisted.web._newclient import ResponseDone
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
import synapse.metrics
|
||||
import synapse.util.retryutils
|
||||
from synapse.api.errors import (
|
||||
@@ -50,6 +49,12 @@ from synapse.http import QuieterFileBodyProducer
|
||||
from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
|
||||
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.opentracing import (
|
||||
inject_active_span_byte_dict,
|
||||
set_tag,
|
||||
start_active_span,
|
||||
tags,
|
||||
)
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -341,20 +346,20 @@ class MatrixFederationHttpClient(object):
|
||||
query_bytes = b""
|
||||
|
||||
# Retreive current span
|
||||
scope = opentracing.start_active_span(
|
||||
scope = start_active_span(
|
||||
"outgoing-federation-request",
|
||||
tags={
|
||||
opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_CLIENT,
|
||||
opentracing.tags.PEER_ADDRESS: request.destination,
|
||||
opentracing.tags.HTTP_METHOD: request.method,
|
||||
opentracing.tags.HTTP_URL: request.path,
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
|
||||
tags.PEER_ADDRESS: request.destination,
|
||||
tags.HTTP_METHOD: request.method,
|
||||
tags.HTTP_URL: request.path,
|
||||
},
|
||||
finish_on_close=True,
|
||||
)
|
||||
|
||||
# Inject the span into the headers
|
||||
headers_dict = {}
|
||||
opentracing.inject_active_span_byte_dict(headers_dict, request.destination)
|
||||
inject_active_span_byte_dict(headers_dict, request.destination)
|
||||
|
||||
headers_dict[b"User-Agent"] = [self.version_string_bytes]
|
||||
|
||||
@@ -436,9 +441,7 @@ class MatrixFederationHttpClient(object):
|
||||
response.phrase.decode("ascii", errors="replace"),
|
||||
)
|
||||
|
||||
opentracing.set_tag(
|
||||
opentracing.tags.HTTP_STATUS_CODE, response.code
|
||||
)
|
||||
set_tag(tags.HTTP_STATUS_CODE, response.code)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
pass
|
||||
|
||||
@@ -72,7 +72,6 @@ REQUIREMENTS = [
|
||||
"netaddr>=0.7.18",
|
||||
"Jinja2>=2.9",
|
||||
"bleach>=1.4.3",
|
||||
"sdnotify>=0.3",
|
||||
]
|
||||
|
||||
CONDITIONAL_REQUIREMENTS = {
|
||||
|
||||
@@ -43,6 +43,7 @@ from synapse.rest.admin._base import (
|
||||
)
|
||||
from synapse.rest.admin.media import register_servlets_for_media_repo
|
||||
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
|
||||
from synapse.rest.admin.users import UserAdminServlet
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
@@ -740,6 +741,7 @@ def register_servlets(hs, http_server):
|
||||
register_servlets_for_client_rest_resource(hs, http_server)
|
||||
SendServerNoticeServlet(hs).register(http_server)
|
||||
VersionServlet(hs).register(http_server)
|
||||
UserAdminServlet(hs).register(http_server)
|
||||
|
||||
|
||||
def register_servlets_for_client_rest_resource(hs, http_server):
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import re
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
assert_params_in_dict,
|
||||
parse_json_object_from_request,
|
||||
)
|
||||
from synapse.rest.admin import assert_requester_is_admin
|
||||
from synapse.types import UserID
|
||||
|
||||
|
||||
class UserAdminServlet(RestServlet):
|
||||
"""
|
||||
Set whether or not a user is a server administrator.
|
||||
|
||||
Note that only local users can be server administrators, and that an
|
||||
administrator may not demote themselves.
|
||||
|
||||
Only server administrators can use this API.
|
||||
|
||||
Example:
|
||||
PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
|
||||
{
|
||||
"admin": true
|
||||
}
|
||||
"""
|
||||
|
||||
PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.handlers = hs.get_handlers()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield assert_requester_is_admin(self.auth, request)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
auth_user = requester.user
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
assert_params_in_dict(body, ["admin"])
|
||||
|
||||
if not self.hs.is_mine(target_user):
|
||||
raise SynapseError(400, "Only local users can be admins of this homeserver")
|
||||
|
||||
set_admin_to = bool(body["admin"])
|
||||
|
||||
if target_user == auth_user and not set_admin_to:
|
||||
raise SynapseError(400, "You may not demote yourself.")
|
||||
|
||||
yield self.handlers.admin_handler.set_user_server_admin(
|
||||
target_user, set_admin_to
|
||||
)
|
||||
|
||||
return (200, {})
|
||||
@@ -282,13 +282,13 @@ class PasswordResetSubmitTokenServlet(RestServlet):
|
||||
return None
|
||||
|
||||
# Otherwise show the success template
|
||||
html = self.config.email_password_reset_success_html_content
|
||||
html = self.config.email_password_reset_template_success_html_content
|
||||
request.setResponseCode(200)
|
||||
except ThreepidValidationError as e:
|
||||
# Show a failure page with a reason
|
||||
html = self.load_jinja2_template(
|
||||
self.config.email_template_dir,
|
||||
self.config.email_password_reset_failure_template,
|
||||
self.config.email_password_reset_template_failure_html,
|
||||
template_vars={"failure_reason": e.msg},
|
||||
)
|
||||
request.setResponseCode(e.code)
|
||||
|
||||
@@ -82,11 +82,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
table="e2e_room_keys",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
"version": version,
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
},
|
||||
values={
|
||||
"version": version,
|
||||
"first_message_index": room_key["first_message_index"],
|
||||
"forwarded_count": room_key["forwarded_count"],
|
||||
"is_verified": room_key["is_verified"],
|
||||
|
||||
@@ -272,6 +272,14 @@ class RegistrationWorkerStore(SQLBaseStore):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def is_server_admin(self, user):
|
||||
"""Determines if a user is an admin of this homeserver.
|
||||
|
||||
Args:
|
||||
user (UserID): user ID of the user to test
|
||||
|
||||
Returns (bool):
|
||||
true iff the user is a server admin, false otherwise.
|
||||
"""
|
||||
res = yield self._simple_select_one_onecol(
|
||||
table="users",
|
||||
keyvalues={"name": user.to_string()},
|
||||
@@ -282,6 +290,21 @@ class RegistrationWorkerStore(SQLBaseStore):
|
||||
|
||||
return res if res else False
|
||||
|
||||
def set_server_admin(self, user, admin):
|
||||
"""Sets whether a user is an admin of this homeserver.
|
||||
|
||||
Args:
|
||||
user (UserID): user ID of the user to test
|
||||
admin (bool): true iff the user is to be a server admin,
|
||||
false otherwise.
|
||||
"""
|
||||
return self._simple_update_one(
|
||||
table="users",
|
||||
keyvalues={"name": user.to_string()},
|
||||
updatevalues={"admin": 1 if admin else 0},
|
||||
desc="set_server_admin",
|
||||
)
|
||||
|
||||
def _query_for_auth(self, txn, token):
|
||||
sql = (
|
||||
"SELECT users.name, users.is_guest, access_tokens.id as token_id,"
|
||||
@@ -845,17 +868,6 @@ class RegistrationStore(
|
||||
(user_id_obj.localpart, create_profile_with_displayname),
|
||||
)
|
||||
|
||||
if self.hs.config.stats_enabled:
|
||||
# we create a new completed user statistics row
|
||||
|
||||
# we don't strictly need current_token since this user really can't
|
||||
# have any state deltas before now (as it is a new user), but still,
|
||||
# we include it for completeness.
|
||||
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
||||
self._update_stats_delta_txn(
|
||||
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
txn.call_after(self.is_guest.invalidate, (user_id,))
|
||||
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
/* Copyright 2019 Matrix.org Foundation CIC
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- version is supposed to be part of the room keys index
|
||||
CREATE UNIQUE INDEX e2e_room_keys_with_version_idx ON e2e_room_keys(user_id, version, room_id, session_id);
|
||||
DROP INDEX IF EXISTS e2e_room_keys_idx;
|
||||
@@ -1,168 +0,0 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
----- First clean up from previous versions of room stats.
|
||||
|
||||
-- First remove old stats stuff
|
||||
DROP TABLE IF EXISTS room_stats;
|
||||
DROP TABLE IF EXISTS user_stats;
|
||||
DROP TABLE IF EXISTS room_stats_earliest_tokens;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_position;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
|
||||
DROP TABLE IF EXISTS stats_stream_pos;
|
||||
|
||||
-- Unschedule old background updates if they're still scheduled
|
||||
DELETE FROM background_updates WHERE update_name IN (
|
||||
'populate_stats_createtables',
|
||||
'populate_stats_process_rooms',
|
||||
'populate_stats_cleanup',
|
||||
'regen_stats'
|
||||
);
|
||||
|
||||
----- Create tables for our version of room stats.
|
||||
|
||||
-- single-row table to track position of incremental updates
|
||||
CREATE TABLE IF NOT EXISTS stats_incremental_position (
|
||||
-- the stream_id of the last-processed state delta
|
||||
state_delta_stream_id BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed backfilled event
|
||||
-- (this is negative)
|
||||
total_events_min_stream_ordering BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed normally-created event
|
||||
-- (this is positive)
|
||||
total_events_max_stream_ordering BIGINT,
|
||||
|
||||
-- If true, this represents the contract agreed upon by the background
|
||||
-- population processor.
|
||||
-- If false, this is suitable for use by the delta/incremental processor.
|
||||
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
|
||||
);
|
||||
|
||||
-- insert a null row and make sure it is the only one.
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
|
||||
|
||||
-- represents PRESENT room statistics for a room
|
||||
CREATE TABLE IF NOT EXISTS room_stats_current (
|
||||
room_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
-- These starts cover the time from start_ts...end_ts (in seconds).
|
||||
-- Note that end_ts is quantised, and start_ts usually so.
|
||||
start_ts BIGINT,
|
||||
end_ts BIGINT,
|
||||
|
||||
current_state_events INT NOT NULL DEFAULT 0,
|
||||
total_events INT NOT NULL DEFAULT 0,
|
||||
joined_members INT NOT NULL DEFAULT 0,
|
||||
invited_members INT NOT NULL DEFAULT 0,
|
||||
left_members INT NOT NULL DEFAULT 0,
|
||||
banned_members INT NOT NULL DEFAULT 0,
|
||||
|
||||
-- If initial background count is still to be performed: NULL
|
||||
-- If initial background count has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT,
|
||||
|
||||
CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
|
||||
);
|
||||
|
||||
|
||||
-- represents HISTORICAL room statistics for a room
|
||||
CREATE TABLE IF NOT EXISTS room_stats_historical (
|
||||
room_id TEXT NOT NULL,
|
||||
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
|
||||
-- Note that end_ts is quantised, and start_ts usually so.
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (room_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient room stats.
|
||||
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular room.)
|
||||
|
||||
|
||||
-- represents PRESENT statistics for a user
|
||||
CREATE TABLE IF NOT EXISTS user_stats_current (
|
||||
user_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
-- The timestamp that represents the start of the
|
||||
start_ts BIGINT,
|
||||
end_ts BIGINT,
|
||||
|
||||
public_rooms INT DEFAULT 0 NOT NULL,
|
||||
private_rooms INT DEFAULT 0 NOT NULL,
|
||||
|
||||
-- If initial background count is still to be performed: NULL
|
||||
-- If initial background count has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
-- represents HISTORICAL statistics for a user
|
||||
CREATE TABLE IF NOT EXISTS user_stats_historical (
|
||||
user_id TEXT NOT NULL,
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (user_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient user stats.
|
||||
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
|
||||
-- out for us. (We would want it to review stats for a particular user.)
|
||||
|
||||
|
||||
-- Set up staging tables
|
||||
-- we depend on current_state_events_membership because this is used
|
||||
-- in our counting.
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_prepare', '{}', 'current_state_events_membership');
|
||||
|
||||
-- Run through each room and update stats
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
|
||||
|
||||
-- Run through each user and update stats.
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
|
||||
|
||||
-- Clean up staging tables
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_cleanup', '{}', 'populate_stats_process_users');
|
||||
@@ -1,87 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This schema delta will be run after 'stats_separated1.sql' due to lexicographic
|
||||
# ordering. Note that it MUST be so.
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
|
||||
|
||||
def _run_create_generic(stats_type, cursor, database_engine):
|
||||
"""
|
||||
Creates the pertinent (partial, if supported) indices for one kind of stats.
|
||||
Args:
|
||||
stats_type: "room" or "user" - the type of stats
|
||||
cursor: Database Cursor
|
||||
database_engine: Database Engine
|
||||
"""
|
||||
if isinstance(database_engine, Sqlite3Engine):
|
||||
# even though SQLite >= 3.8 can support partial indices, we won't enable
|
||||
# them, in case the SQLite database may be later used on another system.
|
||||
# It's also the case that SQLite is only likely to be used in small
|
||||
# deployments or testing, where the optimisations gained by use of a
|
||||
# partial index are not a big concern.
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
|
||||
ON %s_stats_current (end_ts);
|
||||
"""
|
||||
% (stats_type, stats_type)
|
||||
)
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
|
||||
ON %s_stats_current (completed_delta_stream_id, %s_id);
|
||||
"""
|
||||
% (stats_type, stats_type, stats_type)
|
||||
)
|
||||
elif isinstance(database_engine, PostgresEngine):
|
||||
# This partial index helps us with finding dirty stats rows
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
|
||||
ON %s_stats_current (end_ts)
|
||||
WHERE end_ts IS NOT NULL;
|
||||
"""
|
||||
% (stats_type, stats_type)
|
||||
)
|
||||
# This partial index helps us with old collection
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
|
||||
ON %s_stats_current (%s_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
"""
|
||||
% (stats_type, stats_type, stats_type)
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("Unknown database engine.")
|
||||
|
||||
|
||||
def run_create(cursor, database_engine):
|
||||
"""
|
||||
This function is called as part of the schema delta.
|
||||
It will create indices - partial, if supported - for the new 'separated'
|
||||
room & user statistics.
|
||||
"""
|
||||
_run_create_generic("room", cursor, database_engine)
|
||||
_run_create_generic("user", cursor, database_engine)
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, config):
|
||||
"""
|
||||
This function is run on a database upgrade (of a non-empty database).
|
||||
We have no need to do anything specific here.
|
||||
"""
|
||||
pass
|
||||
@@ -31,7 +31,7 @@ class StateDeltasStore(SQLBaseStore):
|
||||
- state_key (str):
|
||||
- event_id (str|None): new event_id for this state key. None if the
|
||||
state has been deleted.
|
||||
- prev_event_id (str): previous event_id for this state key. None
|
||||
- prev_event_id (str|None): previous event_id for this state key. None
|
||||
if it's new state.
|
||||
|
||||
Args:
|
||||
|
||||
+204
-845
File diff suppressed because it is too large
Load Diff
@@ -55,7 +55,7 @@ class TTLCache(object):
|
||||
if e != SENTINEL:
|
||||
self._expiry_list.remove(e)
|
||||
|
||||
entry = _CacheEntry(expiry_time=expiry, key=key, value=value)
|
||||
entry = _CacheEntry(expiry_time=expiry, ttl=ttl, key=key, value=value)
|
||||
self._data[key] = entry
|
||||
self._expiry_list.add(entry)
|
||||
|
||||
@@ -87,7 +87,8 @@ class TTLCache(object):
|
||||
key: key to look up
|
||||
|
||||
Returns:
|
||||
Tuple[Any, float]: the value from the cache, and the expiry time
|
||||
Tuple[Any, float, float]: the value from the cache, the expiry time
|
||||
and the TTL
|
||||
|
||||
Raises:
|
||||
KeyError if the entry is not found
|
||||
@@ -99,7 +100,7 @@ class TTLCache(object):
|
||||
self._metrics.inc_misses()
|
||||
raise
|
||||
self._metrics.inc_hits()
|
||||
return e.value, e.expiry_time
|
||||
return e.value, e.expiry_time, e.ttl
|
||||
|
||||
def pop(self, key, default=SENTINEL):
|
||||
"""Remove a value from the cache
|
||||
@@ -158,5 +159,6 @@ class _CacheEntry(object):
|
||||
|
||||
# expiry_time is the first attribute, so that entries are sorted by expiry.
|
||||
expiry_time = attr.ib()
|
||||
ttl = attr.ib()
|
||||
key = attr.ib()
|
||||
value = attr.ib()
|
||||
|
||||
+10
-577
@@ -17,18 +17,12 @@ from mock import Mock
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse import storage
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
|
||||
from tests import unittest
|
||||
|
||||
# The expected number of state events in a fresh public room.
|
||||
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5
|
||||
# The expected number of state events in a fresh private room.
|
||||
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6
|
||||
|
||||
|
||||
class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
|
||||
@@ -39,6 +33,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.handler = self.hs.get_stats_handler()
|
||||
|
||||
@@ -52,7 +47,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
|
||||
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
@@ -61,17 +56,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
{
|
||||
"update_name": "populate_stats_process_rooms",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_prepare",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_process_users",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
"depends_on": "populate_stats_createtables",
|
||||
},
|
||||
)
|
||||
)
|
||||
@@ -81,33 +66,11 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
{
|
||||
"update_name": "populate_stats_cleanup",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_users",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
def _get_current_stats(self, stats_type, stat_id):
|
||||
table, id_col = storage.stats.TYPE_TO_TABLE[stats_type]
|
||||
|
||||
cols = (
|
||||
["start_ts", "end_ts", "completed_delta_stream_id"]
|
||||
+ list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type])
|
||||
+ list(storage.stats.PER_SLICE_FIELDS[stats_type])
|
||||
)
|
||||
|
||||
return self.get_success(
|
||||
self.store._simple_select_one(
|
||||
table + "_current", {id_col: stat_id}, cols, allow_none=True
|
||||
)
|
||||
)
|
||||
|
||||
def _perform_background_initial_update(self):
|
||||
# Do the initial population of the stats via the background update
|
||||
self._add_background_updates()
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
def test_initial_room(self):
|
||||
"""
|
||||
The background updates will build the table from scratch.
|
||||
@@ -151,7 +114,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
Ingestion via notify_new_event will ignore tokens that the background
|
||||
update have already processed.
|
||||
"""
|
||||
|
||||
self.reactor.advance(86401)
|
||||
|
||||
self.hs.config.stats_enabled = False
|
||||
@@ -176,12 +138,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
self.hs.config.stats_enabled = True
|
||||
self.handler.stats_enabled = True
|
||||
self.store._all_done = False
|
||||
self.get_success(self.store.update_stats_positions(None))
|
||||
self.get_success(self.store.update_stats_stream_pos(None))
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
|
||||
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -192,8 +154,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
|
||||
self.helper.join(room=room_1, user=u2, tok=u2_token)
|
||||
|
||||
# orig_delta_processor = self.store.
|
||||
|
||||
# Now do the initial ingestion.
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
@@ -225,13 +185,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
|
||||
self.helper.join(room=room_1, user=u3, tok=u3_token)
|
||||
|
||||
# self.handler.notify_new_event()
|
||||
|
||||
# We need to let the delta processor advance…
|
||||
self.pump(10 * 60)
|
||||
|
||||
# Get the slices! There should be two -- day 1, and day 2.
|
||||
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
|
||||
# Get the deltas! There should be two -- day 1, and day 2.
|
||||
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
|
||||
|
||||
# The oldest has 2 joined members
|
||||
self.assertEqual(r[-1]["joined_members"], 2)
|
||||
@@ -304,7 +259,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
|
||||
# Do the initial population of the stats via the background update
|
||||
# Do the initial population of the user directory via the background update
|
||||
self._add_background_updates()
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
@@ -344,528 +299,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
|
||||
# One delta, with two joined members -- the room creator, and our fake
|
||||
# user.
|
||||
r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
|
||||
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
|
||||
self.assertEqual(len(r), 1)
|
||||
self.assertEqual(r[0]["joined_members"], 2)
|
||||
|
||||
def test_create_user(self):
|
||||
"""
|
||||
When we create a user, it should have statistics already ready.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
|
||||
u1stats = self._get_current_stats("user", u1)
|
||||
|
||||
self.assertIsNotNone(u1stats)
|
||||
|
||||
# row is complete
|
||||
self.assertIsNotNone(u1stats["completed_delta_stream_id"])
|
||||
|
||||
# not in any rooms by default
|
||||
self.assertEqual(u1stats["public_rooms"], 0)
|
||||
self.assertEqual(u1stats["private_rooms"], 0)
|
||||
|
||||
def test_create_room(self):
|
||||
"""
|
||||
When we create a room, it should have statistics already ready.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
r1stats = self._get_current_stats("room", r1)
|
||||
r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
|
||||
r2stats = self._get_current_stats("room", r2)
|
||||
|
||||
self.assertIsNotNone(r1stats)
|
||||
self.assertIsNotNone(r2stats)
|
||||
|
||||
# row is complete
|
||||
self.assertIsNotNone(r1stats["completed_delta_stream_id"])
|
||||
self.assertIsNotNone(r2stats["completed_delta_stream_id"])
|
||||
|
||||
# contains the default things you'd expect in a fresh room
|
||||
self.assertEqual(
|
||||
r1stats["total_events"],
|
||||
EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM,
|
||||
"Wrong number of total_events in new room's stats!"
|
||||
" You may need to update this if more state events are added to"
|
||||
" the room creation process.",
|
||||
)
|
||||
self.assertEqual(
|
||||
r2stats["total_events"],
|
||||
EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
|
||||
"Wrong number of total_events in new room's stats!"
|
||||
" You may need to update this if more state events are added to"
|
||||
" the room creation process.",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
|
||||
)
|
||||
self.assertEqual(
|
||||
r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM
|
||||
)
|
||||
|
||||
self.assertEqual(r1stats["joined_members"], 1)
|
||||
self.assertEqual(r1stats["invited_members"], 0)
|
||||
self.assertEqual(r1stats["banned_members"], 0)
|
||||
|
||||
self.assertEqual(r2stats["joined_members"], 1)
|
||||
self.assertEqual(r2stats["invited_members"], 0)
|
||||
self.assertEqual(r2stats["banned_members"], 0)
|
||||
|
||||
def test_send_message_increments_total_events(self):
|
||||
"""
|
||||
When we send a message, it increments total_events.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.send(r1, "hiss", tok=u1token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
|
||||
def test_send_state_event_nonoverwriting(self):
|
||||
"""
|
||||
When we send a non-overwriting state event, it increments total_events AND current_state_events
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
self.helper.send_state(
|
||||
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
|
||||
)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.send_state(
|
||||
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy"
|
||||
)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
1,
|
||||
)
|
||||
|
||||
def test_send_state_event_overwriting(self):
|
||||
"""
|
||||
When we send an overwriting state event, it increments total_events ONLY
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
self.helper.send_state(
|
||||
r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
|
||||
)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.send_state(
|
||||
r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby"
|
||||
)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
0,
|
||||
)
|
||||
|
||||
def test_join_first_time(self):
|
||||
"""
|
||||
When a user joins a room for the first time, total_events, current_state_events and
|
||||
joined_members should increase by exactly 1.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2token = self.login("u2", "pass")
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
1,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1
|
||||
)
|
||||
|
||||
def test_join_after_leave(self):
|
||||
"""
|
||||
When a user joins a room after being previously left, total_events and
|
||||
joined_members should increase by exactly 1.
|
||||
current_state_events should not increase.
|
||||
left_members should decrease by exactly 1.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2token = self.login("u2", "pass")
|
||||
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
self.helper.leave(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
0,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["left_members"] - r1stats_ante["left_members"], -1
|
||||
)
|
||||
|
||||
def test_invited(self):
|
||||
"""
|
||||
When a user invites another user, current_state_events, total_events and
|
||||
invited_members should increase by exactly 1.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.invite(r1, u1, u2, tok=u1token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
1,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1
|
||||
)
|
||||
|
||||
def test_join_after_invite(self):
|
||||
"""
|
||||
When a user joins a room after being invited, total_events and
|
||||
joined_members should increase by exactly 1.
|
||||
current_state_events should not increase.
|
||||
invited_members should decrease by exactly 1.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2token = self.login("u2", "pass")
|
||||
|
||||
self.helper.invite(r1, u1, u2, tok=u1token)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
0,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1
|
||||
)
|
||||
|
||||
def test_left(self):
|
||||
"""
|
||||
When a user leaves a room after joining, total_events and
|
||||
left_members should increase by exactly 1.
|
||||
current_state_events should not increase.
|
||||
joined_members should decrease by exactly 1.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2token = self.login("u2", "pass")
|
||||
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.leave(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
0,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["left_members"] - r1stats_ante["left_members"], +1
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
|
||||
)
|
||||
|
||||
def test_banned(self):
|
||||
"""
|
||||
When a user is banned from a room after joining, total_events and
|
||||
left_members should increase by exactly 1.
|
||||
current_state_events should not increase.
|
||||
banned_members should decrease by exactly 1.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2token = self.login("u2", "pass")
|
||||
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
|
||||
self.helper.change_membership(r1, u1, u2, "ban", tok=u1token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
|
||||
self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
|
||||
0,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
|
||||
)
|
||||
|
||||
def test_initial_background_update(self):
|
||||
"""
|
||||
Test that statistics can be generated by the initial background update
|
||||
handler.
|
||||
|
||||
This test also tests that stats rows are not created for new subjects
|
||||
when stats are disabled. However, it may be desirable to change this
|
||||
behaviour eventually to still keep current rows.
|
||||
"""
|
||||
|
||||
self.hs.config.stats_enabled = False
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token)
|
||||
|
||||
# test that these subjects, which were created during a time of disabled
|
||||
# stats, do not have stats.
|
||||
self.assertIsNone(self._get_current_stats("room", r1))
|
||||
self.assertIsNone(self._get_current_stats("user", u1))
|
||||
|
||||
self.hs.config.stats_enabled = True
|
||||
|
||||
self._perform_background_initial_update()
|
||||
|
||||
r1stats = self._get_current_stats("room", r1)
|
||||
u1stats = self._get_current_stats("user", u1)
|
||||
|
||||
self.assertIsNotNone(r1stats["completed_delta_stream_id"])
|
||||
self.assertIsNotNone(u1stats["completed_delta_stream_id"])
|
||||
|
||||
self.assertEqual(r1stats["joined_members"], 1)
|
||||
self.assertEqual(
|
||||
r1stats["total_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
|
||||
)
|
||||
|
||||
self.assertEqual(u1stats["public_rooms"], 1)
|
||||
|
||||
def test_incomplete_stats(self):
|
||||
"""
|
||||
This tests that we track incomplete statistics.
|
||||
|
||||
We first test that incomplete stats are incrementally generated,
|
||||
following the preparation of a background regen.
|
||||
|
||||
We then test that these incomplete rows are completed by the background
|
||||
regen.
|
||||
"""
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1token = self.login("u1", "pass")
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2token = self.login("u2", "pass")
|
||||
u3 = self.register_user("u3", "pass")
|
||||
r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
|
||||
|
||||
# preparation stage of the initial background update
|
||||
# Ugh, have to reset this flag
|
||||
self.store._all_done = False
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_prepare", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_delete(
|
||||
"room_stats_current", {"1": 1}, "test_delete_stats"
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_delete(
|
||||
"user_stats_current", {"1": 1}, "test_delete_stats"
|
||||
)
|
||||
)
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
r1stats_ante = self._get_current_stats("room", r1)
|
||||
u1stats_ante = self._get_current_stats("user", u1)
|
||||
u2stats_ante = self._get_current_stats("user", u2)
|
||||
|
||||
self.helper.invite(r1, u1, u2, tok=u1token)
|
||||
self.helper.join(r1, u2, tok=u2token)
|
||||
self.helper.invite(r1, u1, u3, tok=u1token)
|
||||
self.helper.send(r1, "thou shalt yield", tok=u1token)
|
||||
|
||||
r1stats_post = self._get_current_stats("room", r1)
|
||||
u1stats_post = self._get_current_stats("user", u1)
|
||||
u2stats_post = self._get_current_stats("user", u2)
|
||||
|
||||
# now let the background update continue & finish
|
||||
|
||||
self.store._all_done = False
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_process_rooms",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_prepare",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_process_users",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_cleanup",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_users",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
r1stats_complete = self._get_current_stats("room", r1)
|
||||
u1stats_complete = self._get_current_stats("user", u1)
|
||||
u2stats_complete = self._get_current_stats("user", u2)
|
||||
|
||||
# now we make our assertions
|
||||
|
||||
# first check that none of the stats rows were complete before
|
||||
# the background update occurred.
|
||||
self.assertIsNone(r1stats_ante["completed_delta_stream_id"])
|
||||
self.assertIsNone(r1stats_post["completed_delta_stream_id"])
|
||||
self.assertIsNone(u1stats_ante["completed_delta_stream_id"])
|
||||
self.assertIsNone(u1stats_post["completed_delta_stream_id"])
|
||||
self.assertIsNone(u2stats_ante["completed_delta_stream_id"])
|
||||
self.assertIsNone(u2stats_post["completed_delta_stream_id"])
|
||||
|
||||
# check that _ante rows are all skeletons without any deltas applied
|
||||
self.assertEqual(r1stats_ante["joined_members"], 0)
|
||||
self.assertEqual(r1stats_ante["invited_members"], 0)
|
||||
self.assertEqual(r1stats_ante["total_events"], 0)
|
||||
self.assertEqual(r1stats_ante["current_state_events"], 0)
|
||||
|
||||
self.assertEqual(u1stats_ante["public_rooms"], 0)
|
||||
self.assertEqual(u1stats_ante["private_rooms"], 0)
|
||||
self.assertEqual(u2stats_ante["public_rooms"], 0)
|
||||
self.assertEqual(u2stats_ante["private_rooms"], 0)
|
||||
|
||||
# check that _post rows have the expected deltas applied
|
||||
self.assertEqual(r1stats_post["joined_members"], 1)
|
||||
self.assertEqual(r1stats_post["invited_members"], 1)
|
||||
self.assertEqual(r1stats_post["total_events"], 4)
|
||||
self.assertEqual(r1stats_post["current_state_events"], 2)
|
||||
|
||||
self.assertEqual(u1stats_post["public_rooms"], 0)
|
||||
self.assertEqual(u1stats_post["private_rooms"], 0)
|
||||
self.assertEqual(u2stats_post["public_rooms"], 0)
|
||||
self.assertEqual(u2stats_post["private_rooms"], 1)
|
||||
|
||||
# check that _complete rows are complete and correct
|
||||
self.assertEqual(r1stats_complete["joined_members"], 2)
|
||||
self.assertEqual(r1stats_complete["invited_members"], 1)
|
||||
self.assertEqual(
|
||||
r1stats_complete["total_events"],
|
||||
4 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
|
||||
)
|
||||
self.assertEqual(
|
||||
r1stats_complete["current_state_events"],
|
||||
2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
|
||||
)
|
||||
|
||||
self.assertEqual(u1stats_complete["public_rooms"], 0)
|
||||
self.assertEqual(u1stats_complete["private_rooms"], 1)
|
||||
self.assertEqual(u2stats_complete["public_rooms"], 0)
|
||||
self.assertEqual(u2stats_complete["private_rooms"], 1)
|
||||
|
||||
@@ -987,6 +987,75 @@ class MatrixFederationAgentTests(TestCase):
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, b"other-server")
|
||||
|
||||
def test_well_known_cache_with_temp_failure(self):
|
||||
"""Test that we refetch well-known before the cache expires, and that
|
||||
it ignores transient errors.
|
||||
"""
|
||||
|
||||
well_known_resolver = WellKnownResolver(
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||
well_known_cache=self.well_known_cache,
|
||||
)
|
||||
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
# there should be an attempt to connect on port 443 for the .well-known
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
self.assertEqual(host, "1.2.3.4")
|
||||
self.assertEqual(port, 443)
|
||||
|
||||
well_known_server = self._handle_well_known_connection(
|
||||
client_factory,
|
||||
expected_sni=b"testserv",
|
||||
response_headers={b"Cache-Control": b"max-age=1000"},
|
||||
content=b'{ "m.server": "target-server" }',
|
||||
)
|
||||
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, b"target-server")
|
||||
|
||||
# close the tcp connection
|
||||
well_known_server.loseConnection()
|
||||
|
||||
# Get close to the cache expiry, this will cause the resolver to do
|
||||
# another lookup.
|
||||
self.reactor.pump((900.0,))
|
||||
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
clients = self.reactor.tcpClients
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
|
||||
# fonx the connection attempt, this will be treated as a temporary
|
||||
# failure.
|
||||
client_factory.clientConnectionFailed(None, Exception("nope"))
|
||||
|
||||
# attemptdelay on the hostnameendpoint is 0.3, so takes that long before the
|
||||
# .well-known request fails.
|
||||
self.reactor.pump((0.4,))
|
||||
|
||||
# Resolver should return cached value, despite the lookup failing.
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, b"target-server")
|
||||
|
||||
# Expire the cache and repeat the request
|
||||
self.reactor.pump((100.0,))
|
||||
|
||||
# Repated the request, this time it should fail if the lookup fails.
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
clients = self.reactor.tcpClients
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
client_factory.clientConnectionFailed(None, Exception("nope"))
|
||||
self.reactor.pump((0.4,))
|
||||
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, None)
|
||||
|
||||
|
||||
class TestCachePeriodFromHeaders(TestCase):
|
||||
def test_cache_control(self):
|
||||
|
||||
@@ -128,12 +128,8 @@ class RestHelper(object):
|
||||
|
||||
return channel.json_body
|
||||
|
||||
def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""):
|
||||
path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
|
||||
room_id,
|
||||
event_type,
|
||||
state_key,
|
||||
)
|
||||
def send_state(self, room_id, event_type, body, tok, expect_code=200):
|
||||
path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
|
||||
if tok:
|
||||
path = path + "?access_token=%s" % tok
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ class CacheTestCase(unittest.TestCase):
|
||||
self.assertTrue("one" in self.cache)
|
||||
self.assertEqual(self.cache.get("one"), "1")
|
||||
self.assertEqual(self.cache["one"], "1")
|
||||
self.assertEqual(self.cache.get_with_expiry("one"), ("1", 110))
|
||||
self.assertEqual(self.cache.get_with_expiry("one"), ("1", 110, 10))
|
||||
self.assertEqual(self.cache._metrics.hits, 3)
|
||||
self.assertEqual(self.cache._metrics.misses, 0)
|
||||
|
||||
@@ -77,7 +77,7 @@ class CacheTestCase(unittest.TestCase):
|
||||
self.assertEqual(self.cache["two"], "2")
|
||||
self.assertEqual(self.cache["three"], "3")
|
||||
|
||||
self.assertEqual(self.cache.get_with_expiry("two"), ("2", 120))
|
||||
self.assertEqual(self.cache.get_with_expiry("two"), ("2", 120, 20))
|
||||
|
||||
self.assertEqual(self.cache._metrics.hits, 5)
|
||||
self.assertEqual(self.cache._metrics.misses, 0)
|
||||
|
||||
Reference in New Issue
Block a user