Compare commits
20 Commits
anoa/user_
...
rei/room_d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19d8d3fc81 | ||
|
|
69f6a46cb5 | ||
|
|
8502c668bf | ||
|
|
dc68c2a101 | ||
|
|
181c1a6072 | ||
|
|
20ae4afe7e | ||
|
|
c831c5b2bb | ||
|
|
5ed7853bb0 | ||
|
|
f44354e17f | ||
|
|
d0d479c1af | ||
|
|
03cc8c4b5d | ||
|
|
eca4f5ac73 | ||
|
|
1b2067f53d | ||
|
|
e8c53b07f2 | ||
|
|
c8f35d8d38 | ||
|
|
fdefb9e29a | ||
|
|
37b524f971 | ||
|
|
823e13ddf4 | ||
|
|
10fe904d88 | ||
|
|
9f3c0a8556 |
1
changelog.d/5597.feature
Normal file
1
changelog.d/5597.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add a basic admin command app to allow server operators to run Synapse admin commands separately from the main production instance.
|
||||
@@ -1 +0,0 @@
|
||||
Use `M_USER_DEACTIVATED` instead of `M_UNKNOWN` for errcode when a deactivated user attempts to login.
|
||||
1
changelog.d/5691.bugfix
Normal file
1
changelog.d/5691.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix improper or missing room_stats updates when handling state events (deltas).
|
||||
@@ -61,7 +61,6 @@ class Codes(object):
|
||||
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
|
||||
WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
|
||||
EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT"
|
||||
USER_DEACTIVATED = "M_USER_DEACTIVATED"
|
||||
|
||||
|
||||
class CodeMessageException(RuntimeError):
|
||||
@@ -152,7 +151,7 @@ class UserDeactivatedError(SynapseError):
|
||||
msg (str): The human-readable error message
|
||||
"""
|
||||
super(UserDeactivatedError, self).__init__(
|
||||
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
|
||||
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.UNKNOWN
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ def register_sighup(func):
|
||||
_sighup_callbacks.append(func)
|
||||
|
||||
|
||||
def start_worker_reactor(appname, config):
|
||||
def start_worker_reactor(appname, config, run_command=reactor.run):
|
||||
""" Run the reactor in the main process
|
||||
|
||||
Daemonizes if necessary, and then configures some resources, before starting
|
||||
@@ -57,6 +57,7 @@ def start_worker_reactor(appname, config):
|
||||
Args:
|
||||
appname (str): application name which will be sent to syslog
|
||||
config (synapse.config.Config): config object
|
||||
run_command (Callable[]): callable that actually runs the reactor
|
||||
"""
|
||||
|
||||
logger = logging.getLogger(config.worker_app)
|
||||
@@ -69,11 +70,19 @@ def start_worker_reactor(appname, config):
|
||||
daemonize=config.worker_daemonize,
|
||||
print_pidfile=config.print_pidfile,
|
||||
logger=logger,
|
||||
run_command=run_command,
|
||||
)
|
||||
|
||||
|
||||
def start_reactor(
|
||||
appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
|
||||
appname,
|
||||
soft_file_limit,
|
||||
gc_thresholds,
|
||||
pid_file,
|
||||
daemonize,
|
||||
print_pidfile,
|
||||
logger,
|
||||
run_command=reactor.run,
|
||||
):
|
||||
""" Run the reactor in the main process
|
||||
|
||||
@@ -88,6 +97,7 @@ def start_reactor(
|
||||
daemonize (bool): true to run the reactor in a background process
|
||||
print_pidfile (bool): whether to print the pid file, if daemonize is True
|
||||
logger (logging.Logger): logger instance to pass to Daemonize
|
||||
run_command (Callable[]): callable that actually runs the reactor
|
||||
"""
|
||||
|
||||
install_dns_limiter(reactor)
|
||||
@@ -97,7 +107,7 @@ def start_reactor(
|
||||
change_resource_limit(soft_file_limit)
|
||||
if gc_thresholds:
|
||||
gc.set_threshold(*gc_thresholds)
|
||||
reactor.run()
|
||||
run_command()
|
||||
|
||||
# make sure that we run the reactor with the sentinel log context,
|
||||
# otherwise other PreserveLoggingContext instances will get confused
|
||||
|
||||
264
synapse/app/admin_cmd.py
Normal file
264
synapse/app/admin_cmd.py
Normal file
@@ -0,0 +1,264 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer, task
|
||||
|
||||
import synapse
|
||||
from synapse.app import _base
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.handlers.admin import ExfiltrationWriter
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
|
||||
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
|
||||
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.admin_cmd")
|
||||
|
||||
|
||||
class AdminCmdSlavedStore(
    SlavedReceiptsStore,
    SlavedAccountDataStore,
    SlavedApplicationServiceStore,
    SlavedRegistrationStore,
    SlavedFilteringStore,
    SlavedPresenceStore,
    SlavedGroupServerStore,
    SlavedDeviceInboxStore,
    SlavedDeviceStore,
    SlavedPushRuleStore,
    SlavedEventStore,
    SlavedClientIpStore,
    RoomStore,
    BaseSlavedStore,
):
    """Data store for the admin command app.

    Adds no behaviour of its own; it only combines the slaved stores listed
    as base classes into a single class.
    """
|
||||
|
||||
|
||||
class AdminCmdServer(HomeServer):
    """HomeServer subclass used to run one-off admin commands.

    The listener hooks are overridden as no-ops, and replication is built
    with AdminCmdReplicationHandler.
    """

    DATASTORE_CLASS = AdminCmdSlavedStore

    def _listen_http(self, listener_config):
        """Do nothing: this override does not start an HTTP listener."""
        pass

    def start_listening(self, listeners):
        """Do nothing: the given listeners are ignored."""
        pass

    def build_tcp_replication(self):
        """Return the replication handler for this server.

        Returns:
            AdminCmdReplicationHandler
        """
        return AdminCmdReplicationHandler(self)
|
||||
|
||||
|
||||
class AdminCmdReplicationHandler(ReplicationClientHandler):
    """Replication handler that requests no streams and ignores received rows."""

    def on_rdata(self, stream_name, token, rows):
        """Ignore replicated data.

        Args:
            stream_name: unused
            token: unused
            rows: unused

        Returns:
            Deferred: an already-fired deferred with result None.
        """
        # Not wrapped in @defer.inlineCallbacks: a body without `yield` is not
        # a generator, and inlineCallbacks raises TypeError when the decorated
        # function does not produce one.
        return defer.succeed(None)

    def get_streams_to_replicate(self):
        """Return the streams to subscribe to: an empty dict."""
        return {}
|
||||
|
||||
|
||||
@defer.inlineCallbacks
def export_data_command(hs, args):
    """Export data for a user.

    Reads `user_id` and `output_directory` from the parsed arguments, runs the
    admin handler's `export_user_data` with a FileExfiltrationWriter, and
    prints the result.

    Args:
        hs (HomeServer)
        args (argparse.Namespace)
    """
    user_id = args.user_id
    directory = args.output_directory

    writer = FileExfiltrationWriter(user_id, directory=directory)
    res = yield hs.get_handlers().admin_handler.export_user_data(user_id, writer)
    print(res)
|
||||
|
||||
|
||||
class FileExfiltrationWriter(ExfiltrationWriter):
    """An ExfiltrationWriter that writes the user's data to a directory.

    Returns the directory location on completion.

    Note: This writes to disk on the main reactor thread.

    Args:
        user_id (str): The user whose data is being exfiltrated.
        directory (str|None): The directory to write the data to, if None then
            will write to a temporary directory.

    Raises:
        Exception: if the target directory is not empty.
    """

    def __init__(self, user_id, directory=None):
        self.user_id = user_id

        if directory:
            self.base_directory = directory
        else:
            self.base_directory = tempfile.mkdtemp(
                prefix="synapse-exfiltrate__%s__" % (user_id,)
            )

        os.makedirs(self.base_directory, exist_ok=True)
        # os.listdir already returns a list, so test it for emptiness directly.
        if os.listdir(self.base_directory):
            raise Exception("Directory must be empty")

    def write_events(self, room_id, events):
        """Append each event's PDU JSON, one per line, to rooms/<room_id>/events."""
        room_directory = os.path.join(self.base_directory, "rooms", room_id)
        os.makedirs(room_directory, exist_ok=True)
        events_file = os.path.join(room_directory, "events")

        with open(events_file, "a") as f:
            for event in events:
                print(json.dumps(event.get_pdu_json()), file=f)

    def write_state(self, room_id, event_id, state):
        """Append the PDU JSON of each state event, one per line, to
        rooms/<room_id>/state/<event_id>.
        """
        room_directory = os.path.join(self.base_directory, "rooms", room_id)
        state_directory = os.path.join(room_directory, "state")
        os.makedirs(state_directory, exist_ok=True)

        event_file = os.path.join(state_directory, event_id)

        with open(event_file, "a") as f:
            for event in state.values():
                print(json.dumps(event.get_pdu_json()), file=f)

    def write_invite(self, room_id, event, state):
        """Write the invite event via write_events, then append the invite
        state, one JSON value per line, to rooms/<room_id>/invite_state.
        """
        self.write_events(room_id, [event])

        # We write the invite state somewhere else as they aren't full events
        # and are only a subset of the state at the event.
        room_directory = os.path.join(self.base_directory, "rooms", room_id)
        os.makedirs(room_directory, exist_ok=True)

        invite_state = os.path.join(room_directory, "invite_state")

        with open(invite_state, "a") as f:
            for event in state.values():
                print(json.dumps(event), file=f)

    def finished(self):
        """Return the directory the data was written to."""
        return self.base_directory
|
||||
|
||||
|
||||
def start(config_options):
    """Parse the command line and config, then run the chosen admin command.

    Builds an argument parser with the homeserver config flags plus the admin
    sub-commands, loads the config, creates an AdminCmdServer and runs the
    selected command via `_base.start_worker_reactor`.

    Args:
        config_options (list[str]): the command-line arguments to parse.
    """
    parser = argparse.ArgumentParser(description="Synapse Admin Command")
    HomeServerConfig.add_arguments_to_parser(parser)

    subparser = parser.add_subparsers(
        title="Admin Commands",
        required=True,
        dest="command",
        metavar="<admin_command>",
        help="The admin command to perform.",
    )
    export_data_parser = subparser.add_parser(
        "export-data", help="Export all data for a user"
    )
    export_data_parser.add_argument("user_id", help="User to extract data from")
    export_data_parser.add_argument(
        "--output-directory",
        action="store",
        metavar="DIRECTORY",
        required=False,
        help="The directory to store the exported data in. Must be empty. Defaults"
        " to creating a temp directory.",
    )
    # Each sub-command stores its implementation as `func` on the namespace.
    export_data_parser.set_defaults(func=export_data_command)

    try:
        config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
    except ConfigError as e:
        sys.stderr.write("\n" + str(e) + "\n")
        sys.exit(1)

    if config.worker_app is not None:
        assert config.worker_app == "synapse.app.admin_cmd"

    # Update the config with some basic overrides so that we don't have to
    # specify a full worker config.
    config.worker_app = "synapse.app.admin_cmd"

    if (
        not config.worker_daemonize
        and not config.worker_log_file
        and not config.worker_log_config
    ):
        # Since we're meant to be run as a "command" let's not redirect stdio
        # unless we've actually set log config.
        config.no_redirect_stdio = True

    # Explicitly disable background processes
    config.update_user_directory = False
    config.start_pushers = False
    config.send_federation = False

    setup_logging(config, use_worker_options=True)

    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)

    ss = AdminCmdServer(
        config.server_name,
        db_config=config.database_config,
        config=config,
        version_string="Synapse/" + get_version_string(synapse),
        database_engine=database_engine,
    )

    ss.setup()

    # We use task.react as the basic run command as it correctly handles tearing
    # down the reactor when the deferreds resolve and setting the return value.
    # We also make sure that `_base.start` gets run before we actually run the
    # command.

    @defer.inlineCallbacks
    def run(_reactor):
        with LoggingContext("command"):
            yield _base.start(ss, [])
            yield args.func(ss, args)

    _base.start_worker_reactor(
        "synapse-admin-cmd", config, run_command=lambda: task.react(run)
    )
