1
0

Compare commits

..

18 Commits
hhs-3 ... hhs-4

Author SHA1 Message Date
Neil Johnson
301cb60d0b assert rather than warn 2018-08-31 17:29:35 +01:00
Neil Johnson
0b01281e77 move threepid checker to config, add missing yields 2018-08-31 17:11:11 +01:00
Neil Johnson
e8e540630e fix reference to is_threepid_reserved 2018-08-31 16:09:15 +01:00
Neil Johnson
a796bdd35e typo 2018-08-31 15:57:33 +01:00
Neil Johnson
09f3cf1a7e ensure post registration auth checks do not fail erroneously 2018-08-31 15:42:51 +01:00
Neil Johnson
3d6aa06577 news fragemnt 2018-08-31 10:56:37 +01:00
Neil Johnson
ea068d6f3c fix bug where preserved threepid user comes to sign up and server is mau blocked 2018-08-31 10:49:14 +01:00
Amber Brown
14e4d4f4bf Port storage/ to Python 3 (#3725) 2018-08-31 00:19:58 +10:00
Richard van der Hoff
475253a88e Merge pull request #3764 from matrix-org/rav/close_db_conn_after_init
Make sure that we close db connections opened during init
2018-08-30 10:47:27 +01:00
Erik Johnston
7f0399586d Merge pull request #3768 from krombel/fix_3445
fix #3445 - do not use itervalues() on SortedDict()
2018-08-29 16:29:57 +01:00
Krombel
8c0c51ecb3 changelog 2018-08-29 16:49:26 +02:00
Krombel
79a8a347a6 fix #3445
itervalues(d) calls d.itervalues() [PY2] and d.values() [PY3]
but SortedDict only implements d.values()
2018-08-29 16:28:25 +02:00
Amber Brown
82276a18d1 Update CONTRIBUTING to clarify miscs & Markdown (#3730) 2018-08-29 19:06:59 +10:00
Matthew Hodgson
b1580f50fe don't return non-LL-member state in incremental sync state blocks (#3760)
don't return non-LL-member state in incremental sync state blocks
2018-08-28 23:25:58 +01:00
Richard van der Hoff
414fa36f3e Fix up tests 2018-08-28 17:21:05 +01:00
Richard van der Hoff
32eb1dedd2 use abc.abstractproperty
This gives clearer messages when someone gets it wrong
2018-08-28 17:10:43 +01:00
Richard van der Hoff
71990b3cae changelog 2018-08-28 13:42:20 +01:00
Richard van der Hoff
0b07f02e19 Make sure that we close db connections opened during init
We should explicitly close any db connections we open, because failing to do so
can block other transactions as per
https://github.com/matrix-org/synapse/issues/3682.

Let's also try to factor out some of the boilerplate by having server classes
define their datastore class rather than duplicating the whole of `setup`.
2018-08-28 13:39:49 +01:00
46 changed files with 344 additions and 101 deletions

View File

@@ -59,9 +59,10 @@ To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``PRnumber.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
the file is your changelog entry, which can contain Markdown
formatting. Adding credits to the changelog is encouraged, we value
your contributions and would like to have you shouted out in the
release notes!
For example, a fix in PR #1234 would have its changelog entry in
``changelog.d/1234.bugfix``, and contain content like "The security levels of

1
changelog.d/3725.misc Normal file
View File

@@ -0,0 +1 @@
The synapse.storage module has been ported to Python 3.

1
changelog.d/3730.misc Normal file
View File

@@ -0,0 +1 @@
The CONTRIBUTING guidelines have been updated to mention our use of Markdown and that .misc files have content.

1
changelog.d/3760.bugfix Normal file
View File

@@ -0,0 +1 @@
Don't return non-LL-member state in incremental sync state blocks

1
changelog.d/3764.misc Normal file
View File

@@ -0,0 +1 @@
Make sure that we close db connections opened during init

1
changelog.d/3768.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bug in sending presence over federation

1
changelog.d/3777.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bug where preserved threepid user comes to sign up and server is mau blocked

View File

@@ -31,5 +31,5 @@ $TOX_BIN/pip install 'setuptools>=18.5'
$TOX_BIN/pip install 'pip>=10'
{ python synapse/python_dependencies.py
echo lxml psycopg2
echo lxml
} | xargs $TOX_BIN/pip install

View File

@@ -26,6 +26,7 @@ import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
@@ -775,13 +776,19 @@ class Auth(object):
)
@defer.inlineCallbacks
def check_auth_blocking(self, user_id=None):
def check_auth_blocking(self, user_id=None, threepid=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
threepid(dict|None): If present, checks for presence against configured
reserved threepid. Used in cases where the user is trying register
with a MAU blocked server, normally they would be rejected but their
threepid is on the reserved list. user_id and
threepid should never be set at the same time.
"""
# Never fail an auth check for the server notices users
@@ -797,6 +804,8 @@ class Auth(object):
limit_type=self.hs.config.hs_disabled_limit_type
)
if self.hs.config.limit_usage_by_mau is True:
assert not (user_id and threepid)
# If the user is already part of the MAU cohort or a trial user
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
@@ -806,12 +815,16 @@ class Auth(object):
is_trial = yield self.store.is_trial_user(user_id)
if is_trial:
return
elif threepid:
# If the user does not exist yet, but is signing up with a
# reserved threepid then pass auth check
if is_threepid_reserved(self.hs.config, threepid):
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise ResourceLimitError(
403, "Monthly Active User Limit Exceeded",
admin_contact=self.hs.config.admin_contact,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
limit_type="monthly_active_user"

View File

@@ -51,10 +51,7 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = AppserviceSlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -74,10 +74,7 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = ClientReaderSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -90,10 +90,7 @@ class EventCreatorSlavedStore(
class EventCreatorServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = EventCreatorSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -72,10 +72,7 @@ class FederationReaderSlavedStore(
class FederationReaderServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = FederationReaderSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -78,10 +78,7 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = FederationSenderSlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -148,10 +148,7 @@ class FrontendProxySlavedStore(
class FrontendProxyServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = FrontendProxySlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage import are_all_users_on_domain
from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -111,6 +111,8 @@ def build_resource_for_web_client(hs):
class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
def _listener_http(self, config, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
@@ -356,13 +358,13 @@ def setup(config_options):
logger.info("Preparing database: %s...", config.database_config['name'])
try:
db_conn = hs.get_db_conn(run_new_connection=False)
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
with hs.get_db_conn(run_new_connection=False) as db_conn:
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
hs.run_startup_checks(db_conn, database_engine)
hs.run_startup_checks(db_conn, database_engine)
db_conn.commit()
db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"

View File

@@ -60,10 +60,7 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = MediaRepositorySlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -78,10 +78,7 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = PusherSlaveStore
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

View File

@@ -249,10 +249,7 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = SynchrotronSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -94,10 +94,7 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
DATASTORE_CLASS = UserDirectorySlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -404,6 +404,23 @@ class ServerConfig(Config):
" service on the given port.")
def is_threepid_reserved(config, threepid):
"""Check the threepid against the reserved threepid config
Args:
config(ServerConfig) - to access server config attributes
threepid(dict) - The threepid to test for
Returns:
boolean Is the threepid undertest reserved_user
"""
for tp in config.mau_limits_reserved_threepids:
if (threepid['medium'] == tp['medium']
and threepid['address'] == tp['address']):
return True
return False
def read_gc_thresholds(thresholds):
"""Reads the three integer thresholds for garbage collection. Ensures that
the thresholds are integers if thresholds are supplied.

View File

@@ -32,7 +32,7 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
from six import iteritems, itervalues
from six import iteritems
from sortedcontainers import SortedDict
@@ -117,7 +117,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
for uids in itervalues(self.presence_changed)
for uids in self.presence_changed.values()
for user_id in uids
)

View File

@@ -125,6 +125,7 @@ class RegistrationHandler(BaseHandler):
guest_access_token=None,
make_guest=False,
admin=False,
threepid=None,
):
"""Registers a new client on the server.
@@ -145,7 +146,7 @@ class RegistrationHandler(BaseHandler):
RegistrationError if there was a problem registering.
"""
yield self.auth.check_auth_blocking()
yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)

