Compare commits
3 Commits
v0.27.3
...
erikj/as_u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86090eadb0 | ||
|
|
edbeed06ca | ||
|
|
277d2c506d |
63
CHANGES.rst
63
CHANGES.rst
@@ -1,66 +1,3 @@
|
||||
Changes in synapse v0.27.3 (2018-04-11)
|
||||
======================================
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* URL quote path segments over federation (#3082)
|
||||
|
||||
Changes in synapse v0.27.3-rc2 (2018-04-09)
|
||||
==========================================
|
||||
|
||||
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
|
||||
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
|
||||
|
||||
Changes in synapse v0.27.3-rc1 (2018-04-09)
|
||||
=======================================
|
||||
|
||||
Notable changes include API support for joinability of groups. Also new metrics
|
||||
and phone home stats. Phone home stats include better visibility of system usage
|
||||
so we can tweak synpase to work better for all users rather than our own experience
|
||||
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
|
||||
overal growth of the Matrix ecosystem. It is defined as:-
|
||||
|
||||
Counts the number of native 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days"
|
||||
|
||||
|
||||
Features:
|
||||
|
||||
* Add joinability for groups (PR #3045)
|
||||
* Implement group join API (PR #3046)
|
||||
* Add counter metrics for calculating state delta (PR #3033)
|
||||
* R30 stats (PR #3041)
|
||||
* Measure time it takes to calculate state group ID (PR #3043)
|
||||
* Add basic performance statistics to phone home (PR #3044)
|
||||
* Add response size metrics (PR #3071)
|
||||
* phone home cache size configurations (PR #3063)
|
||||
|
||||
Changes:
|
||||
|
||||
* Add a blurb explaining the main synapse worker (PR #2886) Thanks to @turt2live!
|
||||
* Replace old style error catching with 'as' keyword (PR #3000) Thanks to @NotAFile!
|
||||
* Use .iter* to avoid copies in StateHandler (PR #3006)
|
||||
* Linearize calls to _generate_user_id (PR #3029)
|
||||
* Remove last usage of ujson (PR #3030)
|
||||
* Use simplejson throughout (PR #3048)
|
||||
* Use static JSONEncoders (PR #3049)
|
||||
* Remove uses of events.content (PR #3060)
|
||||
* Improve database cache performance (PR #3068)
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
|
||||
* Fix replication after switch to simplejson (PR #3015)
|
||||
* Fix replication after switch to simplejson (PR #3015)
|
||||
* 404 correctly on missing paths via NoResource (PR #3022)
|
||||
* Fix error when claiming e2e keys from offline servers (PR #3034)
|
||||
* fix tests/storage/test_user_directory.py (PR #3042)
|
||||
* use PUT instead of POST for federating groups/m.join_policy (PR #3070) Thanks to @krombel!
|
||||
* postgres port script: fix state_groups_pkey error (PR #3072)
|
||||
|
||||
|
||||
Changes in synapse v0.27.2 (2018-03-26)
|
||||
=======================================
|
||||
|
||||
|
||||
@@ -157,8 +157,8 @@ if you prefer.
|
||||
|
||||
In case of problems, please see the _`Troubleshooting` section below.
|
||||
|
||||
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
|
||||
above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
|
||||
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
|
||||
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
|
||||
|
||||
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
|
||||
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
|
||||
|
||||
@@ -55,12 +55,7 @@ synapse process.)
|
||||
|
||||
You then create a set of configs for the various worker processes. These
|
||||
should be worker configuration files, and should be stored in a dedicated
|
||||
subdirectory, to allow synctl to manipulate them. An additional configuration
|
||||
for the master synapse process will need to be created because the process will
|
||||
not be started automatically. That configuration should look like this::
|
||||
|
||||
worker_app: synapse.app.homeserver
|
||||
daemonize: true
|
||||
subdirectory, to allow synctl to manipulate them.
|
||||
|
||||
Each worker configuration file inherits the configuration of the main homeserver
|
||||
configuration file. You can then override configuration specific to that worker,
|
||||
@@ -235,11 +230,9 @@ file. For example::
|
||||
``synapse.app.event_creator``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles some event creation. It can handle REST endpoints matching::
|
||||
Handles non-state event creation. It can handle REST endpoints matching::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/join/
|
||||
|
||||
It will create events locally and then send them on to the main synapse
|
||||
instance to be persisted and handled.
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -251,12 +250,6 @@ class Porter(object):
|
||||
@defer.inlineCallbacks
|
||||
def handle_table(self, table, postgres_size, table_size, forward_chunk,
|
||||
backward_chunk):
|
||||
logger.info(
|
||||
"Table %s: %i/%i (rows %i-%i) already ported",
|
||||
table, postgres_size, table_size,
|
||||
backward_chunk+1, forward_chunk-1,
|
||||
)
|
||||
|
||||
if not table_size:
|
||||
return
|
||||
|
||||
@@ -474,10 +467,31 @@ class Porter(object):
|
||||
self.progress.set_state("Preparing PostgreSQL")
|
||||
self.setup_db(postgres_config, postgres_engine)
|
||||
|
||||
self.progress.set_state("Creating port tables")
|
||||
# Step 2. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
|
||||
table="sqlite_master",
|
||||
keyvalues={
|
||||
"type": "table",
|
||||
},
|
||||
retcol="name",
|
||||
)
|
||||
|
||||
postgres_tables = yield self.postgres_store._simple_select_onecol(
|
||||
table="information_schema.tables",
|
||||
keyvalues={},
|
||||
retcol="distinct table_name",
|
||||
)
|
||||
|
||||
tables = set(sqlite_tables) & set(postgres_tables)
|
||||
|
||||
self.progress.set_state("Creating tables")
|
||||
|
||||
logger.info("Found %d tables", len(tables))
|
||||
|
||||
def create_port_table(txn):
|
||||
txn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
|
||||
"CREATE TABLE port_from_sqlite3 ("
|
||||
" table_name varchar(100) NOT NULL UNIQUE,"
|
||||
" forward_rowid bigint NOT NULL,"
|
||||
" backward_rowid bigint NOT NULL"
|
||||
@@ -503,33 +517,18 @@ class Porter(object):
|
||||
"alter_table", alter_table
|
||||
)
|
||||
except Exception as e:
|
||||
pass
|
||||
logger.info("Failed to create port table: %s", e)
|
||||
|
||||
yield self.postgres_store.runInteraction(
|
||||
"create_port_table", create_port_table
|
||||
)
|
||||
try:
|
||||
yield self.postgres_store.runInteraction(
|
||||
"create_port_table", create_port_table
|
||||
)
|
||||
except Exception as e:
|
||||
logger.info("Failed to create port table: %s", e)
|
||||
|
||||
# Step 2. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
|
||||
table="sqlite_master",
|
||||
keyvalues={
|
||||
"type": "table",
|
||||
},
|
||||
retcol="name",
|
||||
)
|
||||
self.progress.set_state("Setting up")
|
||||
|
||||
postgres_tables = yield self.postgres_store._simple_select_onecol(
|
||||
table="information_schema.tables",
|
||||
keyvalues={},
|
||||
retcol="distinct table_name",
|
||||
)
|
||||
|
||||
tables = set(sqlite_tables) & set(postgres_tables)
|
||||
logger.info("Found %d tables", len(tables))
|
||||
|
||||
# Step 3. Figure out what still needs copying
|
||||
self.progress.set_state("Checking on port progress")
|
||||
# Set up tables.
|
||||
setup_res = yield defer.gatherResults(
|
||||
[
|
||||
self.setup_table(table)
|
||||
@@ -540,8 +539,7 @@ class Porter(object):
|
||||
consumeErrors=True,
|
||||
)
|
||||
|
||||
# Step 4. Do the copying.
|
||||
self.progress.set_state("Copying to postgres")
|
||||
# Process tables.
|
||||
yield defer.gatherResults(
|
||||
[
|
||||
self.handle_table(*res)
|
||||
@@ -550,9 +548,6 @@ class Porter(object):
|
||||
consumeErrors=True,
|
||||
)
|
||||
|
||||
# Step 5. Do final post-processing
|
||||
yield self._setup_state_group_id_seq()
|
||||
|
||||
self.progress.done()
|
||||
except:
|
||||
global end_error_exec_info
|
||||
@@ -712,16 +707,6 @@ class Porter(object):
|
||||
|
||||
defer.returnValue((done, remaining + done))
|
||||
|
||||
def _setup_state_group_id_seq(self):
|
||||
def r(txn):
|
||||
txn.execute("SELECT MAX(id) FROM state_groups")
|
||||
next_id = txn.fetchone()[0]+1
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
|
||||
(next_id,),
|
||||
)
|
||||
return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
|
||||
|
||||
|
||||
##############################################
|
||||
###### The following is simply UI stuff ######
|
||||
|
||||
@@ -16,4 +16,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.27.3"
|
||||
__version__ = "0.27.2"
|
||||
|
||||
@@ -42,7 +42,7 @@ logger = logging.getLogger("synapse.app.appservice")
|
||||
|
||||
|
||||
class AppserviceSlaveStore(
|
||||
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
|
||||
DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
):
|
||||
pass
|
||||
|
||||
@@ -50,11 +50,11 @@ logger = logging.getLogger("synapse.app.client_reader")
|
||||
|
||||
|
||||
class ClientReaderSlavedStore(
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
TransactionStore,
|
||||
SlavedClientIpStore,
|
||||
|
||||
@@ -430,10 +430,6 @@ def run(hs):
|
||||
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
|
||||
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
|
||||
|
||||
r30_results = yield hs.get_datastore().count_r30_users()
|
||||
for name, count in r30_results.iteritems():
|
||||
stats["r30_users_" + name] = count
|
||||
|
||||
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
|
||||
stats["daily_sent_messages"] = daily_sent_messages
|
||||
stats["cache_factor"] = CACHE_SIZE_FACTOR
|
||||
|
||||
@@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
|
||||
class SynchrotronSlavedStore(
|
||||
SlavedReceiptsStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedFilteringStore,
|
||||
@@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
|
||||
SlavedGroupServerStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedEventStore,
|
||||
SlavedClientIpStore,
|
||||
RoomStore,
|
||||
|
||||
@@ -252,7 +252,6 @@ def main():
|
||||
for running_pid in running_pids:
|
||||
while pid_running(running_pid):
|
||||
time.sleep(0.2)
|
||||
write("All processes exited; now restarting...")
|
||||
|
||||
if action == "start" or action == "restart":
|
||||
if start_stop_synapse:
|
||||
|
||||
@@ -49,8 +49,8 @@ logger = logging.getLogger("synapse.app.user_dir")
|
||||
|
||||
|
||||
class UserDirectorySlaveStore(
|
||||
SlavedEventStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
UserDirectoryStore,
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.types import GroupID, get_domain_from_id
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -173,6 +172,7 @@ class ApplicationService(object):
|
||||
|
||||
if self.is_interested_in_user(event.sender):
|
||||
defer.returnValue(True)
|
||||
|
||||
# also check m.room.member state key
|
||||
if (event.type == EventTypes.Member and
|
||||
self.is_interested_in_user(event.state_key)):
|
||||
@@ -181,20 +181,18 @@ class ApplicationService(object):
|
||||
if not store:
|
||||
defer.returnValue(False)
|
||||
|
||||
does_match = yield self._matches_user_in_member_list(event.room_id, store)
|
||||
does_match = yield self._matches_user_in_member_list(
|
||||
event, store,
|
||||
)
|
||||
defer.returnValue(does_match)
|
||||
|
||||
@cachedInlineCallbacks(num_args=1, cache_context=True)
|
||||
def _matches_user_in_member_list(self, room_id, store, cache_context):
|
||||
member_list = yield store.get_users_in_room(
|
||||
room_id, on_invalidate=cache_context.invalidate
|
||||
@defer.inlineCallbacks
|
||||
def _matches_user_in_member_list(self, event, store):
|
||||
ases = yield store.get_appservices_with_user_in_room(
|
||||
event,
|
||||
)
|
||||
|
||||
# check joined member events
|
||||
for user_id in member_list:
|
||||
if self.is_interested_in_user(user_id):
|
||||
defer.returnValue(True)
|
||||
defer.returnValue(False)
|
||||
defer.returnValue(self.id in ases)
|
||||
|
||||
def _matches_room_id(self, event):
|
||||
if hasattr(event, "room_id"):
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -21,7 +20,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -51,7 +49,7 @@ class TransportLayerClient(object):
|
||||
logger.debug("get_room_state dest=%s, room=%s",
|
||||
destination, room_id)
|
||||
|
||||
path = _create_path(PREFIX, "/state/%s/", room_id)
|
||||
path = PREFIX + "/state/%s/" % room_id
|
||||
return self.client.get_json(
|
||||
destination, path=path, args={"event_id": event_id},
|
||||
)
|
||||
@@ -73,7 +71,7 @@ class TransportLayerClient(object):
|
||||
logger.debug("get_room_state_ids dest=%s, room=%s",
|
||||
destination, room_id)
|
||||
|
||||
path = _create_path(PREFIX, "/state_ids/%s/", room_id)
|
||||
path = PREFIX + "/state_ids/%s/" % room_id
|
||||
return self.client.get_json(
|
||||
destination, path=path, args={"event_id": event_id},
|
||||
)
|
||||
@@ -95,7 +93,7 @@ class TransportLayerClient(object):
|
||||
logger.debug("get_pdu dest=%s, event_id=%s",
|
||||
destination, event_id)
|
||||
|
||||
path = _create_path(PREFIX, "/event/%s/", event_id)
|
||||
path = PREFIX + "/event/%s/" % (event_id, )
|
||||
return self.client.get_json(destination, path=path, timeout=timeout)
|
||||
|
||||
@log_function
|
||||
@@ -121,7 +119,7 @@ class TransportLayerClient(object):
|
||||
# TODO: raise?
|
||||
return
|
||||
|
||||
path = _create_path(PREFIX, "/backfill/%s/", room_id)
|
||||
path = PREFIX + "/backfill/%s/" % (room_id,)
|
||||
|
||||
args = {
|
||||
"v": event_tuples,
|
||||
@@ -159,11 +157,9 @@ class TransportLayerClient(object):
|
||||
# generated by the json_data_callback.
|
||||
json_data = transaction.get_dict()
|
||||
|
||||
path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
transaction.destination,
|
||||
path=path,
|
||||
path=PREFIX + "/send/%s/" % transaction.transaction_id,
|
||||
data=json_data,
|
||||
json_data_callback=json_data_callback,
|
||||
long_retries=True,
|
||||
@@ -181,7 +177,7 @@ class TransportLayerClient(object):
|
||||
@log_function
|
||||
def make_query(self, destination, query_type, args, retry_on_dns_fail,
|
||||
ignore_backoff=False):
|
||||
path = _create_path(PREFIX, "/query/%s", query_type)
|
||||
path = PREFIX + "/query/%s" % query_type
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -226,7 +222,7 @@ class TransportLayerClient(object):
|
||||
"make_membership_event called with membership='%s', must be one of %s" %
|
||||
(membership, ",".join(valid_memberships))
|
||||
)
|
||||
path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
|
||||
path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
|
||||
|
||||
ignore_backoff = False
|
||||
retry_on_dns_fail = False
|
||||
@@ -252,7 +248,7 @@ class TransportLayerClient(object):
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_join(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
|
||||
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
@@ -265,7 +261,7 @@ class TransportLayerClient(object):
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_leave(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
|
||||
path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
@@ -284,7 +280,7 @@ class TransportLayerClient(object):
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_invite(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
|
||||
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
@@ -326,7 +322,7 @@ class TransportLayerClient(object):
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def exchange_third_party_invite(self, destination, room_id, event_dict):
|
||||
path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
|
||||
path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
@@ -339,7 +335,7 @@ class TransportLayerClient(object):
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_event_auth(self, destination, room_id, event_id):
|
||||
path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
|
||||
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -351,7 +347,7 @@ class TransportLayerClient(object):
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_query_auth(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
|
||||
path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -413,7 +409,7 @@ class TransportLayerClient(object):
|
||||
Returns:
|
||||
A dict containg the device keys.
|
||||
"""
|
||||
path = _create_path(PREFIX, "/user/devices/%s", user_id)
|
||||
path = PREFIX + "/user/devices/" + user_id
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -463,7 +459,7 @@ class TransportLayerClient(object):
|
||||
@log_function
|
||||
def get_missing_events(self, destination, room_id, earliest_events,
|
||||
latest_events, limit, min_depth, timeout):
|
||||
path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
|
||||
path = PREFIX + "/get_missing_events/%s" % (room_id,)
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -483,7 +479,7 @@ class TransportLayerClient(object):
|
||||
def get_group_profile(self, destination, group_id, requester_user_id):
|
||||
"""Get a group profile
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
|
||||
path = PREFIX + "/groups/%s/profile" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -502,7 +498,7 @@ class TransportLayerClient(object):
|
||||
requester_user_id (str)
|
||||
content (dict): The new profile of the group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
|
||||
path = PREFIX + "/groups/%s/profile" % (group_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -516,7 +512,7 @@ class TransportLayerClient(object):
|
||||
def get_group_summary(self, destination, group_id, requester_user_id):
|
||||
"""Get a group summary
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
|
||||
path = PREFIX + "/groups/%s/summary" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -529,7 +525,7 @@ class TransportLayerClient(object):
|
||||
def get_rooms_in_group(self, destination, group_id, requester_user_id):
|
||||
"""Get all rooms in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
|
||||
path = PREFIX + "/groups/%s/rooms" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -542,7 +538,7 @@ class TransportLayerClient(object):
|
||||
content):
|
||||
"""Add a room to a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
|
||||
path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -556,10 +552,7 @@ class TransportLayerClient(object):
|
||||
config_key, content):
|
||||
"""Update room in group
|
||||
"""
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/room/%s/config/%s",
|
||||
group_id, room_id, config_key,
|
||||
)
|
||||
path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -572,7 +565,7 @@ class TransportLayerClient(object):
|
||||
def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
|
||||
"""Remove a room from a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
|
||||
path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
@@ -585,7 +578,7 @@ class TransportLayerClient(object):
|
||||
def get_users_in_group(self, destination, group_id, requester_user_id):
|
||||
"""Get users in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users", group_id,)
|
||||
path = PREFIX + "/groups/%s/users" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -598,7 +591,7 @@ class TransportLayerClient(object):
|
||||
def get_invited_users_in_group(self, destination, group_id, requester_user_id):
|
||||
"""Get users that have been invited to a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
|
||||
path = PREFIX + "/groups/%s/invited_users" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -611,23 +604,7 @@ class TransportLayerClient(object):
|
||||
def accept_group_invite(self, destination, group_id, user_id, content):
|
||||
"""Accept a group invite
|
||||
"""
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/users/%s/accept_invite",
|
||||
group_id, user_id,
|
||||
)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
data=content,
|
||||
ignore_backoff=True,
|
||||
)
|
||||
|
||||
@log_function
|
||||
def join_group(self, destination, group_id, user_id, content):
|
||||
"""Attempts to join a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
|
||||
path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -640,7 +617,7 @@ class TransportLayerClient(object):
|
||||
def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
|
||||
"""Invite a user to a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
|
||||
path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -656,7 +633,7 @@ class TransportLayerClient(object):
|
||||
invited.
|
||||
"""
|
||||
|
||||
path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
|
||||
path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -670,7 +647,7 @@ class TransportLayerClient(object):
|
||||
user_id, content):
|
||||
"""Remove a user fron a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
|
||||
path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -687,7 +664,7 @@ class TransportLayerClient(object):
|
||||
kicked from the group.
|
||||
"""
|
||||
|
||||
path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
|
||||
path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -702,7 +679,7 @@ class TransportLayerClient(object):
|
||||
the attestations
|
||||
"""
|
||||
|
||||
path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
|
||||
path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -717,12 +694,11 @@ class TransportLayerClient(object):
|
||||
"""Update a room entry in a group summary
|
||||
"""
|
||||
if category_id:
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
|
||||
path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
|
||||
group_id, category_id, room_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
|
||||
path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -738,12 +714,11 @@ class TransportLayerClient(object):
|
||||
"""Delete a room entry in a group summary
|
||||
"""
|
||||
if category_id:
|
||||
path = _create_path(
|
||||
PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
|
||||
path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
|
||||
group_id, category_id, room_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
|
||||
path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
@@ -756,7 +731,7 @@ class TransportLayerClient(object):
|
||||
def get_group_categories(self, destination, group_id, requester_user_id):
|
||||
"""Get all categories in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
|
||||
path = PREFIX + "/groups/%s/categories" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -769,7 +744,7 @@ class TransportLayerClient(object):
|
||||
def get_group_category(self, destination, group_id, requester_user_id, category_id):
|
||||
"""Get category info in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
|
||||
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -783,7 +758,7 @@ class TransportLayerClient(object):
|
||||
content):
|
||||
"""Update a category in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
|
||||
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -798,7 +773,7 @@ class TransportLayerClient(object):
|
||||
category_id):
|
||||
"""Delete a category in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
|
||||
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
@@ -811,7 +786,7 @@ class TransportLayerClient(object):
|
||||
def get_group_roles(self, destination, group_id, requester_user_id):
|
||||
"""Get all roles in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
|
||||
path = PREFIX + "/groups/%s/roles" % (group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -824,7 +799,7 @@ class TransportLayerClient(object):
|
||||
def get_group_role(self, destination, group_id, requester_user_id, role_id):
|
||||
"""Get a roles info
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
|
||||
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
@@ -838,7 +813,7 @@ class TransportLayerClient(object):
|
||||
content):
|
||||
"""Update a role in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
|
||||
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -852,7 +827,7 @@ class TransportLayerClient(object):
|
||||
def delete_group_role(self, destination, group_id, requester_user_id, role_id):
|
||||
"""Delete a role in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
|
||||
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
@@ -867,12 +842,11 @@ class TransportLayerClient(object):
|
||||
"""Update a users entry in a group
|
||||
"""
|
||||
if role_id:
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
|
||||
path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
|
||||
group_id, role_id, user_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
|
||||
path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
@@ -882,33 +856,17 @@ class TransportLayerClient(object):
|
||||
ignore_backoff=True,
|
||||
)
|
||||
|
||||
@log_function
|
||||
def set_group_join_policy(self, destination, group_id, requester_user_id,
|
||||
content):
|
||||
"""Sets the join policy for a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
|
||||
|
||||
return self.client.put_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
args={"requester_user_id": requester_user_id},
|
||||
data=content,
|
||||
ignore_backoff=True,
|
||||
)
|
||||
|
||||
@log_function
|
||||
def delete_group_summary_user(self, destination, group_id, requester_user_id,
|
||||
user_id, role_id):
|
||||
"""Delete a users entry in a group
|
||||
"""
|
||||
if role_id:
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
|
||||
path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
|
||||
group_id, role_id, user_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
|
||||
path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
@@ -931,22 +889,3 @@ class TransportLayerClient(object):
|
||||
data=content,
|
||||
ignore_backoff=True,
|
||||
)
|
||||
|
||||
|
||||
def _create_path(prefix, path, *args):
|
||||
"""Creates a path from the prefix, path template and args. Ensures that
|
||||
all args are url encoded.
|
||||
|
||||
Example:
|
||||
|
||||
_create_path(PREFIX, "/event/%s/", event_id)
|
||||
|
||||
Args:
|
||||
prefix (str)
|
||||
path (str): String template for the path
|
||||
args: ([str]): Args to insert into path. Each arg will be url encoded
|
||||
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -803,23 +802,6 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
|
||||
defer.returnValue((200, new_content))
|
||||
|
||||
|
||||
class FederationGroupsJoinServlet(BaseFederationServlet):
|
||||
"""Attempt to join a group
|
||||
"""
|
||||
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, origin, content, query, group_id, user_id):
|
||||
if get_domain_from_id(user_id) != origin:
|
||||
raise SynapseError(403, "user_id doesn't match origin")
|
||||
|
||||
new_content = yield self.handler.join_group(
|
||||
group_id, user_id, content,
|
||||
)
|
||||
|
||||
defer.returnValue((200, new_content))
|
||||
|
||||
|
||||
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
|
||||
"""Leave or kick a user from the group
|
||||
"""
|
||||
@@ -1142,24 +1124,6 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
|
||||
defer.returnValue((200, resp))
|
||||
|
||||
|
||||
class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
|
||||
"""Sets whether a group is joinable without an invite or knock
|
||||
"""
|
||||
PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, origin, content, query, group_id):
|
||||
requester_user_id = parse_string_from_args(query, "requester_user_id")
|
||||
if get_domain_from_id(requester_user_id) != origin:
|
||||
raise SynapseError(403, "requester_user_id doesn't match origin")
|
||||
|
||||
new_content = yield self.handler.set_group_join_policy(
|
||||
group_id, requester_user_id, content
|
||||
)
|
||||
|
||||
defer.returnValue((200, new_content))
|
||||
|
||||
|
||||
FEDERATION_SERVLET_CLASSES = (
|
||||
FederationSendServlet,
|
||||
FederationPullServlet,
|
||||
@@ -1199,7 +1163,6 @@ GROUP_SERVER_SERVLET_CLASSES = (
|
||||
FederationGroupsInvitedUsersServlet,
|
||||
FederationGroupsInviteServlet,
|
||||
FederationGroupsAcceptInviteServlet,
|
||||
FederationGroupsJoinServlet,
|
||||
FederationGroupsRemoveUserServlet,
|
||||
FederationGroupsSummaryRoomsServlet,
|
||||
FederationGroupsCategoriesServlet,
|
||||
@@ -1209,7 +1172,6 @@ GROUP_SERVER_SERVLET_CLASSES = (
|
||||
FederationGroupsSummaryUsersServlet,
|
||||
FederationGroupsAddRoomsServlet,
|
||||
FederationGroupsAddRoomsConfigServlet,
|
||||
FederationGroupsSettingJoinPolicyServlet,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -206,28 +205,6 @@ class GroupsServerHandler(object):
|
||||
|
||||
defer.returnValue({})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_group_join_policy(self, group_id, requester_user_id, content):
|
||||
"""Sets the group join policy.
|
||||
|
||||
Currently supported policies are:
|
||||
- "invite": an invite must be received and accepted in order to join.
|
||||
- "open": anyone can join.
|
||||
"""
|
||||
yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
|
||||
)
|
||||
|
||||
join_policy = _parse_join_policy_from_contents(content)
|
||||
if join_policy is None:
|
||||
raise SynapseError(
|
||||
400, "No value specified for 'm.join_policy'"
|
||||
)
|
||||
|
||||
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
|
||||
|
||||
defer.returnValue({})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_categories(self, group_id, requester_user_id):
|
||||
"""Get all categories in a group (as seen by user)
|
||||
@@ -404,16 +381,9 @@ class GroupsServerHandler(object):
|
||||
|
||||
yield self.check_group_is_ours(group_id, requester_user_id)
|
||||
|
||||
group = yield self.store.get_group(group_id)
|
||||
|
||||
if group:
|
||||
cols = [
|
||||
"name", "short_description", "long_description",
|
||||
"avatar_url", "is_public",
|
||||
]
|
||||
group_description = {key: group[key] for key in cols}
|
||||
group_description["is_openly_joinable"] = group["join_policy"] == "open"
|
||||
group_description = yield self.store.get_group(group_id)
|
||||
|
||||
if group_description:
|
||||
defer.returnValue(group_description)
|
||||
else:
|
||||
raise SynapseError(404, "Unknown group")
|
||||
@@ -684,40 +654,6 @@ class GroupsServerHandler(object):
|
||||
else:
|
||||
raise SynapseError(502, "Unknown state returned by HS")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _add_user(self, group_id, user_id, content):
|
||||
"""Add a user to a group based on a content dict.
|
||||
|
||||
See accept_invite, join_group.
|
||||
"""
|
||||
if not self.hs.is_mine_id(user_id):
|
||||
local_attestation = self.attestations.create_attestation(
|
||||
group_id, user_id,
|
||||
)
|
||||
|
||||
remote_attestation = content["attestation"]
|
||||
|
||||
yield self.attestations.verify_attestation(
|
||||
remote_attestation,
|
||||
user_id=user_id,
|
||||
group_id=group_id,
|
||||
)
|
||||
else:
|
||||
local_attestation = None
|
||||
remote_attestation = None
|
||||
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
yield self.store.add_user_to_group(
|
||||
group_id, user_id,
|
||||
is_admin=False,
|
||||
is_public=is_public,
|
||||
local_attestation=local_attestation,
|
||||
remote_attestation=remote_attestation,
|
||||
)
|
||||
|
||||
defer.returnValue(local_attestation)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def accept_invite(self, group_id, requester_user_id, content):
|
||||
"""User tries to accept an invite to the group.
|
||||
@@ -734,27 +670,30 @@ class GroupsServerHandler(object):
|
||||
if not is_invited:
|
||||
raise SynapseError(403, "User not invited to group")
|
||||
|
||||
local_attestation = yield self._add_user(group_id, requester_user_id, content)
|
||||
if not self.hs.is_mine_id(requester_user_id):
|
||||
local_attestation = self.attestations.create_attestation(
|
||||
group_id, requester_user_id,
|
||||
)
|
||||
remote_attestation = content["attestation"]
|
||||
|
||||
defer.returnValue({
|
||||
"state": "join",
|
||||
"attestation": local_attestation,
|
||||
})
|
||||
yield self.attestations.verify_attestation(
|
||||
remote_attestation,
|
||||
user_id=requester_user_id,
|
||||
group_id=group_id,
|
||||
)
|
||||
else:
|
||||
local_attestation = None
|
||||
remote_attestation = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def join_group(self, group_id, requester_user_id, content):
|
||||
"""User tries to join the group.
|
||||
is_public = _parse_visibility_from_contents(content)
|
||||
|
||||
This will error if the group requires an invite/knock to join
|
||||
"""
|
||||
|
||||
group_info = yield self.check_group_is_ours(
|
||||
group_id, requester_user_id, and_exists=True
|
||||
yield self.store.add_user_to_group(
|
||||
group_id, requester_user_id,
|
||||
is_admin=False,
|
||||
is_public=is_public,
|
||||
local_attestation=local_attestation,
|
||||
remote_attestation=remote_attestation,
|
||||
)
|
||||
if group_info['join_policy'] != "open":
|
||||
raise SynapseError(403, "Group is not publicly joinable")
|
||||
|
||||
local_attestation = yield self._add_user(group_id, requester_user_id, content)
|
||||
|
||||
defer.returnValue({
|
||||
"state": "join",
|
||||
@@ -896,31 +835,6 @@ class GroupsServerHandler(object):
|
||||
})
|
||||
|
||||
|
||||
def _parse_join_policy_from_contents(content):
|
||||
"""Given a content for a request, return the specified join policy or None
|
||||
"""
|
||||
|
||||
join_policy_dict = content.get("m.join_policy")
|
||||
if join_policy_dict:
|
||||
return _parse_join_policy_dict(join_policy_dict)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _parse_join_policy_dict(join_policy_dict):
|
||||
"""Given a dict for the "m.join_policy" config return the join policy specified
|
||||
"""
|
||||
join_policy_type = join_policy_dict.get("type")
|
||||
if not join_policy_type:
|
||||
return "invite"
|
||||
|
||||
if join_policy_type not in ("invite", "open"):
|
||||
raise SynapseError(
|
||||
400, "Synapse only supports 'invite'/'open' join rule"
|
||||
)
|
||||
return join_policy_type
|
||||
|
||||
|
||||
def _parse_visibility_from_contents(content):
|
||||
"""Given a content for a request parse out whether the entity should be
|
||||
public or not
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -91,8 +90,6 @@ class GroupsLocalHandler(object):
|
||||
get_group_role = _create_rerouter("get_group_role")
|
||||
get_group_roles = _create_rerouter("get_group_roles")
|
||||
|
||||
set_group_join_policy = _create_rerouter("set_group_join_policy")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_group_summary(self, group_id, requester_user_id):
|
||||
"""Get the group summary for a group.
|
||||
@@ -229,45 +226,7 @@ class GroupsLocalHandler(object):
|
||||
def join_group(self, group_id, user_id, content):
|
||||
"""Request to join a group
|
||||
"""
|
||||
if self.is_mine_id(group_id):
|
||||
yield self.groups_server_handler.join_group(
|
||||
group_id, user_id, content
|
||||
)
|
||||
local_attestation = None
|
||||
remote_attestation = None
|
||||
else:
|
||||
local_attestation = self.attestations.create_attestation(group_id, user_id)
|
||||
content["attestation"] = local_attestation
|
||||
|
||||
res = yield self.transport_client.join_group(
|
||||
get_domain_from_id(group_id), group_id, user_id, content,
|
||||
)
|
||||
|
||||
remote_attestation = res["attestation"]
|
||||
|
||||
yield self.attestations.verify_attestation(
|
||||
remote_attestation,
|
||||
group_id=group_id,
|
||||
user_id=user_id,
|
||||
server_name=get_domain_from_id(group_id),
|
||||
)
|
||||
|
||||
# TODO: Check that the group is public and we're being added publically
|
||||
is_publicised = content.get("publicise", False)
|
||||
|
||||
token = yield self.store.register_user_group_membership(
|
||||
group_id, user_id,
|
||||
membership="join",
|
||||
is_admin=False,
|
||||
local_attestation=local_attestation,
|
||||
remote_attestation=remote_attestation,
|
||||
is_publicised=is_publicised,
|
||||
)
|
||||
self.notifier.on_new_event(
|
||||
"groups_key", token, users=[user_id],
|
||||
)
|
||||
|
||||
defer.returnValue({})
|
||||
raise NotImplementedError() # TODO
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def accept_invite(self, group_id, user_id, content):
|
||||
|
||||
@@ -286,8 +286,7 @@ class MatrixFederationHttpClient(object):
|
||||
headers_dict[b"Authorization"] = auth_headers
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def put_json(self, destination, path, args={}, data={},
|
||||
json_data_callback=None,
|
||||
def put_json(self, destination, path, data={}, json_data_callback=None,
|
||||
long_retries=False, timeout=None,
|
||||
ignore_backoff=False,
|
||||
backoff_on_404=False):
|
||||
@@ -297,7 +296,6 @@ class MatrixFederationHttpClient(object):
|
||||
destination (str): The remote server to send the HTTP request
|
||||
to.
|
||||
path (str): The HTTP path.
|
||||
args (dict): query params
|
||||
data (dict): A dict containing the data that will be used as
|
||||
the request body. This will be encoded as JSON.
|
||||
json_data_callback (callable): A callable returning the dict to
|
||||
@@ -344,7 +342,6 @@ class MatrixFederationHttpClient(object):
|
||||
path,
|
||||
body_callback=body_callback,
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query_bytes=encode_query_args(args),
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
@@ -376,7 +373,6 @@ class MatrixFederationHttpClient(object):
|
||||
giving up. None indicates no timeout.
|
||||
ignore_backoff (bool): true to ignore the historical backoff data and
|
||||
try the request anyway.
|
||||
args (dict): query params
|
||||
Returns:
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body.
|
||||
|
||||
@@ -113,11 +113,6 @@ response_db_sched_duration = metrics.register_counter(
|
||||
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
|
||||
)
|
||||
|
||||
# size in bytes of the response written
|
||||
response_size = metrics.register_counter(
|
||||
"response_size", labels=["method", "servlet", "tag"]
|
||||
)
|
||||
|
||||
_next_request_id = 0
|
||||
|
||||
|
||||
@@ -431,8 +426,6 @@ class RequestMetrics(object):
|
||||
context.db_sched_duration_ms / 1000., request.method, self.name, tag
|
||||
)
|
||||
|
||||
response_size.inc_by(request.sentLength, request.method, self.name, tag)
|
||||
|
||||
|
||||
class RootRedirect(resource.Resource):
|
||||
"""Redirects the root '/' path to another path."""
|
||||
|
||||
@@ -17,8 +17,10 @@
|
||||
from synapse.storage.appservice import (
|
||||
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
|
||||
)
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
|
||||
|
||||
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
|
||||
ApplicationServiceWorkerStore):
|
||||
ApplicationServiceWorkerStore,
|
||||
SlavedEventStore):
|
||||
pass
|
||||
|
||||
@@ -655,12 +655,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
|
||||
content=event_content,
|
||||
)
|
||||
|
||||
return_value = {}
|
||||
|
||||
if membership_action == "join":
|
||||
return_value["room_id"] = room_id
|
||||
|
||||
defer.returnValue((200, return_value))
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
def _has_3pid_invite_keys(self, content):
|
||||
for key in {"id_server", "medium", "address"}:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -402,32 +401,6 @@ class GroupInvitedUsersServlet(RestServlet):
|
||||
defer.returnValue((200, result))
|
||||
|
||||
|
||||
class GroupSettingJoinPolicyServlet(RestServlet):
|
||||
"""Set group join policy
|
||||
"""
|
||||
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/settings/m.join_policy$")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(GroupSettingJoinPolicyServlet, self).__init__()
|
||||
self.auth = hs.get_auth()
|
||||
self.groups_handler = hs.get_groups_local_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
result = yield self.groups_handler.set_group_join_policy(
|
||||
group_id,
|
||||
requester_user_id,
|
||||
content,
|
||||
)
|
||||
|
||||
defer.returnValue((200, result))
|
||||
|
||||
|
||||
class GroupCreateServlet(RestServlet):
|
||||
"""Create a group
|
||||
"""
|
||||
@@ -765,7 +738,6 @@ def register_servlets(hs, http_server):
|
||||
GroupInvitedUsersServlet(hs).register(http_server)
|
||||
GroupUsersServlet(hs).register(http_server)
|
||||
GroupRoomServlet(hs).register(http_server)
|
||||
GroupSettingJoinPolicyServlet(hs).register(http_server)
|
||||
GroupCreateServlet(hs).register(http_server)
|
||||
GroupAdminRoomsServlet(hs).register(http_server)
|
||||
GroupAdminRoomsConfigServlet(hs).register(http_server)
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage.devices import DeviceStore
|
||||
from .appservice import (
|
||||
ApplicationServiceStore, ApplicationServiceTransactionStore
|
||||
@@ -242,12 +244,13 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
|
||||
return [UserPresenceState(**row) for row in rows]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_users(self):
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 24 hours.
|
||||
"""
|
||||
def _count_users(txn):
|
||||
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
|
||||
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
|
||||
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
@@ -261,91 +264,8 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
count, = txn.fetchone()
|
||||
return count
|
||||
|
||||
return self.runInteraction("count_users", _count_users)
|
||||
|
||||
def count_r30_users(self):
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days
|
||||
|
||||
Returns counts globaly for a given user as well as breaking
|
||||
by platform
|
||||
"""
|
||||
def _count_r30_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time())
|
||||
thirty_days_ago_in_secs = now - thirty_days_in_secs
|
||||
|
||||
sql = """
|
||||
SELECT platform, COALESCE(count(*), 0) FROM (
|
||||
SELECT
|
||||
users.name, platform, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen,
|
||||
CASE
|
||||
WHEN user_agent LIKE '%%Android%%' THEN 'android'
|
||||
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
AS platform
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND users.appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, platform, users.creation_ts
|
||||
) u GROUP BY platform
|
||||
"""
|
||||
|
||||
results = {}
|
||||
txn.execute(sql, (thirty_days_ago_in_secs,
|
||||
thirty_days_ago_in_secs))
|
||||
|
||||
for row in txn:
|
||||
if row[0] is 'unknown':
|
||||
pass
|
||||
results[row[0]] = row[1]
|
||||
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT users.name, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, users.creation_ts
|
||||
) u
|
||||
"""
|
||||
|
||||
txn.execute(sql, (thirty_days_ago_in_secs,
|
||||
thirty_days_ago_in_secs))
|
||||
|
||||
count, = txn.fetchone()
|
||||
results['all'] = count
|
||||
|
||||
return results
|
||||
|
||||
return self.runInteraction("count_r30_users", _count_r30_users)
|
||||
ret = yield self.runInteraction("count_users", _count_users)
|
||||
defer.returnValue(ret)
|
||||
|
||||
def get_users(self):
|
||||
"""Function to reterive a list of users in users table.
|
||||
|
||||
@@ -18,9 +18,14 @@ import re
|
||||
import simplejson as json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.appservice import AppServiceTransaction
|
||||
from synapse.config.appservice import load_appservices
|
||||
from synapse.storage.events import EventsWorkerStore
|
||||
from synapse.storage.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.state import StateGroupWorkerStore
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.async import Linearizer
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
|
||||
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
|
||||
return exclusive_user_regex
|
||||
|
||||
|
||||
class ApplicationServiceWorkerStore(SQLBaseStore):
|
||||
class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
|
||||
SQLBaseStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
self.services_cache = load_appservices(
|
||||
hs.hostname,
|
||||
@@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
|
||||
return service
|
||||
return None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservices_with_user_in_room(self, event):
|
||||
"""Get the list of appservices in the room at the given event
|
||||
|
||||
Args:
|
||||
event (Event)
|
||||
|
||||
Returns:
|
||||
Deferred[set(str)]: The IDs of all ASes in the room
|
||||
"""
|
||||
state_group = yield self._get_state_group_for_event(event.event_id)
|
||||
|
||||
if not state_group:
|
||||
raise Exception("No state group for event %s", event.event_id)
|
||||
|
||||
ases_in_room = yield self._get_appservices_with_user_in_room(
|
||||
event.room_id, state_group,
|
||||
)
|
||||
|
||||
defer.returnValue(ases_in_room)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000)
|
||||
def _get_appservices_with_user_in_room(self, room_id, state_group):
|
||||
cache = self._get_appservices_with_user_in_room_cache(room_id)
|
||||
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
|
||||
|
||||
defer.returnValue(ases_in_room)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def _get_appservices_with_user_in_room_cache(self, room_id):
|
||||
return _AppserviceUsersCache(self, room_id)
|
||||
|
||||
|
||||
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
||||
# This is currently empty due to there not being any AS storage functions
|
||||
@@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
|
||||
" (SELECT stream_ordering FROM appservice_stream_position)"
|
||||
" < e.stream_ordering"
|
||||
" AND e.stream_ordering <= ?"
|
||||
" AND NOT e.outlier"
|
||||
" ORDER BY e.stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
@@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
|
||||
# to keep consistency with the other stores, we keep this empty class for
|
||||
# now.
|
||||
pass
|
||||
|
||||
|
||||
class _AppserviceUsersCache(object):
|
||||
"""Attempts to calculate which appservices have users in a given room by
|
||||
looking at state groups and their delta_ids
|
||||
"""
|
||||
|
||||
def __init__(self, store, room_id):
|
||||
self.store = store
|
||||
self.room_id = room_id
|
||||
|
||||
self.linearizer = Linearizer("_AppserviceUsersCache")
|
||||
|
||||
# The last state group we calculated the ASes in the room for.
|
||||
self.state_group = object()
|
||||
|
||||
# A dict of all appservices in the room at the above state group,
|
||||
# along with a user_id of an AS user in the room.
|
||||
# Dict of as_id -> user_id.
|
||||
self.appservices_in_room = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservices_in_room_by_user(self, state_group):
|
||||
"""
|
||||
Args:
|
||||
state_group(str)
|
||||
|
||||
Returns:
|
||||
Deferred[set(str)]: The IDs of all ASes in the room
|
||||
"""
|
||||
assert state_group is not None
|
||||
|
||||
if state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
with (yield self.linearizer.queue(())):
|
||||
# Set of ASes that we need to recalculate their membership of
|
||||
# the room
|
||||
uhandled_ases = set()
|
||||
|
||||
# If the state groups match then there is nothing to do
|
||||
if state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
|
||||
|
||||
# If the prev_group matches the last state group we can calculate
|
||||
# the new value by looking at the deltas
|
||||
if prev_group and prev_group == self.state_group:
|
||||
for (typ, state_key), event_id in delta_ids.iteritems():
|
||||
if typ != EventTypes.Member:
|
||||
continue
|
||||
|
||||
user_id = state_key
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
|
||||
is_join = event.membership == Membership.JOIN
|
||||
for appservice in self.store.get_app_services():
|
||||
as_id = appservice.id
|
||||
|
||||
# If this is a join and the appservice is already in
|
||||
# the room then its a noop
|
||||
if is_join:
|
||||
if as_id in self.appservices_in_room:
|
||||
continue
|
||||
# If this is not a join, then we only need to recalculate
|
||||
# if the AS is in the room and the cached joined AS user
|
||||
# matches this event.
|
||||
elif self.appservices_in_room.get(as_id, None) != user_id:
|
||||
continue
|
||||
|
||||
# If the AS is not interested in the user then its a
|
||||
# noop.
|
||||
if not appservice.is_interested_in_user(user_id):
|
||||
continue
|
||||
|
||||
if is_join:
|
||||
# If an AS user is joining then the AS is now
|
||||
# interested in the room
|
||||
self.appservices_in_room[as_id] = user_id
|
||||
else:
|
||||
# If an AS user has left then we need to
|
||||
# recalcualte if they're in the room.
|
||||
uhandled_ases.add(appservice)
|
||||
self.appservices_in_room.pop(as_id, None)
|
||||
else:
|
||||
uhandled_ases = set(self.store.get_app_services())
|
||||
|
||||
if uhandled_ases:
|
||||
# We need to recalculate which ASes are in the room, so lets
|
||||
# get the current state and try and find a join event
|
||||
# that the AS is interested in.
|
||||
|
||||
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
|
||||
|
||||
for appservice in uhandled_ases:
|
||||
as_id = appservice.id
|
||||
|
||||
self.appservices_in_room.pop(as_id, None)
|
||||
|
||||
for (etype, state_key), event_id in current_state_ids.iteritems():
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
||||
if not appservice.is_interested_in_user(state_key):
|
||||
continue
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
if event.membership == Membership.JOIN:
|
||||
self.appservices_in_room[as_id] = state_key
|
||||
break
|
||||
|
||||
self.state_group = state_group
|
||||
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
@@ -48,13 +48,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
columns=["user_id", "device_id", "last_seen"],
|
||||
)
|
||||
|
||||
self.register_background_index_update(
|
||||
"user_ips_last_seen_index",
|
||||
index_name="user_ips_last_seen",
|
||||
table="user_ips",
|
||||
columns=["user_id", "last_seen"],
|
||||
)
|
||||
|
||||
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
|
||||
self._batch_row_update = {}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -30,24 +29,6 @@ _DEFAULT_ROLE_ID = ""
|
||||
|
||||
|
||||
class GroupServerStore(SQLBaseStore):
|
||||
def set_group_join_policy(self, group_id, join_policy):
|
||||
"""Set the join policy of a group.
|
||||
|
||||
join_policy can be one of:
|
||||
* "invite"
|
||||
* "open"
|
||||
"""
|
||||
return self._simple_update_one(
|
||||
table="groups",
|
||||
keyvalues={
|
||||
"group_id": group_id,
|
||||
},
|
||||
updatevalues={
|
||||
"join_policy": join_policy,
|
||||
},
|
||||
desc="set_group_join_policy",
|
||||
)
|
||||
|
||||
def get_group(self, group_id):
|
||||
return self._simple_select_one(
|
||||
table="groups",
|
||||
@@ -55,11 +36,10 @@ class GroupServerStore(SQLBaseStore):
|
||||
"group_id": group_id,
|
||||
},
|
||||
retcols=(
|
||||
"name", "short_description", "long_description",
|
||||
"avatar_url", "is_public", "join_policy",
|
||||
"name", "short_description", "long_description", "avatar_url", "is_public"
|
||||
),
|
||||
allow_none=True,
|
||||
desc="get_group",
|
||||
desc="is_user_in_group",
|
||||
)
|
||||
|
||||
def get_users_in_group(self, group_id, include_private=False):
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 - 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -26,7 +25,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 48
|
||||
SCHEMA_VERSION = 47
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
@@ -594,8 +594,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
|
||||
while next_token:
|
||||
sql = """
|
||||
SELECT stream_ordering, json FROM events
|
||||
JOIN event_json USING (event_id)
|
||||
SELECT stream_ordering, content FROM events
|
||||
WHERE room_id = ?
|
||||
AND stream_ordering < ?
|
||||
AND contains_url = ? AND outlier = ?
|
||||
@@ -607,8 +606,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
next_token = None
|
||||
for stream_ordering, content_json in txn:
|
||||
next_token = stream_ordering
|
||||
event_json = json.loads(content_json)
|
||||
content = event_json["content"]
|
||||
content = json.loads(content_json)
|
||||
|
||||
content_url = content.get("url")
|
||||
thumbnail_url = content.get("info", {}).get("thumbnail_url")
|
||||
|
||||
|
||||
@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
|
||||
# @defer.inlineCallbacks
|
||||
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
|
||||
# We don't use `state_group`, its there so that we can cache based
|
||||
# on it. However, its important that its never None, since two current_state's
|
||||
@@ -645,9 +644,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||
|
||||
def add_membership_profile_txn(txn):
|
||||
sql = ("""
|
||||
SELECT stream_ordering, event_id, events.room_id, event_json.json
|
||||
SELECT stream_ordering, event_id, events.room_id, content
|
||||
FROM events
|
||||
INNER JOIN event_json USING (event_id)
|
||||
INNER JOIN room_memberships USING (event_id)
|
||||
WHERE ? <= stream_ordering AND stream_ordering < ?
|
||||
AND type = 'm.room.member'
|
||||
@@ -668,8 +666,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||
event_id = row["event_id"]
|
||||
room_id = row["room_id"]
|
||||
try:
|
||||
event_json = json.loads(row["json"])
|
||||
content = event_json['content']
|
||||
content = json.loads(row["content"])
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
INSERT into background_updates (update_name, progress_json)
|
||||
VALUES ('user_ips_last_seen_index', '{}');
|
||||
@@ -1,22 +0,0 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This isn't a real ENUM because sqlite doesn't support it
|
||||
* and we use a default of NULL for inserted rows and interpret
|
||||
* NULL at the python store level as necessary so that existing
|
||||
* rows are given the correct default policy.
|
||||
*/
|
||||
ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';
|
||||
@@ -75,9 +75,8 @@ class SearchStore(BackgroundUpdateStore):
|
||||
|
||||
def reindex_search_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id, room_id, type, json, "
|
||||
"SELECT stream_ordering, event_id, room_id, type, content, "
|
||||
" origin_server_ts FROM events"
|
||||
" JOIN event_json USING (event_id)"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" AND (%s)"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
@@ -105,8 +104,7 @@ class SearchStore(BackgroundUpdateStore):
|
||||
stream_ordering = row["stream_ordering"]
|
||||
origin_server_ts = row["origin_server_ts"]
|
||||
try:
|
||||
event_json = json.loads(row["json"])
|
||||
content = event_json["content"]
|
||||
content = json.loads(row["content"])
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -40,11 +39,12 @@ _CacheSentinel = object()
|
||||
|
||||
class CacheEntry(object):
|
||||
__slots__ = [
|
||||
"deferred", "callbacks", "invalidated"
|
||||
"deferred", "sequence", "callbacks", "invalidated"
|
||||
]
|
||||
|
||||
def __init__(self, deferred, callbacks):
|
||||
def __init__(self, deferred, sequence, callbacks):
|
||||
self.deferred = deferred
|
||||
self.sequence = sequence
|
||||
self.callbacks = set(callbacks)
|
||||
self.invalidated = False
|
||||
|
||||
@@ -62,6 +62,7 @@ class Cache(object):
|
||||
"max_entries",
|
||||
"name",
|
||||
"keylen",
|
||||
"sequence",
|
||||
"thread",
|
||||
"metrics",
|
||||
"_pending_deferred_cache",
|
||||
@@ -79,6 +80,7 @@ class Cache(object):
|
||||
|
||||
self.name = name
|
||||
self.keylen = keylen
|
||||
self.sequence = 0
|
||||
self.thread = None
|
||||
self.metrics = register_cache(name, self.cache)
|
||||
|
||||
@@ -111,10 +113,11 @@ class Cache(object):
|
||||
callbacks = [callback] if callback else []
|
||||
val = self._pending_deferred_cache.get(key, _CacheSentinel)
|
||||
if val is not _CacheSentinel:
|
||||
val.callbacks.update(callbacks)
|
||||
if update_metrics:
|
||||
self.metrics.inc_hits()
|
||||
return val.deferred
|
||||
if val.sequence == self.sequence:
|
||||
val.callbacks.update(callbacks)
|
||||
if update_metrics:
|
||||
self.metrics.inc_hits()
|
||||
return val.deferred
|
||||
|
||||
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
|
||||
if val is not _CacheSentinel:
|
||||
@@ -134,9 +137,12 @@ class Cache(object):
|
||||
self.check_thread()
|
||||
entry = CacheEntry(
|
||||
deferred=value,
|
||||
sequence=self.sequence,
|
||||
callbacks=callbacks,
|
||||
)
|
||||
|
||||
entry.callbacks.update(callbacks)
|
||||
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry:
|
||||
existing_entry.invalidate()
|
||||
@@ -144,25 +150,13 @@ class Cache(object):
|
||||
self._pending_deferred_cache[key] = entry
|
||||
|
||||
def shuffle(result):
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
if self.sequence == entry.sequence:
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
else:
|
||||
entry.invalidate()
|
||||
else:
|
||||
# oops, the _pending_deferred_cache has been updated since
|
||||
# we started our query, so we are out of date.
|
||||
#
|
||||
# Better put back whatever we took out. (We do it this way
|
||||
# round, rather than peeking into the _pending_deferred_cache
|
||||
# and then removing on a match, to make the common case faster)
|
||||
if existing_entry is not None:
|
||||
self._pending_deferred_cache[key] = existing_entry
|
||||
|
||||
# we're not going to put this entry into the cache, so need
|
||||
# to make sure that the invalidation callbacks are called.
|
||||
# That was probably done when _pending_deferred_cache was
|
||||
# updated, but it's possible that `set` was called without
|
||||
# `invalidate` being previously called, in which case it may
|
||||
# not have been. Either way, let's double-check now.
|
||||
entry.invalidate()
|
||||
return result
|
||||
|
||||
@@ -174,29 +168,25 @@ class Cache(object):
|
||||
|
||||
def invalidate(self, key):
|
||||
self.check_thread()
|
||||
self.cache.pop(key, None)
|
||||
|
||||
# if we have a pending lookup for this key, remove it from the
|
||||
# _pending_deferred_cache, which will (a) stop it being returned
|
||||
# for future queries and (b) stop it being persisted as a proper entry
|
||||
# in self.cache.
|
||||
# Increment the sequence number so that any SELECT statements that
|
||||
# raced with the INSERT don't update the cache (SYN-369)
|
||||
self.sequence += 1
|
||||
entry = self._pending_deferred_cache.pop(key, None)
|
||||
|
||||
# run the invalidation callbacks now, rather than waiting for the
|
||||
# deferred to resolve.
|
||||
if entry:
|
||||
entry.invalidate()
|
||||
|
||||
self.cache.pop(key, None)
|
||||
|
||||
def invalidate_many(self, key):
|
||||
self.check_thread()
|
||||
if not isinstance(key, tuple):
|
||||
raise TypeError(
|
||||
"The cache key must be a tuple not %r" % (type(key),)
|
||||
)
|
||||
self.sequence += 1
|
||||
self.cache.del_multi(key)
|
||||
|
||||
# if we have a pending lookup for this key, remove it from the
|
||||
# _pending_deferred_cache, as above
|
||||
entry_dict = self._pending_deferred_cache.pop(key, None)
|
||||
if entry_dict is not None:
|
||||
for entry in iterate_tree_cache_entry(entry_dict):
|
||||
@@ -204,10 +194,8 @@ class Cache(object):
|
||||
|
||||
def invalidate_all(self):
|
||||
self.check_thread()
|
||||
self.sequence += 1
|
||||
self.cache.clear()
|
||||
for entry in self._pending_deferred_cache.itervalues():
|
||||
entry.invalidate()
|
||||
self._pending_deferred_cache.clear()
|
||||
|
||||
|
||||
class _CacheDescriptorBase(object):
|
||||
|
||||
@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.event.sender = "@irc_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_user_id_prefix_no_match(self):
|
||||
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.event.sender = "@someone_else:matrix.org"
|
||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
||||
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_member_is_checked(self):
|
||||
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
self.event.sender = "@someone_else:matrix.org"
|
||||
self.event.type = "m.room.member"
|
||||
self.event.state_key = "@irc_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_id_match(self):
|
||||
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||
)
|
||||
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_id_no_match(self):
|
||||
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||
)
|
||||
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
|
||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
||||
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_alias_match(self):
|
||||
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
self.store.get_aliases_for_room.return_value = [
|
||||
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
|
||||
]
|
||||
self.store.get_users_in_room.return_value = []
|
||||
self.store.get_appservices_with_user_in_room.return_value = []
|
||||
self.assertFalse((yield self.service.is_interested(
|
||||
self.event, self.store
|
||||
)))
|
||||
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
}
|
||||
self.event.state_key = self.service.sender
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_member_list_match(self):
|
||||
self.service.namespaces[ApplicationService.NS_USERS].append(
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.store.get_users_in_room.return_value = [
|
||||
"@alice:here",
|
||||
"@irc_fo:here", # AS user
|
||||
"@bob:here",
|
||||
]
|
||||
self.store.get_aliases_for_room.return_value = []
|
||||
|
||||
self.event.sender = "@xmpp_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(
|
||||
event=self.event, store=self.store
|
||||
)))
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -13,7 +12,6 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from functools import partial
|
||||
import logging
|
||||
|
||||
import mock
|
||||
@@ -27,50 +25,6 @@ from tests import unittest
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CacheTestCase(unittest.TestCase):
|
||||
def test_invalidate_all(self):
|
||||
cache = descriptors.Cache("testcache")
|
||||
|
||||
callback_record = [False, False]
|
||||
|
||||
def record_callback(idx):
|
||||
callback_record[idx] = True
|
||||
|
||||
# add a couple of pending entries
|
||||
d1 = defer.Deferred()
|
||||
cache.set("key1", d1, partial(record_callback, 0))
|
||||
|
||||
d2 = defer.Deferred()
|
||||
cache.set("key2", d2, partial(record_callback, 1))
|
||||
|
||||
# lookup should return the deferreds
|
||||
self.assertIs(cache.get("key1"), d1)
|
||||
self.assertIs(cache.get("key2"), d2)
|
||||
|
||||
# let one of the lookups complete
|
||||
d2.callback("result2")
|
||||
self.assertEqual(cache.get("key2"), "result2")
|
||||
|
||||
# now do the invalidation
|
||||
cache.invalidate_all()
|
||||
|
||||
# lookup should return none
|
||||
self.assertIsNone(cache.get("key1", None))
|
||||
self.assertIsNone(cache.get("key2", None))
|
||||
|
||||
# both callbacks should have been callbacked
|
||||
self.assertTrue(
|
||||
callback_record[0], "Invalidation callback for key1 not called",
|
||||
)
|
||||
self.assertTrue(
|
||||
callback_record[1], "Invalidation callback for key2 not called",
|
||||
)
|
||||
|
||||
# letting the other lookup complete should do nothing
|
||||
d1.callback("result1")
|
||||
self.assertIsNone(cache.get("key1", None))
|
||||
|
||||
|
||||
class DescriptorTestCase(unittest.TestCase):
|
||||
@defer.inlineCallbacks
|
||||
def test_cache(self):
|
||||
|
||||
Reference in New Issue
Block a user