|
||||
|
||||
|
||||
# Script entry point: run the admin command app with the process arguments
# inside a "main" logging context.
if __name__ == "__main__":
    with LoggingContext("main"):
        start(sys.argv[1:])
|
||||
@@ -137,12 +137,42 @@ class Config(object):
|
||||
return file_stream.read()
|
||||
|
||||
def invoke_all(self, name, *args, **kargs):
    """Invoke all instance methods with the given name and arguments in the
    class's MRO.

    Only classes that define `name` directly in their own `__dict__` are
    called, so an inherited method is invoked once, on its defining class.

    Args:
        name (str): Name of function to invoke
        *args
        **kargs

    Returns:
        list: The list of the return values from each method called, in
            MRO order
    """
    return [
        getattr(klass, name)(self, *args, **kargs)
        for klass in type(self).mro()
        if name in klass.__dict__
    ]
|
||||
|
||||
@classmethod
def invoke_all_static(cls, name, *args, **kargs):
    """Invoke all static methods with the given name and arguments in the
    class's MRO.

    Only classes that define `name` directly in their own `__dict__` are
    called. Unlike `invoke_all`, no instance is passed to the methods.

    Args:
        name (str): Name of function to invoke
        *args
        **kargs

    Returns:
        list: The list of the return values from each method called, in
            MRO order
    """
    return [
        getattr(klass, name)(*args, **kargs)
        for klass in cls.mro()
        if name in klass.__dict__
    ]