View File

@@ -745,9 +745,16 @@ class SyncHandler(object):
state_ids = {}
if lazy_load_members:
if types:
# We're returning an incremental sync, with no "gap" since
# the previous sync, so normally there would be no state to return
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here, and then dedupe any redundant ones below.
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
filtered_types=None, # we only want members!
)
if lazy_load_members and not include_redundant_members:

View File

@@ -78,6 +78,9 @@ CONDITIONAL_REQUIREMENTS = {
"affinity": {
"affinity": ["affinity"],
},
"postgres": {
"psycopg2>=2.6": ["psycopg2"]
}
}

View File

@@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester
@@ -281,12 +282,20 @@ class RegisterRestServlet(ClientV1RestServlet):
register_json["user"].encode("utf-8")
if "user" in register_json else None
)
threepid = None
if session.get(LoginType.EMAIL_IDENTITY):
threepid = session["threepidCreds"]
handler = self.handlers.registration_handler
(user_id, token) = yield handler.register(
localpart=desired_user_id,
password=password
password=password,
threepid=threepid,
)
# Necessary due to auth checks prior to the threepid being
# written to the db
if is_threepid_reserved(self.hs.config, threepid):
yield self.store.upsert_monthly_active_user(user_id)
if session[LoginType.EMAIL_IDENTITY]:
logger.debug("Binding emails %s to %s" % (

View File

@@ -26,6 +26,7 @@ import synapse
import synapse.types
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError, UnrecognizedRequestError
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
@@ -395,12 +396,21 @@ class RegisterRestServlet(RestServlet):
if desired_username is not None:
desired_username = desired_username.lower()
threepid = None
if auth_result:
threepid = auth_result.get(LoginType.EMAIL_IDENTITY)
(registered_user_id, _) = yield self.registration_handler.register(
localpart=desired_username,
password=new_password,
guest_access_token=guest_access_token,
generate_token=False,
threepid=threepid,
)
# Necessary due to auth checks prior to the threepid being
# written to the db
if is_threepid_reserved(self.hs.config, threepid):
yield self.store.upsert_monthly_active_user(registered_user_id)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)

View File

@@ -19,6 +19,7 @@
# partial one for unit test mocking.
# Imports required for the default HomeServer() implementation
import abc
import logging
from twisted.enterprise import adbapi
@@ -81,7 +82,6 @@ from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStore
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
@@ -111,6 +111,8 @@ class HomeServer(object):
config (synapse.config.homeserver.HomeserverConfig):
"""
__metaclass__ = abc.ABCMeta
DEPENDENCIES = [
'http_client',
'db_pool',
@@ -172,6 +174,11 @@ class HomeServer(object):
'room_context_handler',
]
# This is overridden in derived application classes
# (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
# instantiated during setup() for future return by get_datastore()
DATASTORE_CLASS = abc.abstractproperty()
def __init__(self, hostname, reactor=None, **kwargs):
"""
Args:
@@ -188,13 +195,16 @@ class HomeServer(object):
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
self.datastore = None
# Other kwargs are explicit dependencies
for depname in kwargs:
setattr(self, depname, kwargs[depname])
def setup(self):
logger.info("Setting up.")
self.datastore = DataStore(self.get_db_conn(), self)
with self.get_db_conn() as conn:
self.datastore = self.DATASTORE_CLASS(conn, self)
logger.info("Finished setting up.")
def get_reactor(self):

View File

@@ -17,9 +17,10 @@ import sys
import threading
import time
from six import iteritems, iterkeys, itervalues
from six import PY2, iteritems, iterkeys, itervalues
from six.moves import intern, range
from canonicaljson import json
from prometheus_client import Histogram
from twisted.internet import defer
@@ -1216,3 +1217,32 @@ class _RollbackButIsFineException(Exception):
something went wrong.
"""
pass
def db_to_json(db_content):
"""
Take some data from a database row and return a JSON-decoded object.
Args:
db_content (memoryview|buffer|bytes|bytearray|unicode)
"""
# psycopg2 on Python 3 returns memoryview objects, which we need to
# cast to bytes to decode
if isinstance(db_content, memoryview):
db_content = db_content.tobytes()
# psycopg2 on Python 2 returns buffer objects, which we need to cast to
# bytes to decode
if PY2 and isinstance(db_content, buffer):
db_content = bytes(db_content)
# Decode it to a Unicode string before feeding it to json.loads, so we
# consistenty get a Unicode-containing object out.
if isinstance(db_content, (bytes, bytearray)):
db_content = db_content.decode('utf8')
try:
return json.loads(db_content)
except Exception:
logging.warning("Tried to decode '%r' as JSON and failed", db_content)
raise

View File

@@ -169,7 +169,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
messages_json_for_user = {}
devices = messages_by_device.keys()
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
sql = (

View File

@@ -24,7 +24,7 @@ from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from ._base import Cache, SQLBaseStore
from ._base import Cache, SQLBaseStore, db_to_json
logger = logging.getLogger(__name__)
@@ -411,7 +411,7 @@ class DeviceStore(SQLBaseStore):
if device is not None:
key_json = device.get("key_json", None)
if key_json:
result["keys"] = json.loads(key_json)
result["keys"] = db_to_json(key_json)
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
@@ -466,7 +466,7 @@ class DeviceStore(SQLBaseStore):
retcol="content",
desc="_get_cached_user_device",
)
defer.returnValue(json.loads(content))
defer.returnValue(db_to_json(content))
@cachedInlineCallbacks()
def _get_cached_devices_for_user(self, user_id):
@@ -479,7 +479,7 @@ class DeviceStore(SQLBaseStore):
desc="_get_cached_devices_for_user",
)
defer.returnValue({
device["device_id"]: json.loads(device["content"])
device["device_id"]: db_to_json(device["content"])
for device in devices
})
@@ -511,7 +511,7 @@ class DeviceStore(SQLBaseStore):
key_json = device.get("key_json", None)
if key_json:
result["keys"] = json.loads(key_json)
result["keys"] = db_to_json(key_json)
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name

View File

@@ -14,13 +14,13 @@
# limitations under the License.
from six import iteritems
from canonicaljson import encode_canonical_json, json
from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyStore(SQLBaseStore):
@@ -90,7 +90,7 @@ class EndToEndKeyStore(SQLBaseStore):
for user_id, device_keys in iteritems(results):
for device_id, device_info in iteritems(device_keys):
device_info["keys"] = json.loads(device_info.pop("key_json"))
device_info["keys"] = db_to_json(device_info.pop("key_json"))
defer.returnValue(results)

View File

@@ -41,13 +41,18 @@ class PostgresEngine(object):
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
cursor.execute("SET bytea_output TO escape")
# Asynchronous commit, don't wait for the server to call fsync before
# ending the transaction.
# https://www.postgresql.org/docs/current/static/wal-async-commit.html
if not self.synchronous_commit:
cursor = db_conn.cursor()
cursor.execute("SET synchronous_commit TO OFF")
cursor.close()
cursor.close()
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):

View File

@@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems
from six import iteritems, text_type
from six.moves import range
from canonicaljson import json
@@ -1220,7 +1220,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
"sender": event.sender,
"contains_url": (
"url" in event.content
and isinstance(event.content["url"], basestring)
and isinstance(event.content["url"], text_type)
),
}
for event, _ in events_and_contexts
@@ -1529,7 +1529,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
contains_url = "url" in content
if contains_url:
contains_url &= isinstance(content["url"], basestring)
contains_url &= isinstance(content["url"], text_type)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
@@ -1910,9 +1910,9 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
(room_id,)
)
rows = txn.fetchall()
max_depth = max(row[0] for row in rows)
max_depth = max(row[1] for row in rows)
if max_depth <= token.topological:
if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)

View File

@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
@@ -265,7 +266,7 @@ class EventsWorkerStore(SQLBaseStore):
"""
with Measure(self._clock, "_fetch_event_list"):
try:
event_id_lists = zip(*event_list)[0]
event_id_lists = list(zip(*event_list))[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
@@ -299,14 +300,14 @@ class EventsWorkerStore(SQLBaseStore):
logger.exception("do_fetch")
# We only want to resolve deferreds from the main thread
def fire(evs):
def fire(evs, exc):
for _, d in evs:
if not d.called:
with PreserveLoggingContext():
d.errback(e)
d.errback(exc)
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list)
self.hs.get_reactor().callFromThread(fire, event_list, e)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):

