1
0

Compare commits

..

20 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
19d8d3fc81 Don't populate empty/null fields in publicRooms.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-24 11:29:51 +01:00
Olivier Wilkinson (reivilibre)
69f6a46cb5 Use room_stats and room_state for room directory search
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 15:50:29 +01:00
Olivier Wilkinson (reivilibre)
8502c668bf Changelog for #5691
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Olivier Wilkinson (reivilibre)
dc68c2a101 Update state_events and current_state_events upon receipt of a state
event #5690.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Olivier Wilkinson (reivilibre)
181c1a6072 Don't decrease left_members if the user is joining for the first time.
Fixes #5423

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Olivier Wilkinson (reivilibre)
20ae4afe7e Create room_stats rows for new rooms. #5624
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-07-17 09:46:01 +01:00
Erik Johnston
c831c5b2bb Merge pull request #5597 from matrix-org/erikj/admin_api_cmd
Create basic admin command app
2019-07-16 15:59:36 +01:00
Erik Johnston
5ed7853bb0 Remove pointless description 2019-07-16 11:45:57 +01:00
Erik Johnston
f44354e17f Clean up arg name and remove lying comment 2019-07-16 11:39:40 +01:00
Erik Johnston
d0d479c1af Fix typo in synapse/app/admin_cmd.py
Co-Authored-By: Aaron Raimist <aaron@raim.ist>
2019-07-16 09:52:56 +01:00
Erik Johnston
03cc8c4b5d Fix invoking add_argument from homeserver.py 2019-07-15 14:25:05 +01:00
Erik Johnston
eca4f5ac73 s/exfiltrate_user_data/export_user_data/ 2019-07-15 14:17:28 +01:00
Erik Johnston
1b2067f53d Add FileExfiltrationWriter 2019-07-15 14:15:22 +01:00
Erik Johnston
e8c53b07f2 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/admin_api_cmd 2019-07-15 14:13:22 +01:00
Erik Johnston
c8f35d8d38 Use set_defaults(func=) style 2019-07-15 14:09:35 +01:00
Erik Johnston
fdefb9e29a Move creation of ArgumentParser to caller 2019-07-15 14:09:35 +01:00
Erik Johnston
37b524f971 Fix up comments 2019-07-15 14:09:35 +01:00
Erik Johnston
823e13ddf4 Change add_arguments to be a static method 2019-07-15 14:09:33 +01:00
Erik Johnston
10fe904d88 Newsfile 2019-07-02 17:21:27 +01:00
Erik Johnston
9f3c0a8556 Add basic admin cmd app 2019-07-02 17:12:48 +01:00
17 changed files with 672 additions and 191 deletions

1
changelog.d/5597.feature Normal file
View File

@@ -0,0 +1 @@
Add a basic admin command app to allow server operators to run Synapse admin commands separately from the main production instance.

View File

@@ -1 +0,0 @@
Use `M_USER_DEACTIVATED` instead of `M_UNKNOWN` for errcode when a deactivated user attempts to login.

1
changelog.d/5691.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix improper or missing room_stats updates when handling state events (deltas).

View File

