1
0

Compare commits

..

3 Commits

Author SHA1 Message Date
Erik Johnston
86090eadb0 Don't send outlier events to ASes 2018-04-06 10:21:17 +01:00
Erik Johnston
edbeed06ca Fix MRO for replication stores 2018-04-06 10:20:50 +01:00
Erik Johnston
277d2c506d Add cache for if ASes have users in a room 2018-04-05 17:28:27 +01:00
34 changed files with 329 additions and 762 deletions

View File

@@ -1,66 +1,3 @@
Changes in synapse v0.27.3 (2018-04-11)
======================================
Bug fixes:
* URL quote path segments over federation (#3082)
Changes in synapse v0.27.3-rc2 (2018-04-09)
==========================================
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
Changes in synapse v0.27.3-rc1 (2018-04-09)
=======================================
Notable changes include API support for joinability of groups. Also new metrics
and phone home stats. Phone home stats include better visibility of system usage
so we can tweak synpase to work better for all users rather than our own experience
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
overal growth of the Matrix ecosystem. It is defined as:-
Counts the number of native 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days"
Features:
* Add joinability for groups (PR #3045)
* Implement group join API (PR #3046)
* Add counter metrics for calculating state delta (PR #3033)
* R30 stats (PR #3041)
* Measure time it takes to calculate state group ID (PR #3043)
* Add basic performance statistics to phone home (PR #3044)
* Add response size metrics (PR #3071)
* phone home cache size configurations (PR #3063)
Changes:
* Add a blurb explaining the main synapse worker (PR #2886) Thanks to @turt2live!
* Replace old style error catching with 'as' keyword (PR #3000) Thanks to @NotAFile!
* Use .iter* to avoid copies in StateHandler (PR #3006)
* Linearize calls to _generate_user_id (PR #3029)
* Remove last usage of ujson (PR #3030)
* Use simplejson throughout (PR #3048)
* Use static JSONEncoders (PR #3049)
* Remove uses of events.content (PR #3060)
* Improve database cache performance (PR #3068)
Bug fixes:
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
* Fix replication after switch to simplejson (PR #3015)
* Fix replication after switch to simplejson (PR #3015)
* 404 correctly on missing paths via NoResource (PR #3022)
* Fix error when claiming e2e keys from offline servers (PR #3034)
* fix tests/storage/test_user_directory.py (PR #3042)
* use PUT instead of POST for federating groups/m.join_policy (PR #3070) Thanks to @krombel!
* postgres port script: fix state_groups_pkey error (PR #3072)
Changes in synapse v0.27.2 (2018-03-26)
=======================================

View File

@@ -157,8 +157,8 @@ if you prefer.
In case of problems, please see the _`Troubleshooting` section below.
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy

View File

@@ -55,12 +55,7 @@ synapse process.)
You then create a set of configs for the various worker processes. These
should be worker configuration files, and should be stored in a dedicated
subdirectory, to allow synctl to manipulate them. An additional configuration
for the master synapse process will need to be created because the process will
not be started automatically. That configuration should look like this::
worker_app: synapse.app.homeserver
daemonize: true
subdirectory, to allow synctl to manipulate them.
Each worker configuration file inherits the configuration of the main homeserver
configuration file. You can then override configuration specific to that worker,
@@ -235,11 +230,9 @@ file. For example::
``synapse.app.event_creator``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles some event creation. It can handle REST endpoints matching::
Handles non-state event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -251,12 +250,6 @@ class Porter(object):
@defer.inlineCallbacks
def handle_table(self, table, postgres_size, table_size, forward_chunk,
backward_chunk):
logger.info(
"Table %s: %i/%i (rows %i-%i) already ported",
table, postgres_size, table_size,
backward_chunk+1, forward_chunk-1,
)
if not table_size:
return
@@ -474,10 +467,31 @@ class Porter(object):
self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
self.progress.set_state("Creating port tables")
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master",
keyvalues={
"type": "table",
},
retcol="name",
)
postgres_tables = yield self.postgres_store._simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
)
tables = set(sqlite_tables) & set(postgres_tables)
self.progress.set_state("Creating tables")
logger.info("Found %d tables", len(tables))
def create_port_table(txn):
txn.execute(
"CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
"CREATE TABLE port_from_sqlite3 ("
" table_name varchar(100) NOT NULL UNIQUE,"
" forward_rowid bigint NOT NULL,"
" backward_rowid bigint NOT NULL"
@@ -503,33 +517,18 @@ class Porter(object):
"alter_table", alter_table
)
except Exception as e:
pass
logger.info("Failed to create port table: %s", e)
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
)
try:
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
)
except Exception as e:
logger.info("Failed to create port table: %s", e)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master",
keyvalues={
"type": "table",
},
retcol="name",
)
self.progress.set_state("Setting up")
postgres_tables = yield self.postgres_store._simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
)
tables = set(sqlite_tables) & set(postgres_tables)
logger.info("Found %d tables", len(tables))
# Step 3. Figure out what still needs copying
self.progress.set_state("Checking on port progress")
# Set up tables.
setup_res = yield defer.gatherResults(
[
self.setup_table(table)
@@ -540,8 +539,7 @@ class Porter(object):
consumeErrors=True,
)
# Step 4. Do the copying.
self.progress.set_state("Copying to postgres")
# Process tables.
yield defer.gatherResults(
[
self.handle_table(*res)
@@ -550,9 +548,6 @@ class Porter(object):
consumeErrors=True,
)
# Step 5. Do final post-processing
yield self._setup_state_group_id_seq()
self.progress.done()
except:
global end_error_exec_info
@@ -712,16 +707,6 @@ class Porter(object):
defer.returnValue((done, remaining + done))
def _setup_state_group_id_seq(self):
def r(txn):
txn.execute("SELECT MAX(id) FROM state_groups")
next_id = txn.fetchone()[0]+1
txn.execute(
"ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
(next_id,),
)
return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
##############################################
###### The following is simply UI stuff ######

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.27.3"
__version__ = "0.27.2"

View File

@@ -42,7 +42,7 @@ logger = logging.getLogger("synapse.app.appservice")
class AppserviceSlaveStore(
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
SlavedRegistrationStore,
):
pass

View File

@@ -50,11 +50,11 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
SlavedClientIpStore,

View File

@@ -430,10 +430,6 @@ def run(hs):
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in r30_results.iteritems():
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR

View File

@@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronSlavedStore(
SlavedReceiptsStore,
SlavedAccountDataStore,
SlavedPushRuleStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedFilteringStore,
@@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
RoomStore,

View File

@@ -252,7 +252,6 @@ def main():
for running_pid in running_pids:
while pid_running(running_pid):
time.sleep(0.2)
write("All processes exited; now restarting...")
if action == "start" or action == "restart":
if start_stop_synapse:

View File

@@ -49,8 +49,8 @@ logger = logging.getLogger("synapse.app.user_dir")
class UserDirectorySlaveStore(
SlavedEventStore,
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedRegistrationStore,
SlavedClientIpStore,
UserDirectoryStore,

View File

@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import GroupID, get_domain_from_id
from twisted.internet import defer
@@ -173,6 +172,7 @@ class ApplicationService(object):
if self.is_interested_in_user(event.sender):
defer.returnValue(True)
# also check m.room.member state key
if (event.type == EventTypes.Member and
self.is_interested_in_user(event.state_key)):
@@ -181,20 +181,18 @@ class ApplicationService(object):
if not store:
defer.returnValue(False)
does_match = yield self._matches_user_in_member_list(event.room_id, store)
does_match = yield self._matches_user_in_member_list(
event, store,
)
defer.returnValue(does_match)
@cachedInlineCallbacks(num_args=1, cache_context=True)
def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = yield store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
@defer.inlineCallbacks
def _matches_user_in_member_list(self, event, store):
ases = yield store.get_appservices_with_user_in_room(
event,
)
# check joined member events
for user_id in member_list:
if self.is_interested_in_user(user_id):
defer.returnValue(True)
defer.returnValue(False)
defer.returnValue(self.id in ases)
def _matches_room_id(self, event):
if hasattr(event, "room_id"):

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,7 +20,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
import urllib
logger = logging.getLogger(__name__)
@@ -51,7 +49,7 @@ class TransportLayerClient(object):
logger.debug("get_room_state dest=%s, room=%s",
destination, room_id)
path = _create_path(PREFIX, "/state/%s/", room_id)
path = PREFIX + "/state/%s/" % room_id
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@@ -73,7 +71,7 @@ class TransportLayerClient(object):
logger.debug("get_room_state_ids dest=%s, room=%s",
destination, room_id)
path = _create_path(PREFIX, "/state_ids/%s/", room_id)
path = PREFIX + "/state_ids/%s/" % room_id
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@@ -95,7 +93,7 @@ class TransportLayerClient(object):
logger.debug("get_pdu dest=%s, event_id=%s",
destination, event_id)
path = _create_path(PREFIX, "/event/%s/", event_id)
path = PREFIX + "/event/%s/" % (event_id, )
return self.client.get_json(destination, path=path, timeout=timeout)
@log_function
@@ -121,7 +119,7 @@ class TransportLayerClient(object):
# TODO: raise?
return
path = _create_path(PREFIX, "/backfill/%s/", room_id)
path = PREFIX + "/backfill/%s/" % (room_id,)
args = {
"v": event_tuples,
@@ -159,11 +157,9 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
response = yield self.client.put_json(
transaction.destination,
path=path,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
data=json_data,
json_data_callback=json_data_callback,
long_retries=True,
@@ -181,7 +177,7 @@ class TransportLayerClient(object):
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail,
ignore_backoff=False):
path = _create_path(PREFIX, "/query/%s", query_type)
path = PREFIX + "/query/%s" % query_type
content = yield self.client.get_json(
destination=destination,
@@ -226,7 +222,7 @@ class TransportLayerClient(object):
"make_membership_event called with membership='%s', must be one of %s" %
(membership, ",".join(valid_memberships))
)
path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
ignore_backoff = False
retry_on_dns_fail = False
@@ -252,7 +248,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -265,7 +261,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_leave(self, destination, room_id, event_id, content):
path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -284,7 +280,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -326,7 +322,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def exchange_third_party_invite(self, destination, room_id, event_dict):
path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
response = yield self.client.put_json(
destination=destination,
@@ -339,7 +335,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
content = yield self.client.get_json(
destination=destination,
@@ -351,7 +347,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_query_auth(self, destination, room_id, event_id, content):
path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
content = yield self.client.post_json(
destination=destination,
@@ -413,7 +409,7 @@ class TransportLayerClient(object):
Returns:
A dict containg the device keys.
"""
path = _create_path(PREFIX, "/user/devices/%s", user_id)
path = PREFIX + "/user/devices/" + user_id
content = yield self.client.get_json(
destination=destination,
@@ -463,7 +459,7 @@ class TransportLayerClient(object):
@log_function
def get_missing_events(self, destination, room_id, earliest_events,
latest_events, limit, min_depth, timeout):
path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
path = PREFIX + "/get_missing_events/%s" % (room_id,)
content = yield self.client.post_json(
destination=destination,
@@ -483,7 +479,7 @@ class TransportLayerClient(object):
def get_group_profile(self, destination, group_id, requester_user_id):
"""Get a group profile
"""
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
path = PREFIX + "/groups/%s/profile" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -502,7 +498,7 @@ class TransportLayerClient(object):
requester_user_id (str)
content (dict): The new profile of the group
"""
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
path = PREFIX + "/groups/%s/profile" % (group_id,)
return self.client.post_json(
destination=destination,
@@ -516,7 +512,7 @@ class TransportLayerClient(object):
def get_group_summary(self, destination, group_id, requester_user_id):
"""Get a group summary
"""
path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
path = PREFIX + "/groups/%s/summary" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -529,7 +525,7 @@ class TransportLayerClient(object):
def get_rooms_in_group(self, destination, group_id, requester_user_id):
"""Get all rooms in a group
"""
path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
path = PREFIX + "/groups/%s/rooms" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -542,7 +538,7 @@ class TransportLayerClient(object):
content):
"""Add a room to a group
"""
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
return self.client.post_json(
destination=destination,
@@ -556,10 +552,7 @@ class TransportLayerClient(object):
config_key, content):
"""Update room in group
"""
path = _create_path(
PREFIX, "/groups/%s/room/%s/config/%s",
group_id, room_id, config_key,
)
path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
return self.client.post_json(
destination=destination,
@@ -572,7 +565,7 @@ class TransportLayerClient(object):
def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
"""Remove a room from a group
"""
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
return self.client.delete_json(
destination=destination,
@@ -585,7 +578,7 @@ class TransportLayerClient(object):
def get_users_in_group(self, destination, group_id, requester_user_id):
"""Get users in a group
"""
path = _create_path(PREFIX, "/groups/%s/users", group_id,)
path = PREFIX + "/groups/%s/users" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -598,7 +591,7 @@ class TransportLayerClient(object):
def get_invited_users_in_group(self, destination, group_id, requester_user_id):
"""Get users that have been invited to a group
"""
path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
path = PREFIX + "/groups/%s/invited_users" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -611,23 +604,7 @@ class TransportLayerClient(object):
def accept_group_invite(self, destination, group_id, user_id, content):
"""Accept a group invite
"""
path = _create_path(
PREFIX, "/groups/%s/users/%s/accept_invite",
group_id, user_id,
)
return self.client.post_json(
destination=destination,
path=path,
data=content,
ignore_backoff=True,
)
@log_function
def join_group(self, destination, group_id, user_id, content):
"""Attempts to join a group
"""
path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -640,7 +617,7 @@ class TransportLayerClient(object):
def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
"""Invite a user to a group
"""
path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -656,7 +633,7 @@ class TransportLayerClient(object):
invited.
"""
path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -670,7 +647,7 @@ class TransportLayerClient(object):
user_id, content):
"""Remove a user fron a group
"""
path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -687,7 +664,7 @@ class TransportLayerClient(object):
kicked from the group.
"""
path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -702,7 +679,7 @@ class TransportLayerClient(object):
the attestations
"""
path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -717,12 +694,11 @@ class TransportLayerClient(object):
"""Update a room entry in a group summary
"""
if category_id:
path = _create_path(
PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
group_id, category_id, room_id,
)
else:
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
return self.client.post_json(
destination=destination,
@@ -738,12 +714,11 @@ class TransportLayerClient(object):
"""Delete a room entry in a group summary
"""
if category_id:
path = _create_path(
PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
group_id, category_id, room_id,
)
else:
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
return self.client.delete_json(
destination=destination,
@@ -756,7 +731,7 @@ class TransportLayerClient(object):
def get_group_categories(self, destination, group_id, requester_user_id):
"""Get all categories in a group
"""
path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
path = PREFIX + "/groups/%s/categories" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -769,7 +744,7 @@ class TransportLayerClient(object):
def get_group_category(self, destination, group_id, requester_user_id, category_id):
"""Get category info in a group
"""
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
return self.client.get_json(
destination=destination,
@@ -783,7 +758,7 @@ class TransportLayerClient(object):
content):
"""Update a category in a group
"""
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
return self.client.post_json(
destination=destination,
@@ -798,7 +773,7 @@ class TransportLayerClient(object):
category_id):
"""Delete a category in a group
"""
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
return self.client.delete_json(
destination=destination,
@@ -811,7 +786,7 @@ class TransportLayerClient(object):
def get_group_roles(self, destination, group_id, requester_user_id):
"""Get all roles in a group
"""
path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
path = PREFIX + "/groups/%s/roles" % (group_id,)
return self.client.get_json(
destination=destination,
@@ -824,7 +799,7 @@ class TransportLayerClient(object):
def get_group_role(self, destination, group_id, requester_user_id, role_id):
"""Get a roles info
"""
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
return self.client.get_json(
destination=destination,
@@ -838,7 +813,7 @@ class TransportLayerClient(object):
content):
"""Update a role in a group
"""
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
return self.client.post_json(
destination=destination,
@@ -852,7 +827,7 @@ class TransportLayerClient(object):
def delete_group_role(self, destination, group_id, requester_user_id, role_id):
"""Delete a role in a group
"""
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
return self.client.delete_json(
destination=destination,
@@ -867,12 +842,11 @@ class TransportLayerClient(object):
"""Update a users entry in a group
"""
if role_id:
path = _create_path(
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
group_id, role_id, user_id,
)
else:
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
return self.client.post_json(
destination=destination,
@@ -882,33 +856,17 @@ class TransportLayerClient(object):
ignore_backoff=True,
)
@log_function
def set_group_join_policy(self, destination, group_id, requester_user_id,
content):
"""Sets the join policy for a group
"""
path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
return self.client.put_json(
destination=destination,
path=path,
args={"requester_user_id": requester_user_id},
data=content,
ignore_backoff=True,
)
@log_function
def delete_group_summary_user(self, destination, group_id, requester_user_id,
user_id, role_id):
"""Delete a users entry in a group
"""
if role_id:
path = _create_path(
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
group_id, role_id, user_id,
)
else:
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
return self.client.delete_json(
destination=destination,
@@ -931,22 +889,3 @@ class TransportLayerClient(object):
data=content,
ignore_backoff=True,
)
def _create_path(prefix, path, *args):
"""Creates a path from the prefix, path template and args. Ensures that
all args are url encoded.
Example:
_create_path(PREFIX, "/event/%s/", event_id)
Args:
prefix (str)
path (str): String template for the path
args: ([str]): Args to insert into path. Each arg will be url encoded
Returns:
str
"""
return prefix + path % tuple(urllib.quote(arg, "") for arg in args)

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -803,23 +802,6 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
class FederationGroupsJoinServlet(BaseFederationServlet):
"""Attempt to join a group
"""
PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, group_id, user_id):
if get_domain_from_id(user_id) != origin:
raise SynapseError(403, "user_id doesn't match origin")
new_content = yield self.handler.join_group(
group_id, user_id, content,
)
defer.returnValue((200, new_content))
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
"""Leave or kick a user from the group
"""
@@ -1142,24 +1124,6 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
defer.returnValue((200, resp))
class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
"""Sets whether a group is joinable without an invite or knock
"""
PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, group_id):
requester_user_id = parse_string_from_args(query, "requester_user_id")
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
new_content = yield self.handler.set_group_join_policy(
group_id, requester_user_id, content
)
defer.returnValue((200, new_content))
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -1199,7 +1163,6 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsInvitedUsersServlet,
FederationGroupsInviteServlet,
FederationGroupsAcceptInviteServlet,
FederationGroupsJoinServlet,
FederationGroupsRemoveUserServlet,
FederationGroupsSummaryRoomsServlet,
FederationGroupsCategoriesServlet,
@@ -1209,7 +1172,6 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsSummaryUsersServlet,
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet,
)

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -206,28 +205,6 @@ class GroupsServerHandler(object):
defer.returnValue({})
@defer.inlineCallbacks
def set_group_join_policy(self, group_id, requester_user_id, content):
"""Sets the group join policy.
Currently supported policies are:
- "invite": an invite must be received and accepted in order to join.
- "open": anyone can join.
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
join_policy = _parse_join_policy_from_contents(content)
if join_policy is None:
raise SynapseError(
400, "No value specified for 'm.join_policy'"
)
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
defer.returnValue({})
@defer.inlineCallbacks
def get_group_categories(self, group_id, requester_user_id):
"""Get all categories in a group (as seen by user)
@@ -404,16 +381,9 @@ class GroupsServerHandler(object):
yield self.check_group_is_ours(group_id, requester_user_id)
group = yield self.store.get_group(group_id)
if group:
cols = [
"name", "short_description", "long_description",
"avatar_url", "is_public",
]
group_description = {key: group[key] for key in cols}
group_description["is_openly_joinable"] = group["join_policy"] == "open"
group_description = yield self.store.get_group(group_id)
if group_description:
defer.returnValue(group_description)
else:
raise SynapseError(404, "Unknown group")
@@ -684,40 +654,6 @@ class GroupsServerHandler(object):
else:
raise SynapseError(502, "Unknown state returned by HS")
@defer.inlineCallbacks
def _add_user(self, group_id, user_id, content):
"""Add a user to a group based on a content dict.
See accept_invite, join_group.
"""
if not self.hs.is_mine_id(user_id):
local_attestation = self.attestations.create_attestation(
group_id, user_id,
)
remote_attestation = content["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
user_id=user_id,
group_id=group_id,
)
else:
local_attestation = None
remote_attestation = None
is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_group(
group_id, user_id,
is_admin=False,
is_public=is_public,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
defer.returnValue(local_attestation)
@defer.inlineCallbacks
def accept_invite(self, group_id, requester_user_id, content):
"""User tries to accept an invite to the group.
@@ -734,27 +670,30 @@ class GroupsServerHandler(object):
if not is_invited:
raise SynapseError(403, "User not invited to group")
local_attestation = yield self._add_user(group_id, requester_user_id, content)
if not self.hs.is_mine_id(requester_user_id):
local_attestation = self.attestations.create_attestation(
group_id, requester_user_id,
)
remote_attestation = content["attestation"]
defer.returnValue({
"state": "join",
"attestation": local_attestation,
})
yield self.attestations.verify_attestation(
remote_attestation,
user_id=requester_user_id,
group_id=group_id,
)
else:
local_attestation = None
remote_attestation = None
@defer.inlineCallbacks
def join_group(self, group_id, requester_user_id, content):
"""User tries to join the group.
is_public = _parse_visibility_from_contents(content)
This will error if the group requires an invite/knock to join
"""
group_info = yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True
yield self.store.add_user_to_group(
group_id, requester_user_id,
is_admin=False,
is_public=is_public,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
if group_info['join_policy'] != "open":
raise SynapseError(403, "Group is not publicly joinable")
local_attestation = yield self._add_user(group_id, requester_user_id, content)
defer.returnValue({
"state": "join",
@@ -896,31 +835,6 @@ class GroupsServerHandler(object):
})
def _parse_join_policy_from_contents(content):
"""Given a content for a request, return the specified join policy or None
"""
join_policy_dict = content.get("m.join_policy")
if join_policy_dict:
return _parse_join_policy_dict(join_policy_dict)
else:
return None
def _parse_join_policy_dict(join_policy_dict):
"""Given a dict for the "m.join_policy" config return the join policy specified
"""
join_policy_type = join_policy_dict.get("type")
if not join_policy_type:
return "invite"
if join_policy_type not in ("invite", "open"):
raise SynapseError(
400, "Synapse only supports 'invite'/'open' join rule"
)
return join_policy_type
def _parse_visibility_from_contents(content):
"""Given a content for a request parse out whether the entity should be
public or not

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -91,8 +90,6 @@ class GroupsLocalHandler(object):
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
set_group_join_policy = _create_rerouter("set_group_join_policy")
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@@ -229,45 +226,7 @@ class GroupsLocalHandler(object):
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
if self.is_mine_id(group_id):
yield self.groups_server_handler.join_group(
group_id, user_id, content
)
local_attestation = None
remote_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation
res = yield self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content,
)
remote_attestation = res["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
group_id=group_id,
user_id=user_id,
server_name=get_domain_from_id(group_id),
)
# TODO: Check that the group is public and we're being added publically
is_publicised = content.get("publicise", False)
token = yield self.store.register_user_group_membership(
group_id, user_id,
membership="join",
is_admin=False,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
is_publicised=is_publicised,
)
self.notifier.on_new_event(
"groups_key", token, users=[user_id],
)
defer.returnValue({})
raise NotImplementedError() # TODO
@defer.inlineCallbacks
def accept_invite(self, group_id, user_id, content):

View File

@@ -286,8 +286,7 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, args={}, data={},
json_data_callback=None,
def put_json(self, destination, path, data={}, json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
@@ -297,7 +296,6 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): query params
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
@@ -344,7 +342,6 @@ class MatrixFederationHttpClient(object):
path,
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
query_bytes=encode_query_args(args),
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -376,7 +373,6 @@ class MatrixFederationHttpClient(object):
giving up. None indicates no timeout.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
args (dict): query params
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.

View File

@@ -113,11 +113,6 @@ response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
)
_next_request_id = 0
@@ -431,8 +426,6 @@ class RequestMetrics(object):
context.db_sched_duration_ms / 1000., request.method, self.name, tag
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
class RootRedirect(resource.Resource):
"""Redirects the root '/' path to another path."""

View File

@@ -17,8 +17,10 @@
from synapse.storage.appservice import (
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
)
from synapse.replication.slave.storage.events import SlavedEventStore
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore):
ApplicationServiceWorkerStore,
SlavedEventStore):
pass

View File

@@ -655,12 +655,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
content=event_content,
)
return_value = {}
if membership_action == "join":
return_value["room_id"] = room_id
defer.returnValue((200, return_value))
defer.returnValue((200, {}))
def _has_3pid_invite_keys(self, content):
for key in {"id_server", "medium", "address"}:

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -402,32 +401,6 @@ class GroupInvitedUsersServlet(RestServlet):
defer.returnValue((200, result))
class GroupSettingJoinPolicyServlet(RestServlet):
"""Set group join policy
"""
PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/settings/m.join_policy$")
def __init__(self, hs):
super(GroupSettingJoinPolicyServlet, self).__init__()
self.auth = hs.get_auth()
self.groups_handler = hs.get_groups_local_handler()
@defer.inlineCallbacks
def on_PUT(self, request, group_id):
requester = yield self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
result = yield self.groups_handler.set_group_join_policy(
group_id,
requester_user_id,
content,
)
defer.returnValue((200, result))
class GroupCreateServlet(RestServlet):
"""Create a group
"""
@@ -765,7 +738,6 @@ def register_servlets(hs, http_server):
GroupInvitedUsersServlet(hs).register(http_server)
GroupUsersServlet(hs).register(http_server)
GroupRoomServlet(hs).register(http_server)
GroupSettingJoinPolicyServlet(hs).register(http_server)
GroupCreateServlet(hs).register(http_server)
GroupAdminRoomsServlet(hs).register(http_server)
GroupAdminRoomsConfigServlet(hs).register(http_server)

View File

@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -242,12 +244,13 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows]
@defer.inlineCallbacks
def count_daily_users(self):
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
def _count_users(txn):
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
sql = """
SELECT COALESCE(count(*), 0) FROM (
@@ -261,91 +264,8 @@ class DataStore(RoomMemberStore, RoomStore,
count, = txn.fetchone()
return count
return self.runInteraction("count_users", _count_users)
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days
Returns counts globaly for a given user as well as breaking
by platform
"""
def _count_r30_users(txn):
thirty_days_in_secs = 86400 * 30
now = int(self._clock.time())
thirty_days_ago_in_secs = now - thirty_days_in_secs
sql = """
SELECT platform, COALESCE(count(*), 0) FROM (
SELECT
users.name, platform, users.creation_ts * 1000,
MAX(uip.last_seen)
FROM users
INNER JOIN (
SELECT
user_id,
last_seen,
CASE
WHEN user_agent LIKE '%%Android%%' THEN 'android'
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
ELSE 'unknown'
END
AS platform
FROM user_ips
) uip
ON users.name = uip.user_id
AND users.appservice_id is NULL
AND users.creation_ts < ?
AND uip.last_seen/1000 > ?
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
GROUP BY users.name, platform, users.creation_ts
) u GROUP BY platform
"""
results = {}
txn.execute(sql, (thirty_days_ago_in_secs,
thirty_days_ago_in_secs))
for row in txn:
if row[0] is 'unknown':
pass
results[row[0]] = row[1]
sql = """
SELECT COALESCE(count(*), 0) FROM (
SELECT users.name, users.creation_ts * 1000,
MAX(uip.last_seen)
FROM users
INNER JOIN (
SELECT
user_id,
last_seen
FROM user_ips
) uip
ON users.name = uip.user_id
AND appservice_id is NULL
AND users.creation_ts < ?
AND uip.last_seen/1000 > ?
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
GROUP BY users.name, users.creation_ts
) u
"""
txn.execute(sql, (thirty_days_ago_in_secs,
thirty_days_ago_in_secs))
count, = txn.fetchone()
results['all'] = count
return results
return self.runInteraction("count_r30_users", _count_r30_users)
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
def get_users(self):
"""Function to reterive a list of users in users table.

View File

@@ -18,9 +18,14 @@ import re
import simplejson as json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.async import Linearizer
from ._base import SQLBaseStore
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
return exclusive_user_regex
class ApplicationServiceWorkerStore(SQLBaseStore):
class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
SQLBaseStore):
def __init__(self, db_conn, hs):
self.services_cache = load_appservices(
hs.hostname,
@@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
return service
return None
@defer.inlineCallbacks
def get_appservices_with_user_in_room(self, event):
"""Get the list of appservices in the room at the given event
Args:
event (Event)
Returns:
Deferred[set(str)]: The IDs of all ASes in the room
"""
state_group = yield self._get_state_group_for_event(event.event_id)
if not state_group:
raise Exception("No state group for event %s", event.event_id)
ases_in_room = yield self._get_appservices_with_user_in_room(
event.room_id, state_group,
)
defer.returnValue(ases_in_room)
@cachedInlineCallbacks(num_args=2, max_entries=10000)
def _get_appservices_with_user_in_room(self, room_id, state_group):
cache = self._get_appservices_with_user_in_room_cache(room_id)
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
defer.returnValue(ases_in_room)
@cached(max_entries=10000)
def _get_appservices_with_user_in_room_cache(self, room_id):
return _AppserviceUsersCache(self, room_id)
class ApplicationServiceStore(ApplicationServiceWorkerStore):
# This is currently empty due to there not being any AS storage functions
@@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
" (SELECT stream_ordering FROM appservice_stream_position)"
" < e.stream_ordering"
" AND e.stream_ordering <= ?"
" AND NOT e.outlier"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
@@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
# to keep consistency with the other stores, we keep this empty class for
# now.
pass
class _AppserviceUsersCache(object):
"""Attempts to calculate which appservices have users in a given room by
looking at state groups and their delta_ids
"""
def __init__(self, store, room_id):
self.store = store
self.room_id = room_id
self.linearizer = Linearizer("_AppserviceUsersCache")
# The last state group we calculated the ASes in the room for.
self.state_group = object()
# A dict of all appservices in the room at the above state group,
# along with a user_id of an AS user in the room.
# Dict of as_id -> user_id.
self.appservices_in_room = {}
@defer.inlineCallbacks
def get_appservices_in_room_by_user(self, state_group):
"""
Args:
state_group(str)
Returns:
Deferred[set(str)]: The IDs of all ASes in the room
"""
assert state_group is not None
if state_group == self.state_group:
defer.returnValue(frozenset(self.appservices_in_room))
with (yield self.linearizer.queue(())):
# Set of ASes that we need to recalculate their membership of
# the room
uhandled_ases = set()
# If the state groups match then there is nothing to do
if state_group == self.state_group:
defer.returnValue(frozenset(self.appservices_in_room))
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
# If the prev_group matches the last state group we can calculate
# the new value by looking at the deltas
if prev_group and prev_group == self.state_group:
for (typ, state_key), event_id in delta_ids.iteritems():
if typ != EventTypes.Member:
continue
user_id = state_key
event = yield self.store.get_event(event_id)
is_join = event.membership == Membership.JOIN
for appservice in self.store.get_app_services():
as_id = appservice.id
# If this is a join and the appservice is already in
# the room then its a noop
if is_join:
if as_id in self.appservices_in_room:
continue
# If this is not a join, then we only need to recalculate
# if the AS is in the room and the cached joined AS user
# matches this event.
elif self.appservices_in_room.get(as_id, None) != user_id:
continue
# If the AS is not interested in the user then its a
# noop.
if not appservice.is_interested_in_user(user_id):
continue
if is_join:
# If an AS user is joining then the AS is now
# interested in the room
self.appservices_in_room[as_id] = user_id
else:
# If an AS user has left then we need to
# recalcualte if they're in the room.
uhandled_ases.add(appservice)
self.appservices_in_room.pop(as_id, None)
else:
uhandled_ases = set(self.store.get_app_services())
if uhandled_ases:
# We need to recalculate which ASes are in the room, so lets
# get the current state and try and find a join event
# that the AS is interested in.
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
for appservice in uhandled_ases:
as_id = appservice.id
self.appservices_in_room.pop(as_id, None)
for (etype, state_key), event_id in current_state_ids.iteritems():
if etype != EventTypes.Member:
continue
if not appservice.is_interested_in_user(state_key):
continue
event = yield self.store.get_event(event_id)
if event.membership == Membership.JOIN:
self.appservices_in_room[as_id] = state_key
break
self.state_group = state_group
defer.returnValue(frozenset(self.appservices_in_room))

View File

@@ -48,13 +48,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
columns=["user_id", "device_id", "last_seen"],
)
self.register_background_index_update(
"user_ips_last_seen_index",
index_name="user_ips_last_seen",
table="user_ips",
columns=["user_id", "last_seen"],
)
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,24 +29,6 @@ _DEFAULT_ROLE_ID = ""
class GroupServerStore(SQLBaseStore):
def set_group_join_policy(self, group_id, join_policy):
"""Set the join policy of a group.
join_policy can be one of:
* "invite"
* "open"
"""
return self._simple_update_one(
table="groups",
keyvalues={
"group_id": group_id,
},
updatevalues={
"join_policy": join_policy,
},
desc="set_group_join_policy",
)
def get_group(self, group_id):
return self._simple_select_one(
table="groups",
@@ -55,11 +36,10 @@ class GroupServerStore(SQLBaseStore):
"group_id": group_id,
},
retcols=(
"name", "short_description", "long_description",
"avatar_url", "is_public", "join_policy",
"name", "short_description", "long_description", "avatar_url", "is_public"
),
allow_none=True,
desc="get_group",
desc="is_user_in_group",
)
def get_users_in_group(self, group_id, include_private=False):

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 48
SCHEMA_VERSION = 47
dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@@ -594,8 +594,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
SELECT stream_ordering, json FROM events
JOIN event_json USING (event_id)
SELECT stream_ordering, content FROM events
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@@ -607,8 +606,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
next_token = None
for stream_ordering, content_json in txn:
next_token = stream_ordering
event_json = json.loads(content_json)
content = event_json["content"]
content = json.loads(content_json)
content_url = content.get("url")
thumbnail_url = content.get("info", {}).get("thumbnail_url")

View File

@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
# We don't use `state_group`, its there so that we can cache based
# on it. However, its important that its never None, since two current_state's
@@ -645,9 +644,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
def add_membership_profile_txn(txn):
sql = ("""
SELECT stream_ordering, event_id, events.room_id, event_json.json
SELECT stream_ordering, event_id, events.room_id, content
FROM events
INNER JOIN event_json USING (event_id)
INNER JOIN room_memberships USING (event_id)
WHERE ? <= stream_ordering AND stream_ordering < ?
AND type = 'm.room.member'
@@ -668,8 +666,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
event_id = row["event_id"]
room_id = row["room_id"]
try:
event_json = json.loads(row["json"])
content = event_json['content']
content = json.loads(row["content"])
except Exception:
continue

View File

@@ -1,17 +0,0 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('user_ips_last_seen_index', '{}');

View File

@@ -1,22 +0,0 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This isn't a real ENUM because sqlite doesn't support it
* and we use a default of NULL for inserted rows and interpret
* NULL at the python store level as necessary so that existing
* rows are given the correct default policy.
*/
ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';

View File

@@ -75,9 +75,8 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id, room_id, type, json, "
"SELECT stream_ordering, event_id, room_id, type, content, "
" origin_server_ts FROM events"
" JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -105,8 +104,7 @@ class SearchStore(BackgroundUpdateStore):
stream_ordering = row["stream_ordering"]
origin_server_ts = row["origin_server_ts"]
try:
event_json = json.loads(row["json"])
content = event_json["content"]
content = json.loads(row["content"])
except Exception:
continue

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,11 +39,12 @@ _CacheSentinel = object()
class CacheEntry(object):
__slots__ = [
"deferred", "callbacks", "invalidated"
"deferred", "sequence", "callbacks", "invalidated"
]
def __init__(self, deferred, callbacks):
def __init__(self, deferred, sequence, callbacks):
self.deferred = deferred
self.sequence = sequence
self.callbacks = set(callbacks)
self.invalidated = False
@@ -62,6 +62,7 @@ class Cache(object):
"max_entries",
"name",
"keylen",
"sequence",
"thread",
"metrics",
"_pending_deferred_cache",
@@ -79,6 +80,7 @@ class Cache(object):
self.name = name
self.keylen = keylen
self.sequence = 0
self.thread = None
self.metrics = register_cache(name, self.cache)
@@ -111,10 +113,11 @@ class Cache(object):
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
val.callbacks.update(callbacks)
if update_metrics:
self.metrics.inc_hits()
return val.deferred
if val.sequence == self.sequence:
val.callbacks.update(callbacks)
if update_metrics:
self.metrics.inc_hits()
return val.deferred
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
if val is not _CacheSentinel:
@@ -134,9 +137,12 @@ class Cache(object):
self.check_thread()
entry = CacheEntry(
deferred=value,
sequence=self.sequence,
callbacks=callbacks,
)
entry.callbacks.update(callbacks)
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry:
existing_entry.invalidate()
@@ -144,25 +150,13 @@ class Cache(object):
self._pending_deferred_cache[key] = entry
def shuffle(result):
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry is entry:
self.cache.set(key, result, entry.callbacks)
if self.sequence == entry.sequence:
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry is entry:
self.cache.set(key, result, entry.callbacks)
else:
entry.invalidate()
else:
# oops, the _pending_deferred_cache has been updated since
# we started our query, so we are out of date.
#
# Better put back whatever we took out. (We do it this way
# round, rather than peeking into the _pending_deferred_cache
# and then removing on a match, to make the common case faster)
if existing_entry is not None:
self._pending_deferred_cache[key] = existing_entry
# we're not going to put this entry into the cache, so need
# to make sure that the invalidation callbacks are called.
# That was probably done when _pending_deferred_cache was
# updated, but it's possible that `set` was called without
# `invalidate` being previously called, in which case it may
# not have been. Either way, let's double-check now.
entry.invalidate()
return result
@@ -174,29 +168,25 @@ class Cache(object):
def invalidate(self, key):
self.check_thread()
self.cache.pop(key, None)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned
# for future queries and (b) stop it being persisted as a proper entry
# in self.cache.
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
entry = self._pending_deferred_cache.pop(key, None)
# run the invalidation callbacks now, rather than waiting for the
# deferred to resolve.
if entry:
entry.invalidate()
self.cache.pop(key, None)
def invalidate_many(self, key):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError(
"The cache key must be a tuple not %r" % (type(key),)
)
self.sequence += 1
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
@@ -204,10 +194,8 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
self.sequence += 1
self.cache.clear()
for entry in self._pending_deferred_cache.itervalues():
entry.invalidate()
self._pending_deferred_cache.clear()
class _CacheDescriptorBase(object):

View File

@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
self.event.sender = "@irc_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event)))
self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_user_id_prefix_no_match(self):
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
self.event.sender = "@someone_else:matrix.org"
self.assertFalse((yield self.service.is_interested(self.event)))
self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_member_is_checked(self):
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.event.sender = "@someone_else:matrix.org"
self.event.type = "m.room.member"
self.event.state_key = "@irc_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event)))
self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_id_match(self):
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event)))
self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_id_no_match(self):
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
self.assertFalse((yield self.service.is_interested(self.event)))
self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_alias_match(self):
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.store.get_aliases_for_room.return_value = [
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
]
self.store.get_users_in_room.return_value = []
self.store.get_appservices_with_user_in_room.return_value = []
self.assertFalse((yield self.service.is_interested(
self.event, self.store
)))
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
}
self.event.state_key = self.service.sender
self.assertTrue((yield self.service.is_interested(self.event)))
@defer.inlineCallbacks
def test_member_list_match(self):
self.service.namespaces[ApplicationService.NS_USERS].append(
_regex("@irc_.*")
)
self.store.get_users_in_room.return_value = [
"@alice:here",
"@irc_fo:here", # AS user
"@bob:here",
]
self.store.get_aliases_for_room.return_value = []
self.event.sender = "@xmpp_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(
event=self.event, store=self.store
)))

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
import logging
import mock
@@ -27,50 +25,6 @@ from tests import unittest
logger = logging.getLogger(__name__)
class CacheTestCase(unittest.TestCase):
def test_invalidate_all(self):
cache = descriptors.Cache("testcache")
callback_record = [False, False]
def record_callback(idx):
callback_record[idx] = True
# add a couple of pending entries
d1 = defer.Deferred()
cache.set("key1", d1, partial(record_callback, 0))
d2 = defer.Deferred()
cache.set("key2", d2, partial(record_callback, 1))
# lookup should return the deferreds
self.assertIs(cache.get("key1"), d1)
self.assertIs(cache.get("key2"), d2)
# let one of the lookups complete
d2.callback("result2")
self.assertEqual(cache.get("key2"), "result2")
# now do the invalidation
cache.invalidate_all()
# lookup should return none
self.assertIsNone(cache.get("key1", None))
self.assertIsNone(cache.get("key2", None))
# both callbacks should have been callbacked
self.assertTrue(
callback_record[0], "Invalidation callback for key1 not called",
)
self.assertTrue(
callback_record[1], "Invalidation callback for key2 not called",
)
# letting the other lookup complete should do nothing
d1.callback("result1")
self.assertIsNone(cache.get("key1", None))
class DescriptorTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_cache(self):