View File

@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from canonicaljson import encode_canonical_json, json
from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError
from synapse.util.caches.descriptors import cachedInlineCallbacks
from ._base import SQLBaseStore
from ._base import SQLBaseStore, db_to_json
class FilteringStore(SQLBaseStore):
@@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore):
desc="get_user_filter",
)
defer.returnValue(json.loads(bytes(def_json).decode("utf-8")))
defer.returnValue(db_to_json(def_json))
def add_user_filter(self, user_localpart, user_filter):
def_json = encode_canonical_json(user_filter)

View File

@@ -36,7 +36,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
# TODO Why can't I do this in init?
store = self.hs.get_datastore()
reserved_user_list = []

View File

@@ -15,7 +15,8 @@
# limitations under the License.
import logging
import types
import six
from canonicaljson import encode_canonical_json, json
@@ -27,6 +28,11 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
class PusherWorkerStore(SQLBaseStore):
def _decode_pushers_rows(self, rows):
@@ -34,18 +40,18 @@ class PusherWorkerStore(SQLBaseStore):
dataJson = r['data']
r['data'] = None
try:
if isinstance(dataJson, types.BufferType):
if isinstance(dataJson, db_binary_type):
dataJson = str(dataJson).decode("UTF8")
r['data'] = json.loads(dataJson)
except Exception as e:
logger.warn(
"Invalid JSON in data for pusher %d: %s, %s",
r['id'], dataJson, e.message,
r['id'], dataJson, e.args[0],
)
pass
if isinstance(r['pushkey'], types.BufferType):
if isinstance(r['pushkey'], db_binary_type):
r['pushkey'] = str(r['pushkey']).decode("UTF8")
return rows