@@ -61,7 +61,6 @@ class Codes(object):
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
USER_DEACTIVATED = "M_USER_DEACTIVATED"
class CodeMessageException(RuntimeError):
@@ -152,7 +151,7 @@ class UserDeactivatedError(SynapseError):
msg (str): The human-readable error message
"""
super(UserDeactivatedError, self).__init__(
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.UNKNOWN
)

View File

@@ -48,7 +48,7 @@ def register_sighup(func):
_sighup_callbacks.append(func)
def start_worker_reactor(appname, config):
def start_worker_reactor(appname, config, run_command=reactor.run):
""" Run the reactor in the main process
Daemonizes if necessary, and then configures some resources, before starting
@@ -57,6 +57,7 @@ def start_worker_reactor(appname, config):
Args:
appname (str): application name which will be sent to syslog
config (synapse.config.Config): config object
run_command (Callable[]): callable that actually runs the reactor
"""
logger = logging.getLogger(config.worker_app)
@@ -69,11 +70,19 @@ def start_worker_reactor(appname, config):
daemonize=config.worker_daemonize,
print_pidfile=config.print_pidfile,
logger=logger,
run_command=run_command,
)
def start_reactor(
appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
appname,
soft_file_limit,
gc_thresholds,
pid_file,
daemonize,
print_pidfile,
logger,
run_command=reactor.run,
):
""" Run the reactor in the main process
@@ -88,6 +97,7 @@ def start_reactor(
daemonize (bool): true to run the reactor in a background process
print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
run_command (Callable[]): callable that actually runs the reactor
"""
install_dns_limiter(reactor)
@@ -97,7 +107,7 @@ def start_reactor(
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
reactor.run()
run_command()
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused

264
synapse/app/admin_cmd.py Normal file
View File

@@ -0,0 +1,264 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import os
import sys
import tempfile
from canonicaljson import json
from twisted.internet import defer, task
import synapse
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.admin import ExfiltrationWriter
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.logcontext import LoggingContext
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.admin_cmd")
class AdminCmdSlavedStore(
    SlavedReceiptsStore,
    SlavedAccountDataStore,
    SlavedApplicationServiceStore,
    SlavedRegistrationStore,
    SlavedFilteringStore,
    SlavedPresenceStore,
    SlavedGroupServerStore,
    SlavedDeviceInboxStore,
    SlavedDeviceStore,
    SlavedPushRuleStore,
    SlavedEventStore,
    SlavedClientIpStore,
    RoomStore,
    BaseSlavedStore,
):
    """Datastore class for the admin command app.

    Adds no behaviour of its own: it only combines the store classes listed
    as bases into a single class.
    """

    pass
class AdminCmdServer(HomeServer):
    """HomeServer variant used to run one-off admin commands.

    Uses AdminCmdSlavedStore as its datastore class and turns both listener
    hooks into no-ops.
    """

    DATASTORE_CLASS = AdminCmdSlavedStore

    def _listen_http(self, listener_config):
        """Do nothing: the given listener config is ignored."""

    def start_listening(self, listeners):
        """Do nothing: the given listeners are ignored."""

    def build_tcp_replication(self):
        """Build the replication handler bound to this server."""
        replication_handler = AdminCmdReplicationHandler(self)
        return replication_handler
class AdminCmdReplicationHandler(ReplicationClientHandler):
    """Replication handler for the admin command app.

    It requests no streams (see `get_streams_to_replicate`) and discards any
    replication data it is handed.
    """

    def on_rdata(self, stream_name, token, rows):
        """Discard replicated rows.

        Args:
            stream_name: name of the stream the rows belong to (unused)
            token: stream token accompanying the rows (unused)
            rows: the replicated rows (unused)

        Returns:
            Deferred[None]: an already-fired deferred.
        """
        # Deliberately not wrapped in `defer.inlineCallbacks`: that decorator
        # requires the wrapped function to be a generator and raises
        # TypeError when it is called on a function containing no `yield`.
        return defer.succeed(None)

    def get_streams_to_replicate(self):
        """Return the streams to subscribe to: always an empty dict."""
        return {}
@defer.inlineCallbacks
def export_data_command(hs, args):
    """Run the export for one user and print whatever the export returns.

    Args:
        hs (HomeServer)
        args (argparse.Namespace): must carry `user_id` and
            `output_directory`.
    """
    target_user = args.user_id
    out_dir = args.output_directory

    export = hs.get_handlers().admin_handler.export_user_data
    writer = FileExfiltrationWriter(target_user, directory=out_dir)

    outcome = yield export(target_user, writer)
    print(outcome)
class FileExfiltrationWriter(ExfiltrationWriter):
    """ExfiltrationWriter that dumps a user's data into a directory tree.

    `finished()` returns the directory the data was written to.

    Note: This writes to disk on the main reactor thread.

    Args:
        user_id (str): The user whose data is being exfiltrated.
        directory (str|None): The directory to write the data to, if None then
            will write to a temporary directory.
    """

    def __init__(self, user_id, directory=None):
        self.user_id = user_id

        if not directory:
            # No (or empty) directory given: fall back to a fresh temp dir.
            directory = tempfile.mkdtemp(
                prefix="synapse-exfiltrate__%s__" % (user_id,)
            )
        self.base_directory = directory

        os.makedirs(self.base_directory, exist_ok=True)
        if os.listdir(self.base_directory):
            raise Exception("Directory must be empty")

    def write_events(self, room_id, events):
        """Append each event's PDU JSON, one per line, to rooms/<room_id>/events."""
        room_dir = os.path.join(self.base_directory, "rooms", room_id)
        os.makedirs(room_dir, exist_ok=True)

        with open(os.path.join(room_dir, "events"), "a") as out:
            for ev in events:
                print(json.dumps(ev.get_pdu_json()), file=out)

    def write_state(self, room_id, event_id, state):
        """Append the PDU JSON of every state event to
        rooms/<room_id>/state/<event_id>, one per line.
        """
        state_dir = os.path.join(self.base_directory, "rooms", room_id, "state")
        os.makedirs(state_dir, exist_ok=True)

        with open(os.path.join(state_dir, event_id), "a") as out:
            for ev in state.values():
                print(json.dumps(ev.get_pdu_json()), file=out)

    def write_invite(self, room_id, event, state):
        """Record the invite event itself, then its accompanying invite state."""
        self.write_events(room_id, [event])

        # The invite state goes in its own file: its entries aren't full
        # events, only a subset of the state at the invite event.
        room_dir = os.path.join(self.base_directory, "rooms", room_id)
        os.makedirs(room_dir, exist_ok=True)

        with open(os.path.join(room_dir, "invite_state"), "a") as out:
            for stripped in state.values():
                print(json.dumps(stripped), file=out)

    def finished(self):
        """Return the directory everything was written under."""
        return self.base_directory
def start(config_options):
    """Parse the command line and config, then run the chosen admin command.

    Builds the argument parser (config flags plus the admin sub-commands),
    loads the homeserver config, sets up an AdminCmdServer and hands control
    to the reactor, which runs the selected command.

    Args:
        config_options (list[str]): command-line arguments, without the
            program name.

    Exits the process with status 1 if the config cannot be loaded.
    """
    parser = argparse.ArgumentParser(description="Synapse Admin Command")
    HomeServerConfig.add_arguments_to_parser(parser)

    subparser = parser.add_subparsers(
        title="Admin Commands",
        required=True,
        dest="command",
        metavar="<admin_command>",
        help="The admin command to perform.",
    )
    export_data_parser = subparser.add_parser(
        "export-data", help="Export all data for a user"
    )
    export_data_parser.add_argument("user_id", help="User to extract data from")
    export_data_parser.add_argument(
        "--output-directory",
        action="store",
        metavar="DIRECTORY",
        required=False,
        help="The directory to store the exported data in. Must be empty. Defaults"
        " to creating a temp directory.",
    )
    # Each sub-command records its implementation in `func`; it is invoked
    # below as `args.func(ss, args)`.
    export_data_parser.set_defaults(func=export_data_command)

    try:
        config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
    except ConfigError as e:
        sys.stderr.write("\n" + str(e) + "\n")
        sys.exit(1)

    if config.worker_app is not None:
        assert config.worker_app == "synapse.app.admin_cmd"

    # Update the config with some basic overrides so that we don't have to
    # specify a full worker config.
    config.worker_app = "synapse.app.admin_cmd"

    if (
        not config.worker_daemonize
        and not config.worker_log_file
        and not config.worker_log_config
    ):
        # Since we're meant to be run as a "command" let's not redirect stdio
        # unless we've actually set log config.
        config.no_redirect_stdio = True

    # Explicitly disable background processes
    config.update_user_directory = False
    config.start_pushers = False
    config.send_federation = False

    setup_logging(config, use_worker_options=True)

    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)

    ss = AdminCmdServer(
        config.server_name,
        db_config=config.database_config,
        config=config,
        version_string="Synapse/" + get_version_string(synapse),
        database_engine=database_engine,
    )

    ss.setup()

    # We use task.react as the basic run command as it correctly handles tearing
    # down the reactor when the deferreds resolve and setting the return value.
    # We also make sure that `_base.start` gets run before we actually run the
    # command.

    @defer.inlineCallbacks
    def run(_reactor):
        with LoggingContext("command"):
            yield _base.start(ss, [])
            yield args.func(ss, args)

    _base.start_worker_reactor(
        "synapse-admin-cmd", config, run_command=lambda: task.react(run)
    )