|
||||
|
||||
def generate_config(
|
||||
self,
|
||||
config_dir_path,
|
||||
@@ -202,6 +232,23 @@ class Config(object):
|
||||
Returns: Config object.
|
||||
"""
|
||||
config_parser = argparse.ArgumentParser(description=description)
|
||||
cls.add_arguments_to_parser(config_parser)
|
||||
obj, _ = cls.load_config_with_parser(config_parser, argv)
|
||||
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
def add_arguments_to_parser(cls, config_parser):
|
||||
"""Adds all the config flags to an ArgumentParser.
|
||||
|
||||
Doesn't support config-file-generation: used by the worker apps.
|
||||
|
||||
Used for workers where we want to add extra flags/subcommands.
|
||||
|
||||
Args:
|
||||
config_parser (ArgumentParser): The parser to add the config flags to
|
||||
"""
|
||||
|
||||
config_parser.add_argument(
|
||||
"-c",
|
||||
"--config-path",
|
||||
@@ -219,16 +266,34 @@ class Config(object):
|
||||
" Defaults to the directory containing the last config file",
|
||||
)
|
||||
|
||||
cls.invoke_all_static("add_arguments", config_parser)
|
||||
|
||||
@classmethod
|
||||
def load_config_with_parser(cls, parser, argv):
|
||||
"""Parse the commandline and config files with the given parser
|
||||
|
||||
Doesn't support config-file-generation: used by the worker apps.
|
||||
|
||||
Used for workers where we want to add extra flags/subcommands.
|
||||
|
||||
Args:
|
||||
parser (ArgumentParser)
|
||||
argv (list[str])
|
||||
|
||||
Returns:
|
||||
tuple[HomeServerConfig, argparse.Namespace]: Returns the parsed
|
||||
config object and the parsed argparse.Namespace object from
|
||||
`parser.parse_args(..)`
|
||||
"""
|
||||
|
||||
obj = cls()
|
||||
|
||||
obj.invoke_all("add_arguments", config_parser)
|
||||
|
||||
config_args = config_parser.parse_args(argv)
|
||||
config_args = parser.parse_args(argv)
|
||||
|
||||
config_files = find_config_files(search_paths=config_args.config_path)
|
||||
|
||||
if not config_files:
|
||||
config_parser.error("Must supply a config file.")
|
||||
parser.error("Must supply a config file.")
|
||||
|
||||
if config_args.keys_directory:
|
||||
config_dir_path = config_args.keys_directory
|
||||
@@ -244,7 +309,7 @@ class Config(object):
|
||||
|
||||
obj.invoke_all("read_arguments", config_args)
|
||||
|
||||
return obj
|
||||
return obj, config_args
|
||||
|
||||
@classmethod
|
||||
def load_or_generate_config(cls, description, argv):
|
||||
@@ -401,7 +466,7 @@ class Config(object):
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
|
||||
obj.invoke_all("add_arguments", parser)
|
||||
obj.invoke_all_static("add_arguments", parser)
|
||||
args = parser.parse_args(remaining_args)
|
||||
|
||||
config_dict = read_config_files(config_files)
|
||||
|
||||
@@ -69,7 +69,8 @@ class DatabaseConfig(Config):
|
||||
if database_path is not None:
|
||||
self.database_config["args"]["database"] = database_path
|
||||
|
||||
def add_arguments(self, parser):
|
||||
@staticmethod
|
||||
def add_arguments(parser):
|
||||
db_group = parser.add_argument_group("database")
|
||||
db_group.add_argument(
|
||||
"-d",
|
||||
|
||||
@@ -103,7 +103,8 @@ class LoggingConfig(Config):
|
||||
if args.log_file is not None:
|
||||
self.log_file = args.log_file
|
||||
|
||||
def add_arguments(cls, parser):
|
||||
@staticmethod
|
||||
def add_arguments(parser):
|
||||
logging_group = parser.add_argument_group("logging")
|
||||
logging_group.add_argument(
|
||||
"-v",
|
||||
|
||||
@@ -237,7 +237,8 @@ class RegistrationConfig(Config):
|
||||
% locals()
|
||||
)
|
||||
|
||||
def add_arguments(self, parser):
|
||||
@staticmethod
|
||||
def add_arguments(parser):
|
||||
reg_group = parser.add_argument_group("registration")
|
||||
reg_group.add_argument(
|
||||
"--enable-registration",
|
||||
|
||||
@@ -639,7 +639,8 @@ class ServerConfig(Config):
|
||||
if args.print_pidfile is not None:
|
||||
self.print_pidfile = args.print_pidfile
|
||||
|
||||
def add_arguments(self, parser):
|
||||
@staticmethod
|
||||
def add_arguments(parser):
|
||||
server_group = parser.add_argument_group("server")
|
||||
server_group.add_argument(
|
||||
"-D",
|
||||
|
||||
@@ -764,6 +764,10 @@ class PublicRoomList(BaseFederationServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID(None, None)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
data = yield self.handler.get_local_public_room_list(
|
||||
limit, since_token, network_tuple=network_tuple, from_federation=True
|
||||
)
|
||||
|
||||
@@ -17,16 +17,15 @@ import logging
|
||||
from collections import namedtuple
|
||||
|
||||
from six import PY3, iteritems
|
||||
from six.moves import range
|
||||
|
||||
import msgpack
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import maybeDeferred
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
||||
@@ -36,7 +35,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
|
||||
|
||||
|
||||
# This is used to indicate we should only return rooms published to the main list.
|
||||
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
|
||||
@@ -71,6 +69,8 @@ class RoomListHandler(BaseHandler):
|
||||
This can be (None, None) to indicate the main list, or a particular
|
||||
appservice and network id to use an appservice specific one.
|
||||
Setting to None returns all public rooms across all lists.
|
||||
from_federation (bool): true iff the request comes from the federation
|
||||
API
|
||||
"""
|
||||
if not self.enable_room_list_search:
|
||||
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
|
||||
@@ -132,200 +132,127 @@ class RoomListHandler(BaseHandler):
|
||||
from_federation (bool): Whether this request originated from a
|
||||
federating server or a client. Used for room filtering.
|
||||
timeout (int|None): Amount of seconds to wait for a response before
|
||||
timing out.
|
||||
timing out. TODO
|
||||
"""
|
||||
if since_token and since_token != "END":
|
||||
since_token = RoomListNextBatch.from_token(since_token)
|
||||
pagination_token = None
|
||||
if since_token and since_token != "END": # todo ought we support END and START?
|
||||
if since_token[0] in ("+", "-"):
|
||||
forwards = since_token[0] == "+"
|
||||
pagination_token = since_token[1:]
|
||||
else:
|
||||
raise SyntaxError("shrug ") # TODO
|
||||
else:
|
||||
since_token = None
|
||||
forwards = True
|
||||
|
||||
rooms_to_order_value = {}
|
||||
rooms_to_num_joined = {}
|
||||
# we request one more than wanted to see if there are more pages to come
|
||||
probing_limit = limit + 1 if limit is not None else None
|
||||
|
||||
newly_visible = []
|
||||
newly_unpublished = []
|
||||
if since_token:
|
||||
stream_token = since_token.stream_ordering
|
||||
current_public_id = yield self.store.get_current_public_room_stream_id()
|
||||
public_room_stream_id = since_token.public_room_stream_id
|
||||
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
|
||||
public_room_stream_id, current_public_id, network_tuple=network_tuple
|
||||
)
|
||||
else:
|
||||
stream_token = yield self.store.get_room_max_stream_ordering()
|
||||
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
|
||||
|
||||
room_ids = yield self.store.get_public_room_ids_at_stream_id(
|
||||
public_room_stream_id, network_tuple=network_tuple
|
||||
results = yield self.store.get_largest_public_rooms(
|
||||
network_tuple, search_filter, probing_limit, pagination_token, forwards
|
||||
)
|
||||
|
||||
# We want to return rooms in a particular order: the number of joined
|
||||
# users. We then arbitrarily use the room_id as a tie breaker.
|
||||
def build_room_entry(room):
|
||||
entry = {
|
||||
"room_id": room["room_id"],
|
||||
"name": room["name"],
|
||||
"topic": room["topic"],
|
||||
"canonical_alias": room["canonical_alias"],
|
||||
"num_joined_members": room["joined_members"],
|
||||
"avatar_url": room["avatar"],
|
||||
"world_readable": room["history_visibility"] == "world_readable",
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_order_for_room(room_id):
|
||||
# Most of the rooms won't have changed between the since token and
|
||||
# now (especially if the since token is "now"). So, we can ask what
|
||||
# the current users are in a room (that will hit a cache) and then
|
||||
# check if the room has changed since the since token. (We have to
|
||||
# do it in that order to avoid races).
|
||||
# If things have changed then fall back to getting the current state
|
||||
# at the since token.
|
||||
joined_users = yield self.store.get_users_in_room(room_id)
|
||||
if self.store.has_room_changed_since(room_id, stream_token):
|
||||
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||
room_id, stream_token
|
||||
)
|
||||
# Filter out Nones – rather omit the field altogether
|
||||
return {
|
||||
k: v for k, v in entry.items() if v is not None
|
||||
}
|
||||
|
||||
if not latest_event_ids:
|
||||
return
|
||||
|
||||
joined_users = yield self.state_handler.get_current_users_in_room(
|
||||
room_id, latest_event_ids
|
||||
)
|
||||
|
||||
num_joined_users = len(joined_users)
|
||||
rooms_to_num_joined[room_id] = num_joined_users
|
||||
|
||||
if num_joined_users == 0:
|
||||
return
|
||||
|
||||
# We want larger rooms to be first, hence negating num_joined_users
|
||||
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
|
||||
|
||||
logger.info(
|
||||
"Getting ordering for %i rooms since %s", len(room_ids), stream_token
|
||||
)
|
||||
yield concurrently_execute(get_order_for_room, room_ids, 10)
|
||||
|
||||
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
|
||||
sorted_rooms = [room_id for room_id, _ in sorted_entries]
|
||||
|
||||
# `sorted_rooms` should now be a list of all public room ids that is
|
||||
# stable across pagination. Therefore, we can use indices into this
|
||||
# list as our pagination tokens.
|
||||
|
||||
# Filter out rooms that we don't want to return
|
||||
rooms_to_scan = [
|
||||
r
|
||||
for r in sorted_rooms
|
||||
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
|
||||
results = [
|
||||
build_room_entry(r) for r in results
|
||||
]
|
||||
|
||||
total_room_count = len(rooms_to_scan)
|
||||
|
||||
if since_token:
|
||||
# Filter out rooms we've already returned previously
|
||||
# `since_token.current_limit` is the index of the last room we
|
||||
# sent down, so we exclude it and everything before/after it.
|
||||
if since_token.direction_is_forward:
|
||||
rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
|
||||
response = {}
|
||||
num_results = len(results)
|
||||
if num_results > 0:
|
||||
final_room_id = results[-1]["room_id"]
|
||||
initial_room_id = results[0]["room_id"]
|
||||
if limit is not None:
|
||||
more_to_come = num_results == probing_limit
|
||||
results = results[0:limit]
|
||||
else:
|
||||
rooms_to_scan = rooms_to_scan[: since_token.current_limit]
|
||||
rooms_to_scan.reverse()
|
||||
more_to_come = False
|
||||
|
||||
logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
|
||||
if not forwards or (forwards and more_to_come):
|
||||
response["next_batch"] = "+%s" % (final_room_id,)
|
||||
|
||||
# _append_room_entry_to_chunk will append to chunk but will stop if
|
||||
# len(chunk) > limit
|
||||
#
|
||||
# Normally we will generate enough results on the first iteration here,
|
||||
# but if there is a search filter, _append_room_entry_to_chunk may
|
||||
# filter some results out, in which case we loop again.
|
||||
#
|
||||
# We don't want to scan over the entire range either as that
|
||||
# would potentially waste a lot of work.
|
||||
#
|
||||
# XXX if there is no limit, we may end up DoSing the server with
|
||||
# calls to get_current_state_ids for every single room on the
|
||||
# server. Surely we should cap this somehow?
|
||||
#
|
||||
if limit:
|
||||
step = limit + 1
|
||||
else:
|
||||
# step cannot be zero
|
||||
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
|
||||
if since_token and (forwards or (not forwards and more_to_come)):
|
||||
if num_results > 0:
|
||||
response["prev_batch"] = "-%s" % (initial_room_id,)
|
||||
else:
|
||||
response["prev_batch"] = "-%s" % (pagination_token,)
|
||||
|
||||
chunk = []
|
||||
for i in range(0, len(rooms_to_scan), step):
|
||||
if timeout and self.clock.time() > timeout:
|
||||
raise Exception("Timed out searching room directory")
|
||||
if from_federation:
|
||||
# only show rooms with m.federate=True or absent (default is True)
|
||||
|
||||
batch = rooms_to_scan[i : i + step]
|
||||
logger.info("Processing %i rooms for result", len(batch))
|
||||
yield concurrently_execute(
|
||||
lambda r: self._append_room_entry_to_chunk(
|
||||
r,
|
||||
rooms_to_num_joined[r],
|
||||
chunk,
|
||||
limit,
|
||||
search_filter,
|
||||
from_federation=from_federation,
|
||||
),
|
||||
batch,
|
||||
5,
|
||||
# get rooms' state
|
||||
room_state_ids = yield defer.gatherResults(
|
||||
[
|
||||
maybeDeferred(self.store.get_current_state_ids, room["room_id"])
|
||||
for room in results
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
logger.info("Now %i rooms in result", len(chunk))
|
||||
if len(chunk) >= limit + 1:
|
||||
break
|
||||
|
||||
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
|
||||
# get rooms' creation state events' IDs
|
||||
room_creation_event_ids = {
|
||||
room["room_id"]: event_ids.get((EventTypes.Create, ""))
|
||||
for (room, event_ids) in zip(results, room_state_ids)
|
||||
}
|
||||
|
||||
# Work out the new limit of the batch for pagination, or None if we
|
||||
# know there are no more results that would be returned.
|
||||
# i.e., [since_token.current_limit..new_limit] is the batch of rooms
|
||||
# we've returned (or the reverse if we paginated backwards)
|
||||
# We tried to pull out limit + 1 rooms above, so if we have <= limit
|
||||
# then we know there are no more results to return
|
||||
new_limit = None
|
||||
if chunk and (not limit or len(chunk) > limit):
|
||||
# get rooms' creation state events
|
||||
creation_events_by_id = yield self.store.get_events(
|
||||
room_creation_event_ids.values()
|
||||
)
|
||||
|
||||
if not since_token or since_token.direction_is_forward:
|
||||
if limit:
|
||||
chunk = chunk[:limit]
|
||||
last_room_id = chunk[-1]["room_id"]
|
||||
else:
|
||||
if limit:
|
||||
chunk = chunk[-limit:]
|
||||
last_room_id = chunk[0]["room_id"]
|
||||
# associate them with the room IDs
|
||||
room_creation_events = {
|
||||
room_id: creation_events_by_id[event_id]
|
||||
for (room_id, event_id) in room_creation_event_ids.items()
|
||||
}
|
||||
|
||||
new_limit = sorted_rooms.index(last_room_id)
|
||||
# now filter out rooms with m.federate: False in their create event
|
||||
results = [
|
||||
room
|
||||
for room in results
|
||||
if room_creation_events[room["room_id"]].content.get("m.federate", True)
|
||||
]
|
||||
|
||||
results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
|
||||
for room in results:
|
||||
# populate search result entries with additional fields, namely
|
||||
# 'aliases' and 'guest_can_join'
|
||||
room_id = room["room_id"]
|
||||
|
||||
if since_token:
|
||||
results["new_rooms"] = bool(newly_visible)
|
||||
aliases = yield self.store.get_aliases_for_room(room_id)
|
||||
if aliases:
|
||||
room["aliases"] = aliases
|
||||
|
||||
if not since_token or since_token.direction_is_forward:
|
||||
if new_limit is not None:
|
||||
results["next_batch"] = RoomListNextBatch(
|
||||
stream_ordering=stream_token,
|
||||
public_room_stream_id=public_room_stream_id,
|
||||
current_limit=new_limit,
|
||||
direction_is_forward=True,
|
||||
).to_token()
|
||||
state_ids = yield self.store.get_current_state_ids(room_id)
|
||||
guests_can_join = False
|
||||
guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
|
||||
if guest_access_state_id is not None:
|
||||
guest_access = yield self.store.get_event(guest_access_state_id)
|
||||
if guest_access is not None:
|
||||
if guest_access.content.get("guest_access") == "can_join":
|
||||
guests_can_join = True
|
||||
room["guest_can_join"] = guests_can_join
|
||||
|
||||
if since_token:
|
||||
results["prev_batch"] = since_token.copy_and_replace(
|
||||
direction_is_forward=False,
|
||||
current_limit=since_token.current_limit + 1,
|
||||
).to_token()
|
||||
else:
|
||||
if new_limit is not None:
|
||||
results["prev_batch"] = RoomListNextBatch(
|
||||
stream_ordering=stream_token,
|
||||
public_room_stream_id=public_room_stream_id,
|
||||
current_limit=new_limit,
|
||||
direction_is_forward=False,
|
||||
).to_token()
|
||||
response["chunk"] = results
|
||||
|
||||
if since_token:
|
||||
results["next_batch"] = since_token.copy_and_replace(
|
||||
direction_is_forward=True,
|
||||
current_limit=since_token.current_limit - 1,
|
||||
).to_token()
|
||||
# TODO for federation, we currently don't remove m.federate=False rooms
|
||||
# from the total room count estimate.
|
||||
response["total_room_count_estimate"] = yield self.store.count_public_rooms()
|
||||
|
||||
defer.returnValue(results)
|
||||
defer.returnValue(response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _append_room_entry_to_chunk(
|
||||
@@ -560,7 +487,6 @@ class RoomListNextBatch(
|
||||
),
|
||||
)
|
||||
):
|
||||
|
||||
KEY_DICT = {
|
||||
"stream_ordering": "s",
|
||||
"public_room_stream_id": "p",
|
||||
|
||||
@@ -148,26 +148,44 @@ class StatsHandler(StateDeltasHandler):
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "state_events", +1
|
||||
)
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "current_state_events", +1
|
||||
)
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
prev_event_content = {}
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
@@ -246,6 +264,23 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
# Also add room stats with just the one state event
|
||||
# (the room creation state event)
|
||||
yield self.store.update_stats(
|
||||
"room",
|
||||
room_id,
|
||||
now,
|
||||
{
|
||||
"bucket_size": self.stats_bucket_size,
|
||||
"current_state_events": 1,
|
||||
"joined_members": 0,
|
||||
"invited_members": 0,
|
||||
"left_members": 0,
|
||||
"banned_members": 0,
|
||||
"state_events": 1,
|
||||
},
|
||||
)
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
|
||||
@@ -332,6 +332,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
limit = parse_integer(request, "limit", 0)
|
||||
since_token = parse_string(request, "since", None)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
handler = self.hs.get_room_list_handler()
|
||||
if server:
|
||||
data = yield handler.get_remote_public_room_list(
|
||||
@@ -369,6 +373,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
|
||||
|
||||
if limit == 0:
|
||||
# zero is a special value which corresponds to no limit.
|
||||
limit = None
|
||||
|
||||
handler = self.hs.get_room_list_handler()
|
||||
if server:
|
||||
data = yield handler.get_remote_public_room_list(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -161,6 +162,167 @@ class RoomWorkerStore(SQLBaseStore):
|
||||
"get_public_room_changes", get_public_room_changes_txn
|
||||
)
|
||||
|
||||
def count_public_rooms(self):
    """
    Count the public rooms tracked in the room_stats and room_state tables.

    A public room is one that has is_public set
    AND is publicly-joinable and/or world-readable.

    Returns:
        number of public rooms on this homeserver's room directory
    """
    # The query takes no parameters, so it can be built once up front and
    # simply closed over by the transaction callback.
    count_sql = """
        SELECT COUNT(*)
        FROM room_stats
        JOIN room_state USING (room_id)
        JOIN rooms USING (room_id)
        WHERE
            is_public
            AND (
                join_rules = 'public'
                OR history_visibility = 'world_readable'
            )
    """

    def _count_txn(txn):
        txn.execute(count_sql)
        # COUNT(*) yields exactly one row; its first column is the total.
        row = txn.fetchone()
        return row[0]

    return self.runInteraction("count_public_rooms", _count_txn)
|
||||
|
||||
@defer.inlineCallbacks
def get_largest_public_rooms(
    self, network_tuple, search_filter, limit, pagination_token, forwards
):
    """Fetch one page of the public room directory, largest rooms first.

    A room is included when it has is_public set and is publicly-joinable
    and/or world-readable.

    Args:
        network_tuple (ThirdPartyInstanceID|None): if given, only rooms whose
            appservice_room_list entry matches its appservice_id and
            network_id are returned (an unset ID is matched with IS NULL).
        search_filter (dict|None): if it carries a non-empty
            "generic_search_term", only rooms whose name, topic or
            canonical alias contains that term are returned.
        limit (int|None): Maximum number of rows to return, unlimited otherwise.
        pagination_token (str|None): if present, a room ID which is to be
            the (first/last) included in the results.
        forwards (bool): true iff going forwards, going backwards otherwise

    Returns:
        Deferred resolving to the rows produced by cursor_to_dict.
        Rooms in order: biggest number of joined users first.
        We then arbitrarily use the room_id as a tie breaker.
    """

    # TODO probably want to use ts_… on Postgres?

    sql = """
        SELECT
            room_id, name, topic, canonical_alias, joined_members,
            avatar, history_visibility
        FROM
            room_stats
            JOIN room_state USING (room_id)
            JOIN rooms USING (room_id)
    """
    query_args = []

    if network_tuple:
        sql += """
            LEFT JOIN appservice_room_list arl USING (room_id)
        """

    sql += """
        WHERE
            is_public
            AND (
                join_rules = 'public'
                OR history_visibility = 'world_readable'
            )
    """

    if pagination_token:
        # The page boundary is defined by the (joined_members, room_id) pair
        # of the token room, so look up its current member count first.
        pt_joined = yield self._simple_select_one_onecol(
            table="room_stats",
            keyvalues={"room_id": pagination_token},
            retcol="joined_members",
            desc="get_largest_public_rooms",
        )

        if forwards:
            sql += """
                AND (
                    (joined_members < ?)
                    OR (joined_members = ? AND room_id >= ?)
                )
            """
        else:
            sql += """
                AND (
                    (joined_members > ?)
                    OR (joined_members = ? AND room_id <= ?)
                )
            """
        query_args += [pt_joined, pt_joined, pagination_token]

    if search_filter and search_filter.get("generic_search_term", None):
        # NOTE(review): LIKE wildcards (% and _) inside the search term are
        # passed through unescaped; confirm whether that is intended.
        search_term = "%" + search_filter["generic_search_term"] + "%"
        sql += """
            AND (
                name LIKE ?
                OR topic LIKE ?
                OR canonical_alias LIKE ?
            )
        """
        query_args += [search_term, search_term, search_term]

    if network_tuple:
        sql += "AND ("
        if network_tuple.appservice_id:
            sql += "appservice_id = ? AND "
            query_args.append(network_tuple.appservice_id)
        else:
            sql += "appservice_id IS NULL AND "

        if network_tuple.network_id:
            sql += "network_id = ?)"
            query_args.append(network_tuple.network_id)
        else:
            sql += "network_id IS NULL)"

    # When paginating backwards the ordering is flipped so that LIMIT keeps
    # the rows nearest the token; the rows are put back into display order
    # inside the transaction below.
    if forwards:
        sql += """
            ORDER BY
                joined_members DESC, room_id ASC
        """
    else:
        sql += """
            ORDER BY
                joined_members ASC, room_id DESC
        """

    if limit is not None:
        # Bind the limit as a query parameter rather than formatting it into
        # the SQL text: an assert-based type check disappears under -O.
        sql += """
            LIMIT ?
        """
        query_args.append(limit)

    def _get_largest_public_rooms_txn(txn):
        txn.execute(sql, query_args)

        results = self.cursor_to_dict(txn)

        if not forwards:
            results.reverse()

        return results

    ret_val = yield self.runInteraction(
        "get_largest_public_rooms", _get_largest_public_rooms_txn
    )
    defer.returnValue(ret_val)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
return self._simple_select_one_onecol(
|
||||
|
||||
@@ -68,6 +68,9 @@ class StatsStore(StateDeltasStore):
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
defer.returnValue(1)
|
||||
|
||||
# TODO dev only
|
||||
yield self.delete_all_stats()
|
||||
|
||||
# Get all the rooms that we want to process.
|
||||
def _make_staging_area(txn):
|
||||
# Create the temporary tables
|
||||
|
||||
Reference in New Issue
Block a user