View File

@@ -18,14 +18,14 @@ from collections import namedtuple
import six
from canonicaljson import encode_canonical_json, json
from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
from ._base import SQLBaseStore, db_to_json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
@@ -95,7 +95,8 @@ class TransactionStore(SQLBaseStore):
)
if result and result["response_code"]:
return result["response_code"], json.loads(str(result["response_json"]))
return result["response_code"], db_to_json(result["response_json"])
else:
return None

View File

@@ -467,6 +467,23 @@ class AuthTestCase(unittest.TestCase):
)
yield self.auth.check_auth_blocking()
@defer.inlineCallbacks
def test_reserved_threepid(self):
self.hs.config.limit_usage_by_mau = True
self.hs.config.max_mau_value = 1
threepid = {'medium': 'email', 'address': 'reserved@server.com'}
unknown_threepid = {'medium': 'email', 'address': 'unreserved@server.com'}
self.hs.config.mau_limits_reserved_threepids = [threepid]
yield self.store.register(user_id='user1', token="123", password_hash=None)
with self.assertRaises(ResourceLimitError):
yield self.auth.check_auth_blocking()
with self.assertRaises(ResourceLimitError):
yield self.auth.check_auth_blocking(threepid=unknown_threepid)
yield self.auth.check_auth_blocking(threepid=threepid)
@defer.inlineCallbacks
def test_hs_disabled(self):
self.hs.config.hs_disabled = True

View File

@@ -240,7 +240,6 @@ class RestHelper(object):
self.assertEquals(200, code)
defer.returnValue(response)
@defer.inlineCallbacks
def send(self, room_id, body=None, txn_id=None, tok=None, expect_code=200):
if txn_id is None:
txn_id = "m%s" % (str(time.time()))
@@ -248,9 +247,16 @@ class RestHelper(object):
body = "body_text_here"
path = "/_matrix/client/r0/rooms/%s/send/m.room.message/%s" % (room_id, txn_id)
content = '{"msgtype":"m.text","body":"%s"}' % body
content = {"msgtype": "m.text", "body": body}
if tok:
path = path + "?access_token=%s" % tok
(code, response) = yield self.mock_resource.trigger("PUT", path, content)
self.assertEquals(expect_code, code, msg=str(response))
request, channel = make_request("PUT", path, json.dumps(content).encode('utf8'))
render(request, self.resource, self.hs.get_reactor())
assert int(channel.result["code"]) == expect_code, (
"Expected: %d, got: %d, resp: %r"
% (expect_code, int(channel.result["code"]), channel.result["body"])
)
return channel.json_body

View File

@@ -20,11 +20,11 @@ from mock import Mock
from twisted.internet import defer
from synapse.server import HomeServer
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import create_engine
from tests import unittest
from tests.utils import TestHomeServer
class SQLBaseStoreTestCase(unittest.TestCase):
@@ -51,7 +51,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
config = Mock()
config.event_cache_size = 1
config.database_config = {"name": "sqlite3"}
hs = HomeServer(
hs = TestHomeServer(
"test",
db_pool=self.db_pool,
config=config,

106
tests/storage/test_purge.py Normal file
View File

@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.rest.client.v1 import room
from tests.unittest import HomeserverTestCase
class PurgeTests(HomeserverTestCase):
user_id = "@red:server"
servlets = [room.register_servlets]
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver("server", http_client=None)
return hs
def prepare(self, reactor, clock, hs):
self.room_id = self.helper.create_room_as(self.user_id)
def test_purge(self):
"""
Purging a room will delete everything before the topological point.
"""
# Send four messages to the room
first = self.helper.send(self.room_id, body="test1")
second = self.helper.send(self.room_id, body="test2")
third = self.helper.send(self.room_id, body="test3")
last = self.helper.send(self.room_id, body="test4")
storage = self.hs.get_datastore()
# Get the topological token
event = storage.get_topological_token_for_event(last["event_id"])
self.pump()
event = self.successResultOf(event)
# Purge everything before this topological token
purge = storage.purge_history(self.room_id, event, True)
self.pump()
self.assertEqual(self.successResultOf(purge), None)
# Try and get the events
get_first = storage.get_event(first["event_id"])
get_second = storage.get_event(second["event_id"])
get_third = storage.get_event(third["event_id"])
get_last = storage.get_event(last["event_id"])
self.pump()
# 1-3 should fail and last will succeed, meaning that 1-3 are deleted
# and last is not.
self.failureResultOf(get_first)
self.failureResultOf(get_second)
self.failureResultOf(get_third)
self.successResultOf(get_last)
def test_purge_wont_delete_extrems(self):
"""
Purging a room will delete everything before the topological point.
"""
# Send four messages to the room
first = self.helper.send(self.room_id, body="test1")
second = self.helper.send(self.room_id, body="test2")
third = self.helper.send(self.room_id, body="test3")
last = self.helper.send(self.room_id, body="test4")
storage = self.hs.get_datastore()
# Set the topological token higher than it should be
event = storage.get_topological_token_for_event(last["event_id"])
self.pump()
event = self.successResultOf(event)
event = "t{}-{}".format(
*list(map(lambda x: x + 1, map(int, event[1:].split("-"))))
)
# Purge everything before this topological token
purge = storage.purge_history(self.room_id, event, True)
self.pump()
f = self.failureResultOf(purge)
self.assertIn("greater than forward", f.value.args[0])
# Try and get the events
get_first = storage.get_event(first["event_id"])
get_second = storage.get_event(second["event_id"])
get_third = storage.get_event(third["event_id"])
get_last = storage.get_event(last["event_id"])
self.pump()
# Nothing is deleted.
self.successResultOf(get_first)
self.successResultOf(get_second)
self.successResultOf(get_third)
self.successResultOf(get_last)

View File

@@ -14,12 +14,12 @@
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.server import HomeServer
from synapse.types import GroupID, RoomAlias, UserID
from tests import unittest
from tests.utils import TestHomeServer
mock_homeserver = HomeServer(hostname="my.domain")
mock_homeserver = TestHomeServer(hostname="my.domain")
class UserIDTestCase(unittest.TestCase):

View File

@@ -151,6 +151,7 @@ class HomeserverTestCase(TestCase):
hijack_auth (bool): Whether to hijack auth to return the user specified
in user_id.
"""
servlets = []
hijack_auth = True
@@ -279,3 +280,13 @@ class HomeserverTestCase(TestCase):
kwargs = dict(kwargs)
kwargs.update(self._hs_args)
return setup_test_homeserver(self.addCleanup, *args, **kwargs)
def pump(self):
"""
Pump the reactor enough that Deferreds will fire.
"""
self.reactor.pump([0.0] * 100)
def get_success(self, d):
self.pump()
return self.successResultOf(d)

View File

@@ -26,10 +26,11 @@ from twisted.internet import defer, reactor
from synapse.api.constants import EventTypes
from synapse.api.errors import CodeMessageException, cs_error
from synapse.config.server import ServerConfig
from synapse.federation.transport import server
from synapse.http.server import HttpServer
from synapse.server import HomeServer
from synapse.storage import PostgresEngine
from synapse.storage import DataStore, PostgresEngine
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import (
_get_or_create_schema_state,
@@ -92,10 +93,14 @@ def setupdb():
atexit.register(_cleanup)
class TestHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
@defer.inlineCallbacks
def setup_test_homeserver(
cleanup_func, name="test", datastore=None, config=None, reactor=None,
homeserverToUse=HomeServer, **kargs
homeserverToUse=TestHomeServer, **kargs
):
"""
Setup a homeserver suitable for running tests against. Keyword arguments
@@ -143,6 +148,8 @@ def setup_test_homeserver(
config.max_mau_value = 50
config.mau_limits_reserved_threepids = []
config.admin_contact = None
config.rc_messages_per_second = 10000
config.rc_message_burst_count = 10000
# we need a sane default_room_version, otherwise attempts to create rooms will
# fail.
@@ -152,6 +159,11 @@ def setup_test_homeserver(
# background, which upsets the test runner.
config.update_user_directory = False
def is_threepid_reserved(threepid):
return ServerConfig.is_threepid_reserved(config, threepid)
config.is_threepid_reserved.side_effect = is_threepid_reserved
config.use_frozen_dicts = True
config.ldap_enabled = False