# Script entry point: run `start` on the command-line arguments (minus the
# program name) inside a "main" logging context.
if __name__ == "__main__":
    with LoggingContext("main"):
        start(sys.argv[1:])

View File

@@ -137,12 +137,42 @@ class Config(object):
return file_stream.read()
def invoke_all(self, name, *args, **kargs):
    """Call every definition of the instance method `name` found along the
    MRO of this object's class.

    Only classes that define `name` in their own `__dict__` contribute, and
    they are visited in MRO order.

    Args:
        name (str): Name of function to invoke
        *args
        **kargs

    Returns:
        list: The list of the return values from each method called
    """
    return [
        getattr(klass, name)(self, *args, **kargs)
        for klass in type(self).mro()
        if name in klass.__dict__
    ]
@classmethod
def invoke_all_static(cls, name, *args, **kargs):
    """Call every definition of the static method `name` found along the
    MRO of this class.

    Only classes that define `name` in their own `__dict__` contribute, and
    they are visited in MRO order. No instance is passed to the methods.

    Args:
        name (str): Name of function to invoke
        *args
        **kargs

    Returns:
        list: The list of the return values from each method called
    """
    return [
        getattr(klass, name)(*args, **kargs)
        for klass in cls.mro()
        if name in klass.__dict__
    ]
def generate_config(
self,
config_dir_path,
@@ -202,6 +232,23 @@ class Config(object):
Returns: Config object.
"""
config_parser = argparse.ArgumentParser(description=description)
cls.add_arguments_to_parser(config_parser)
obj, _ = cls.load_config_with_parser(config_parser, argv)
return obj
@classmethod
def add_arguments_to_parser(cls, config_parser):
"""Adds all the config flags to an ArgumentParser.
Doesn't support config-file-generation: used by the worker apps.
Used for workers where we want to add extra flags/subcommands.
Args:
config_parser (ArgumentParser): App description
"""
config_parser.add_argument(
"-c",
"--config-path",
@@ -219,16 +266,34 @@ class Config(object):
" Defaults to the directory containing the last config file",
)
cls.invoke_all_static("add_arguments", config_parser)
@classmethod
def load_config_with_parser(cls, parser, argv):
"""Parse the commandline and config files with the given parser
Doesn't support config-file-generation: used by the worker apps.
Used for workers where we want to add extra flags/subcommands.
Args:
parser (ArgumentParser)
argv (list[str])
Returns:
tuple[HomeServerConfig, argparse.Namespace]: Returns the parsed
config object and the parsed argparse.Namespace object from
`parser.parse_args(..)`
"""
obj = cls()
obj.invoke_all("add_arguments", config_parser)
config_args = config_parser.parse_args(argv)
config_args = parser.parse_args(argv)
config_files = find_config_files(search_paths=config_args.config_path)
if not config_files:
config_parser.error("Must supply a config file.")
parser.error("Must supply a config file.")
if config_args.keys_directory:
config_dir_path = config_args.keys_directory
@@ -244,7 +309,7 @@ class Config(object):
obj.invoke_all("read_arguments", config_args)
return obj
return obj, config_args
@classmethod
def load_or_generate_config(cls, description, argv):
@@ -401,7 +466,7 @@ class Config(object):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
obj.invoke_all("add_arguments", parser)
obj.invoke_all_static("add_arguments", parser)
args = parser.parse_args(remaining_args)
config_dict = read_config_files(config_files)

View File

@@ -69,7 +69,8 @@ class DatabaseConfig(Config):
if database_path is not None:
self.database_config["args"]["database"] = database_path
def add_arguments(self, parser):
@staticmethod
def add_arguments(parser):
db_group = parser.add_argument_group("database")
db_group.add_argument(
"-d",

View File

@@ -103,7 +103,8 @@ class LoggingConfig(Config):
if args.log_file is not None:
self.log_file = args.log_file
def add_arguments(cls, parser):
@staticmethod
def add_arguments(parser):
logging_group = parser.add_argument_group("logging")
logging_group.add_argument(
"-v",

View File

@@ -237,7 +237,8 @@ class RegistrationConfig(Config):
% locals()
)
def add_arguments(self, parser):
@staticmethod
def add_arguments(parser):
reg_group = parser.add_argument_group("registration")
reg_group.add_argument(
"--enable-registration",

View File

@@ -639,7 +639,8 @@ class ServerConfig(Config):
if args.print_pidfile is not None:
self.print_pidfile = args.print_pidfile
def add_arguments(self, parser):
@staticmethod
def add_arguments(parser):
server_group = parser.add_argument_group("server")
server_group.add_argument(
"-D",

View File

@@ -764,6 +764,10 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None
data = yield self.handler.get_local_public_room_list(
limit, since_token, network_tuple=network_tuple, from_federation=True
)

View File

@@ -17,16 +17,15 @@ import logging
from collections import namedtuple
from six import PY3, iteritems
from six.moves import range
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
from twisted.internet.defer import maybeDeferred
from synapse.api.constants import EventTypes, JoinRules
from synapse.types import ThirdPartyInstanceID
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -36,7 +35,6 @@ logger = logging.getLogger(__name__)
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
@@ -71,6 +69,8 @@ class RoomListHandler(BaseHandler):
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
from_federation (bool): true iff the request comes from the federation
API
"""
if not self.enable_room_list_search:
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -132,200 +132,127 @@ class RoomListHandler(BaseHandler):
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
timeout (int|None): Amount of seconds to wait for a response before
timing out.
timing out. TODO
"""
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
pagination_token = None
if since_token and since_token != "END": # todo ought we support END and START?
if since_token[0] in ("+", "-"):
forwards = since_token[0] == "+"
pagination_token = since_token[1:]
else:
raise SyntaxError("shrug ") # TODO
else:
since_token = None
forwards = True
rooms_to_order_value = {}
rooms_to_num_joined = {}
# we request one more than wanted to see if there are more pages to come
probing_limit = limit + 1 if limit is not None else None
newly_visible = []
newly_unpublished = []
if since_token:
stream_token = since_token.stream_ordering
current_public_id = yield self.store.get_current_public_room_stream_id()
public_room_stream_id = since_token.public_room_stream_id
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
public_room_stream_id, current_public_id, network_tuple=network_tuple
)
else:
stream_token = yield self.store.get_room_max_stream_ordering()
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
room_ids = yield self.store.get_public_room_ids_at_stream_id(
public_room_stream_id, network_tuple=network_tuple
results = yield self.store.get_largest_public_rooms(
network_tuple, search_filter, probing_limit, pagination_token, forwards
)
# We want to return rooms in a particular order: the number of joined
# users. We then arbitrarily use the room_id as a tie breaker.
def build_room_entry(room):
entry = {
"room_id": room["room_id"],
"name": room["name"],
"topic": room["topic"],
"canonical_alias": room["canonical_alias"],
"num_joined_members": room["joined_members"],
"avatar_url": room["avatar"],
"world_readable": room["history_visibility"] == "world_readable",
}
@defer.inlineCallbacks
def get_order_for_room(room_id):
# Most of the rooms won't have changed between the since token and
# now (especially if the since token is "now"). So, we can ask what
# the current users are in a room (that will hit a cache) and then
# check if the room has changed since the since token. (We have to
# do it in that order to avoid races).
# If things have changed then fall back to getting the current state
# at the since token.
joined_users = yield self.store.get_users_in_room(room_id)
if self.store.has_room_changed_since(room_id, stream_token):
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_token
)
# Filter out Nones rather than omit the field altogether
return {
k: v for k, v in entry.items() if v is not None
}
if not latest_event_ids:
return
joined_users = yield self.state_handler.get_current_users_in_room(
room_id, latest_event_ids
)
num_joined_users = len(joined_users)
rooms_to_num_joined[room_id] = num_joined_users
if num_joined_users == 0:
return
# We want larger rooms to be first, hence negating num_joined_users
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
logger.info(
"Getting ordering for %i rooms since %s", len(room_ids), stream_token
)
yield concurrently_execute(get_order_for_room, room_ids, 10)
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
sorted_rooms = [room_id for room_id, _ in sorted_entries]
# `sorted_rooms` should now be a list of all public room ids that is
# stable across pagination. Therefore, we can use indices into this
# list as our pagination tokens.
# Filter out rooms that we don't want to return
rooms_to_scan = [
r
for r in sorted_rooms
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
results = [
build_room_entry(r) for r in results
]
total_room_count = len(rooms_to_scan)
if since_token:
# Filter out rooms we've already returned previously
# `since_token.current_limit` is the index of the last room we
# sent down, so we exclude it and everything before/after it.
if since_token.direction_is_forward:
rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
response = {}
num_results = len(results)
if num_results > 0:
final_room_id = results[-1]["room_id"]
initial_room_id = results[0]["room_id"]
if limit is not None:
more_to_come = num_results == probing_limit
results = results[0:limit]
else:
rooms_to_scan = rooms_to_scan[: since_token.current_limit]
rooms_to_scan.reverse()
more_to_come = False
logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
if not forwards or (forwards and more_to_come):
response["next_batch"] = "+%s" % (final_room_id,)
# _append_room_entry_to_chunk will append to chunk but will stop if
# len(chunk) > limit
#
# Normally we will generate enough results on the first iteration here,
# but if there is a search filter, _append_room_entry_to_chunk may
# filter some results out, in which case we loop again.
#
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
#
# XXX if there is no limit, we may end up DoSing the server with
# calls to get_current_state_ids for every single room on the
# server. Surely we should cap this somehow?
#
if limit:
step = limit + 1
else:
# step cannot be zero
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
if since_token and (forwards or (not forwards and more_to_come)):
if num_results > 0:
response["prev_batch"] = "-%s" % (initial_room_id,)
else:
response["prev_batch"] = "-%s" % (pagination_token,)
chunk = []
for i in range(0, len(rooms_to_scan), step):
if timeout and self.clock.time() > timeout:
raise Exception("Timed out searching room directory")
if from_federation:
# only show rooms with m.federate=True or absent (default is True)
batch = rooms_to_scan[i : i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r,
rooms_to_num_joined[r],
chunk,
limit,
search_filter,
from_federation=from_federation,
),
batch,
5,
# get rooms' state
room_state_ids = yield defer.gatherResults(
[
maybeDeferred(self.store.get_current_state_ids, room["room_id"])
for room in results
],
consumeErrors=True,
)
logger.info("Now %i rooms in result", len(chunk))
if len(chunk) >= limit + 1:
break
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
# get rooms' creation state events' IDs
room_creation_event_ids = {
room["room_id"]: event_ids.get((EventTypes.Create, ""))
for (room, event_ids) in zip(results, room_state_ids)
}
# Work out the new limit of the batch for pagination, or None if we
# know there are no more results that would be returned.
# i.e., [since_token.current_limit..new_limit] is the batch of rooms
# we've returned (or the reverse if we paginated backwards)
# We tried to pull out limit + 1 rooms above, so if we have <= limit
# then we know there are no more results to return
new_limit = None
if chunk and (not limit or len(chunk) > limit):
# get rooms' creation state events
creation_events_by_id = yield self.store.get_events(
room_creation_event_ids.values()
)
if not since_token or since_token.direction_is_forward:
if limit:
chunk = chunk[:limit]
last_room_id = chunk[-1]["room_id"]
else:
if limit:
chunk = chunk[-limit:]
last_room_id = chunk[0]["room_id"]
# associate them with the room IDs
room_creation_events = {
room_id: creation_events_by_id[event_id]
for (room_id, event_id) in room_creation_event_ids.items()
}
new_limit = sorted_rooms.index(last_room_id)
# now filter out rooms with m.federate: False in their create event
results = [
room
for room in results
if room_creation_events[room["room_id"]].content.get("m.federate", True)
]
results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
for room in results:
# populate search result entries with additional fields, namely
# 'aliases' and 'guest_can_join'
room_id = room["room_id"]
if since_token:
results["new_rooms"] = bool(newly_visible)
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
room["aliases"] = aliases
if not since_token or since_token.direction_is_forward:
if new_limit is not None:
results["next_batch"] = RoomListNextBatch(
stream_ordering=stream_token,
public_room_stream_id=public_room_stream_id,
current_limit=new_limit,
direction_is_forward=True,
).to_token()
state_ids = yield self.store.get_current_state_ids(room_id)
guests_can_join = False
guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
if guest_access_state_id is not None:
guest_access = yield self.store.get_event(guest_access_state_id)
if guest_access is not None:
if guest_access.content.get("guest_access") == "can_join":
guests_can_join = True
room["guest_can_join"] = guests_can_join
if since_token:
results["prev_batch"] = since_token.copy_and_replace(
direction_is_forward=False,
current_limit=since_token.current_limit + 1,
).to_token()
else:
if new_limit is not None:
results["prev_batch"] = RoomListNextBatch(
stream_ordering=stream_token,
public_room_stream_id=public_room_stream_id,
current_limit=new_limit,
direction_is_forward=False,
).to_token()
response["chunk"] = results
if since_token:
results["next_batch"] = since_token.copy_and_replace(
direction_is_forward=True,
current_limit=since_token.current_limit - 1,
).to_token()
# TODO for federation, we currently don't remove m.federate=False rooms
# from the total room count estimate.
response["total_room_count_estimate"] = yield self.store.count_public_rooms()
defer.returnValue(results)
defer.returnValue(response)
@defer.inlineCallbacks
def _append_room_entry_to_chunk(
@@ -560,7 +487,6 @@ class RoomListNextBatch(
),
)
):
KEY_DICT = {
"stream_ordering": "s",
"public_room_stream_id": "p",

View File

@@ -148,26 +148,44 @@ class StatsHandler(StateDeltasHandler):
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
yield self.store.update_stats_delta(
now, "room", room_id, "state_events", +1
)
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
yield self.store.update_stats_delta(
now, "room", room_id, "current_state_events", +1
)
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
@@ -246,6 +264,23 @@ class StatsHandler(StateDeltasHandler):
},
)
# Also add room stats with just the one state event
# (the room creation state event)
yield self.store.update_stats(
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": 1,
"joined_members": 0,
"invited_members": 0,
"left_members": 0,
"banned_members": 0,
"state_events": 1,
},
)
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}

View File

@@ -332,6 +332,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
limit = parse_integer(request, "limit", 0)
since_token = parse_string(request, "since", None)
if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
@@ -369,6 +373,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
else:
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
if limit == 0:
# zero is a special value which corresponds to no limit.
limit = None
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -161,6 +162,167 @@ class RoomWorkerStore(SQLBaseStore):
"get_public_room_changes", get_public_room_changes_txn
)
def count_public_rooms(self):
    """Count the public rooms recorded in the room_stats, room_state and
    rooms tables.

    A room counts as public when it has is_public set AND is
    publicly-joinable and/or world-readable.

    Returns:
        number of public rooms on this homeserver's room directory
    """
    count_sql = """
        SELECT COUNT(*)
        FROM room_stats
        JOIN room_state USING (room_id)
        JOIN rooms USING (room_id)
        WHERE
            is_public
            AND (
                join_rules = 'public'
                OR history_visibility = 'world_readable'
            )
    """

    def _count_public_rooms_txn(txn):
        txn.execute(count_sql)
        row = txn.fetchone()
        return row[0]

    return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
@defer.inlineCallbacks
def get_largest_public_rooms(
    self, network_tuple, search_filter, limit, pagination_token, forwards
):
    """Fetch one page of the public room directory, largest rooms first.

    A room is eligible when it has is_public set AND is publicly-joinable
    and/or world-readable.

    Args:
        network_tuple (ThirdPartyInstanceID|None): if given, only rooms whose
            appservice_room_list entry matches its appservice_id and
            network_id are returned; a falsy field on the tuple is matched
            against NULL. If None, no appservice filtering is applied.
        search_filter (dict|None): if it carries a non-empty
            "generic_search_term", only rooms whose name, topic or
            canonical alias contains that term (SQL LIKE) are returned.
        limit (int|None): Maximum number of rows to return, unlimited
            otherwise.
        pagination_token (str|None): if present, a room ID which is to be
            the (first/last) included in the results.
        forwards (bool): true iff going forwards, going backwards otherwise

    Returns:
        Deferred[list[dict]]: one entry per room (as produced by
        `cursor_to_dict` for the selected columns), in order: biggest number
        of joined users first. We then arbitrarily use the room_id as a tie
        breaker.
    """

    # TODO probably want to use ts_… on Postgres?

    sql = """
        SELECT
            room_id, name, topic, canonical_alias, joined_members,
            avatar, history_visibility
        FROM
            room_stats
            JOIN room_state USING (room_id)
            JOIN rooms USING (room_id)
    """
    # Values for the `?` placeholders, appended in the same order as the
    # clauses that reference them are added to `sql`.
    query_args = []

    if network_tuple:
        sql += """
            LEFT JOIN appservice_room_list arl USING (room_id)
        """

    sql += """
        WHERE
            is_public
            AND (
                join_rules = 'public'
                OR history_visibility = 'world_readable'
            )
    """

    if pagination_token:
        # The page boundary is expressed in terms of the sort key
        # (joined_members, room_id), so look up the token room's member count.
        pt_joined = yield self._simple_select_one_onecol(
            table="room_stats",
            keyvalues={"room_id": pagination_token},
            retcol="joined_members",
            desc="get_largest_public_rooms",
        )

        if forwards:
            sql += """
                AND (
                    (joined_members < ?)
                    OR (joined_members = ? AND room_id >= ?)
                )
            """
        else:
            sql += """
                AND (
                    (joined_members > ?)
                    OR (joined_members = ? AND room_id <= ?)
                )
            """
        query_args += [pt_joined, pt_joined, pagination_token]

    if search_filter and search_filter.get("generic_search_term", None):
        search_term = "%" + search_filter["generic_search_term"] + "%"
        sql += """
            AND (
                name LIKE ?
                OR topic LIKE ?
                OR canonical_alias LIKE ?
            )
        """
        query_args += [search_term, search_term, search_term]

    if network_tuple:
        sql += "AND ("
        if network_tuple.appservice_id:
            sql += "appservice_id = ? AND "
            query_args.append(network_tuple.appservice_id)
        else:
            sql += "appservice_id IS NULL AND "

        if network_tuple.network_id:
            sql += "network_id = ?)"
            query_args.append(network_tuple.network_id)
        else:
            sql += "network_id IS NULL)"

    if forwards:
        sql += """
            ORDER BY
                joined_members DESC, room_id ASC
        """
    else:
        # Walking backwards: sort the other way round so that LIMIT keeps the
        # rows closest to the token; the rows are flipped back after fetching.
        sql += """
            ORDER BY
                joined_members ASC, room_id DESC
        """

    if limit is not None:
        # Bind the limit as a query parameter rather than formatting it into
        # the SQL text, so no caller-supplied value is ever spliced into the
        # statement.
        sql += """
            LIMIT ?
        """
        query_args.append(limit)

    def _get_largest_public_rooms_txn(txn):
        txn.execute(sql, query_args)

        results = self.cursor_to_dict(txn)

        if not forwards:
            # Restore biggest-first order for backwards pagination.
            results.reverse()

        return results

    ret_val = yield self.runInteraction(
        "get_largest_public_rooms", _get_largest_public_rooms_txn
    )
    defer.returnValue(ret_val)
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
return self._simple_select_one_onecol(

View File

@@ -68,6 +68,9 @@ class StatsStore(StateDeltasStore):
yield self._end_background_update("populate_stats_createtables")
defer.returnValue(1)
# TODO dev only
yield self.delete_all_stats()
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables