1
0

Compare commits

...

82 Commits

Author SHA1 Message Date
Matrix 8d910ff5b9 Local changes 2017-02-24 16:01:57 +00:00
Erik Johnston cff886c47b Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-21 14:01:45 +00:00
Erik Johnston a24492c06f Don't limit count 2017-02-21 14:01:33 +00:00
Erik Johnston b7442c3e2b Store looping call 2017-02-21 13:59:25 +00:00
Erik Johnston a3708a1885 Merge branch 'master' of github.com:matrix-org/synapse into develop 2017-02-21 13:46:27 +00:00
Richard van der Hoff c927d6de9b Merge pull request #1930 from matrix-org/rav/fix_txnq_race
Fix a race in transaction queue
2017-02-21 08:46:38 +00:00
Richard van der Hoff 0c4cf9372b Fix a race in transaction queue
It was theoretically possible for a PDU to get queued and not sent for ages. On
closer inspection I think there were bigger problems elsewhere, but we might as
well fix this since it's easy.
2017-02-20 16:46:25 +00:00
Erik Johnston b5c268738b Merge pull request #1929 from matrix-org/erikj/context_fix
Fix /context/ visibiltiy rules
2017-02-20 16:19:19 +01:00
Erik Johnston 17673404fb Remove unused param 2017-02-20 15:02:01 +00:00
Erik Johnston 7f026792e1 Fix /context/ visibiltiy rules 2017-02-20 14:54:50 +00:00
Richard van der Hoff 11940d462a Merge remote-tracking branch 'origin/master' into develop 2017-02-20 09:14:43 +00:00
Erik Johnston 699be7d1be Fix up notif rotation 2017-02-18 14:42:39 +00:00
Erik Johnston ede125b7e1 Fix up notif rotation 2017-02-18 14:41:31 +00:00
Richard van der Hoff 2fa14fd48a Merge pull request #1926 from matrix-org/rav/example_log_config
Add an example log_config file
2017-02-17 14:41:48 +00:00
Richard van der Hoff 66eb0bd548 Update example_log_config.yaml
add trailing NL
2017-02-17 12:55:36 +00:00
Richard van der Hoff 5aae844e60 Add an example log_config file 2017-02-17 12:48:53 +00:00
David Baker ec8d7603e6 Merge pull request #1925 from matrix-org/dbkr/pushers_lang_lengthen
Make the pushers lang field column longer
2017-02-17 11:29:06 +00:00
David Baker 8c87bb550e Merge pull request #1922 from matrix-org/dbkr/allow_forget_for_ban
Allow forgetting rooms you're banned from
2017-02-17 10:52:30 +00:00
David Baker 4aa29508af Use TEXT rather than VARCHAR
While we're changing anyway
2017-02-17 10:51:49 +00:00
David Baker b4017539d4 Make the pushers lang field column longer
To accommodate things like zh-Hans-CN

Fixes https://github.com/vector-im/riot-ios/issues/1031
2017-02-17 10:42:57 +00:00
Erik Johnston c97d491649 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-16 15:12:52 +00:00
Erik Johnston b6557f2cfe Merge pull request #1923 from matrix-org/erikj/push_action_compress
Store the default push actions in a more efficient manner
2017-02-16 16:07:50 +01:00
Erik Johnston 138e030cfe Comment 2017-02-16 15:03:36 +00:00
Erik Johnston 502ae6c663 Comment 2017-02-16 14:47:11 +00:00
Erik Johnston e6acf0c399 Store the default push actions in a more efficient manner 2017-02-16 14:40:24 +00:00
Erik Johnston 04eca2589d Merge pull request #1916 from matrix-org/erikj/push_actions_delete
Aggregate event push actions
2017-02-16 15:28:58 +01:00
David Baker 474c9aadbe Allow forgetting rooms you're banned from 2017-02-15 19:32:20 +00:00
Richard van der Hoff 7dcbcca68c Merge pull request #1921 from matrix-org/rav/fix_key_changes
Fix bugs in the /keys/changes api
2017-02-15 11:25:16 +00:00
Erik Johnston 3b42bb96e4 Merge commit 'd7457c7661fa3b28427b21f44252c3abbee45ef8' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-15 10:17:01 +00:00
Erik Johnston 738000971e Merge branch 'develop' into matrix-org-hotfixes 2017-02-15 09:54:09 +00:00
David Baker fa467e62a9 Merge pull request #1917 from matrix-org/dbkr/make_ban_reasons_work
Make kick & ban reasons work
2017-02-14 16:10:06 +00:00
David Baker 355d62c499 Make kick & ban reasons work
We somehow specced APIs with reason strings, preserve the content
in the events  and even have the clients display them, but failed
to actually pass the parameter through to the event content.
2017-02-14 15:10:55 +00:00
Richard van der Hoff fc2f29c1d0 Fix bugs in the /keys/changes api
* `get_forward_extremeties_for_room` takes a numeric `stream_ordering`. We were
  passing a `RoomStreamToken`, which meant that it returned the *current*
  extremities, rather than those corresponding to the `from_token`. However:
* `get_state_ids_for_events` required a second ('types') parameter; this meant
  that a `TypeError` was thrown and we ended up acting as though there was *no*
  prev state.
* `get_state_ids_for_events` actually returns a map from event_id to state
  dictionary - just looking up the state keys in it again meant that we acted
  as though there was no prev state. We now check if each member's state has
  changed since *any* of the extremities.

Also add/fix some comments.
2017-02-14 13:59:50 +00:00
Erik Johnston ce3c8df6df Less aggressive timers 2017-02-14 13:41:24 +00:00
Erik Johnston 095b45c165 Aggregate event push actions 2017-02-14 13:39:41 +00:00
Erik Johnston 795f8e3fe7 Merge pull request #1873 from matrix-org/erikj/delete_push_actions
Be more agressive about purging old room event_push_actions
2017-02-14 14:29:04 +01:00
Erik Johnston d7457c7661 Merge pull request #1914 from matrix-org/erikj/cache_presence
Cache get_presence storage
2017-02-13 16:59:19 +01:00
Erik Johnston 359c97f506 Merge pull request #1913 from matrix-org/kegan/dont-cache-errors
http txns: Do not cache error responses
2017-02-13 16:29:19 +01:00
Erik Johnston 9e617cd4c2 Cache get_presence storage 2017-02-13 13:50:03 +00:00
Kegan Dougal d0497425f8 Ordering is important on errbacks so add the cleanup func before creating an ObservableDeferred 2017-02-13 13:49:44 +00:00
Kegan Dougal 808ddf0ae7 Pop the txn from the map in case it has already been deleted somehow 2017-02-13 13:36:15 +00:00
Kegan Dougal feb15dc99f Don't cache errors at all 2017-02-13 13:33:12 +00:00
Kegan Dougal ecd7e36047 http txns: Do not cache error responses
Previously we did. This meant that, amongst other errors, rate-limiting errors
would be cached and prevent messages with that txn ID being sent.
2017-02-13 13:16:48 +00:00
Erik Johnston 6bba80241c Merge pull request #1912 from matrix-org/markjh/roominitialsync
Add db functions needed for room initial sync to slave
2017-02-13 12:20:21 +01:00
Mark Haines 3a46280ca3 Add db functions needed for room initial sync to slave 2017-02-13 11:16:53 +00:00
Erik Johnston e1a12e24d2 Merge pull request #1907 from andrewshadura/develop
Use signedjson.sign instead of syutil.crypto.jsonsign
2017-02-13 11:54:24 +01:00
Andrew Shadura 6a3743b0d4 Use signedjson.sign instead of syutil.crypto.jsonsign
Functions from syutil.crypto.jsonsign are now available in
signedjson, so use that instead of depending on syutil.

Signed-off-by: Andrew Shadura <andrew@shadura.me>
2017-02-13 11:31:22 +01:00
Erik Johnston 481f6c87e7 Merge pull request #1906 from tyler-smith/TS_fix_config_documentation
Fix typo in config comments.
2017-02-13 11:01:03 +01:00
Tyler Smith df4407d665 Fix typo in config comments.
Signed-off-by: Tyler Smith <tylersmith.me@gmail.com>
2017-02-11 23:02:57 -08:00
Erik Johnston 96ef35c62f Merge branch 'master' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-09 13:50:38 +00:00
Erik Johnston 5c3cb8778a Merge pull request #1896 from DanielDent/patch-1
Update CAPTCHA_SETUP.rst X-Forwarded-For docs
2017-02-09 14:20:15 +01:00
Erik Johnston 1beda9c8a7 Merge branch 'master' of github.com:matrix-org/synapse into develop 2017-02-09 10:46:58 +00:00
Daniel Dent fdbd90e25d Update CAPTCHA_SETUP.rst X-Forwarded-For docs
It looks like CAPTCHA_SETUP.rst contains information relevant to an old version of Synapse, but Synapse now has a different approach to configuring use of the X-Forwarded-For header.
2017-02-08 21:21:02 -08:00
Erik Johnston 52cd019a54 Make None check explicit 2017-02-08 16:04:29 +00:00
Erik Johnston f20cd34858 Merge pull request #1892 from matrix-org/erikj/rejection_fwd_extrem
Ignore new rejected events when working out forward extremities.
2017-02-08 16:59:06 +01:00
David Baker 9adcd3a514 Merge pull request #1891 from matrix-org/dbkr/remove_unused_constants
Remove a few aspirational but unused constants
2017-02-08 13:30:40 +00:00
David Baker 063a1251a9 Remove a few aspirational but unused constants
from the Kegan era
2017-02-08 11:36:08 +00:00
Erik Johnston af6da6db2d Merge pull request #1784 from morteza-araby/user-admin
Administration functionalities
2017-02-06 16:21:10 +01:00
Erik Johnston 131c0134f5 Merge branch 'master' of github.com:matrix-org/synapse into develop 2017-02-04 15:16:05 +00:00
Erik Johnston 83c7b8ec91 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-02 19:26:05 +00:00
Morteza Araby 2849d3f29d admin,storage: added more administrator functionalities
administrators can now:
 - Set displayname of users
 - Update user avatars
 - Search for users by user_id
 - Browse all users in a paginated API
 - Reset user passwords
 - Deactivate users

Helpers for doing paginated queries has also been added to storage

Signed-off-by: Morteza Araby <morteza.araby@ericsson.com>
2017-02-02 14:02:26 +01:00
Erik Johnston f57cb21952 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-02 09:30:20 +00:00
Erik Johnston fac3c03087 Be more agressive about purging old room event_push_actions 2017-02-01 18:27:24 +00:00
Erik Johnston 4abecb7b02 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-01 14:33:28 +00:00
Erik Johnston 010159365c Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-31 11:20:51 +00:00
Erik Johnston 717e4448c4 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-30 14:36:46 +00:00
Erik Johnston e41f183aa8 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-20 14:00:31 +00:00
Erik Johnston 10f7bfe897 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-11 10:57:42 +00:00
Erik Johnston 772b8ebe54 PEP8 2017-01-10 16:38:00 +00:00
Erik Johnston 3c9acdef4d Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-10 16:31:06 +00:00
Erik Johnston fe28150cdc Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-10 16:16:17 +00:00
Mark Haines 69ef497253 Log which files we saved attachments to in the media_repository 2017-01-10 14:29:17 +00:00
Matrix 1d08de6ce9 Merge branch 'release-v0.18.7' into matrix-org-hotfixes 2017-01-07 04:03:15 +00:00
Matrix 10d31c433b Merge branch 'release-v0.18.7' into matrix-org-hotfixes 2017-01-07 03:12:35 +00:00
Mark Haines 345afd9fdc Merge tag 'v0.18.6-rc3' into matrix-org-hotfixes
v0.18.6-rc3
2017-01-05 14:19:36 +00:00
Matrix 7ed07066ac Merge branch 'markjh/linearizer_logging' into matrix-org-hotfixes 2017-01-01 13:55:13 +00:00
Mark Haines 2f703b8645 Merge branch 'markjh/fix_get_missing' into matrix-org-hotfixes 2016-12-30 18:47:10 +00:00
Mark Haines 9fff1aacca Merge remote-tracking branch 'origin/release-v0.18.6' into matrix-org-hotfixes 2016-12-30 14:04:16 +00:00
Mark Haines cb842dc99f Deleting from event_push_actions needs to use an index 2016-12-29 16:54:03 +00:00
Mark Haines 2238a10b42 Merge branch 'release-v0.18.6' into matrix-org-hotfixes 2016-12-29 16:26:52 +00:00
Matthew Hodgson 2998fa57b0 merge uncommitted changes from matrix.org into hotfixes-v0.18.5 2016-12-20 12:14:08 +00:00
Richard van der Hoff f5abaafabd uncommitted changes on matrix.org
These might be important? who knows?
2016-12-20 11:49:25 +00:00
35 changed files with 1147 additions and 119 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ import urlparse
import nacl.signing
import nacl.encoding
from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException
from signedjson.sign import verify_signed_json, SignatureVerifyException
CONFIG_JSON = "cmdclient_config.json"
+48
View File
@@ -0,0 +1,48 @@
# Example log_config file for synapse. To enable, point `log_config` to it in
# `homeserver.yaml`, and restart synapse.
#
# This configuration will produce similar results to the defaults within
# synapse, but can be edited to give more flexibility.
version: 1
formatters:
fmt:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
filters:
context:
(): synapse.util.logcontext.LoggingContextFilter
request: ""
handlers:
# example output to console
console:
class: logging.StreamHandler
filters: [context]
# example output to file - to enable, edit 'root' config below.
file:
class: logging.handlers.RotatingFileHandler
formatter: fmt
filename: /var/log/synapse/homeserver.log
maxBytes: 100000000
backupCount: 3
filters: [context]
root:
level: INFO
handlers: [console] # to use file handler instead, switch to [file]
loggers:
synapse:
level: INFO
synapse.storage:
level: INFO
# example of enabling debugging for a component:
#
# synapse.federation.transport.server:
# level: DEBUG
+2 -3
View File
@@ -25,6 +25,5 @@ Configuring IP used for auth
The ReCaptcha API requires that the IP address of the user who solved the
captcha is sent. If the client is connecting through a proxy or load balancer,
it may be required to use the X-Forwarded-For (XFF) header instead of the origin
IP address. This can be configured as an option on the home server like so::
captcha_ip_origin_is_x_forwarded: true
IP address. This can be configured using the x_forwarded directive in the
listeners section of the homeserver.yaml configuration file.
-3
View File
@@ -43,9 +43,6 @@ class JoinRules(object):
class LoginType(object):
PASSWORD = u"m.login.password"
OAUTH = u"m.login.oauth2"
EMAIL_CODE = u"m.login.email.code"
EMAIL_URL = u"m.login.email.url"
EMAIL_IDENTITY = u"m.login.email.identity"
RECAPTCHA = u"m.login.recaptcha"
DUMMY = u"m.login.dummy"
+4
View File
@@ -87,6 +87,10 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["who_forgot_in_room"]
)
did_forget = (
RoomMemberStore.__dict__["did_forget"]
)
# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
+1 -1
View File
@@ -95,7 +95,7 @@ class TlsConfig(Config):
# make HTTPS requests to this server will check that the TLS
# certificates returned by this server match one of the fingerprints.
#
# Synapse automatically adds its the fingerprint of its own certificate
# Synapse automatically adds the fingerprint of its own certificate
# to the list. So if federation traffic is handle directly by synapse
# then no modification to the list is required.
#
+21 -9
View File
@@ -303,18 +303,10 @@ class TransactionQueue(object):
try:
self.pending_transactions[destination] = 1
# XXX: what's this for?
yield run_on_reactor()
while True:
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
limiter = yield get_retry_limiter(
destination,
self.clock,
@@ -326,6 +318,24 @@ class TransactionQueue(object):
yield self._get_new_device_messages(destination)
)
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# pending_transactions flag.
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
@@ -355,6 +365,8 @@ class TransactionQueue(object):
)
return
# END CRITICAL SECTION
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
limiter=limiter,
+43 -1
View File
@@ -19,7 +19,6 @@ from ._base import BaseHandler
import logging
logger = logging.getLogger(__name__)
@@ -54,3 +53,46 @@ class AdminHandler(BaseHandler):
}
defer.returnValue(ret)
@defer.inlineCallbacks
def get_users(self):
"""Function to reterive a list of users in users table.
Args:
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = yield self.store.get_users()
defer.returnValue(ret)
@defer.inlineCallbacks
def get_users_paginate(self, order, start, limit):
"""Function to reterive a paginated list of users from
users list. This will return a json object, which contains
list of users and the total number of users in users table.
Args:
order (str): column name to order the select by this column
start (int): start number to begin the query from
limit (int): number of rows to reterive
Returns:
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
"""
ret = yield self.store.get_users_paginate(order, start, limit)
defer.returnValue(ret)
@defer.inlineCallbacks
def search_users(self, term):
"""Function to search users list for one or more users with
the matched term.
Args:
term (str): search term
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = yield self.store.search_users(term)
defer.returnValue(ret)
+30 -10
View File
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.util import stringutils
@@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key).stream
possibly_changed = set(changed)
for room_id in rooms_changed:
# Fetch the current state at the time.
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
# Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering
)
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
except:
prev_state_ids = {}
except errors.StoreError:
# we have purged the stream_ordering index since the stream
# ordering: treat it the same as a new room
event_ids = []
current_state_ids = yield self.state.get_current_state_ids(room_id)
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype == EventTypes.Member:
prev_event_id = prev_state_ids.get(key, None)
if etype != EventTypes.Member:
continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key)
break
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
@@ -307,7 +327,7 @@ class DeviceHandler(BaseHandler):
user_id
)
logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
if str(extremity) == str(prev_ids[0]):
if extremity and prev_ids[0] and int(extremity) >= int(prev_ids[0]):
resync = False
if resync:
+1
View File
@@ -363,6 +363,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
+4 -1
View File
@@ -373,6 +373,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -402,6 +403,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -463,6 +465,7 @@ class PresenceHandler(object):
syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server.
"""
return
# Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = (
@@ -531,7 +534,7 @@ class PresenceHandler(object):
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
states.update({state.user_id: state for state in res})
states.update(res)
missing = [user_id for user_id, state in states.items() if not state]
if missing:
+1 -1
View File
@@ -42,7 +42,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
+3 -1
View File
@@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler):
)
membership = member.membership if member else None
if membership is not None and membership != Membership.LEAVE:
if membership is not None and membership not in [
Membership.LEAVE, Membership.BAN
]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))
+1 -1
View File
@@ -539,7 +539,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if False or not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
+2 -1
View File
@@ -218,7 +218,8 @@ class EmailPusher(object):
)
def seconds_until(self, ts_msec):
return (ts_msec - self.clock.time_msec()) / 1000
secs = (ts_msec - self.clock.time_msec()) / 1000
return max(secs, 0) # Ensure non-negative
def get_room_throttle_ms(self, room_id):
if room_id in self.throttle_params:
@@ -46,6 +46,12 @@ class SlavedAccountDataStore(BaseSlavedStore):
)
get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
get_tags_for_room = (
DataStore.get_tags_for_room.__func__
)
get_account_data_for_room = (
DataStore.get_account_data_for_room.__func__
)
get_updated_tags = DataStore.get_updated_tags.__func__
get_updated_account_data_for_user = (
@@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.storage import DataStore
from synapse.storage.presence import PresenceStore
class SlavedPresenceStore(BaseSlavedStore):
@@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore):
_get_active_presence = DataStore._get_active_presence.__func__
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
get_presence_for_users = DataStore.get_presence_for_users.__func__
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
+11 -3
View File
@@ -87,9 +87,17 @@ class HttpTransactionCache(object):
deferred = fn(*args, **kwargs)
# We don't add an errback to the raw deferred, so we ask ObservableDeferred
# to swallow the error. This is fine as the error will still be reported
# to the observers.
# if the request fails with a Twisted failure, remove it
# from the transaction map. This is done to ensure that we don't
# cache transient errors like rate-limiting errors, etc.
def remove_from_map(err):
self.transactions.pop(txn_key, None)
return err
deferred.addErrback(remove_from_map)
# We don't add any other errbacks to the raw deferred, so we ask
# ObservableDeferred to swallow the error. This is fine as the error will
# still be reported to the observers.
observable = ObservableDeferred(deferred, consumeErrors=True)
self.transactions[txn_key] = (observable, self.clock.time_msec())
return observable.observe()
+220
View File
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID
from synapse.http.servlet import parse_json_object_from_request
from .base import ClientV1RestServlet, client_path_patterns
@@ -25,6 +26,34 @@ import logging
logger = logging.getLogger(__name__)
class UsersRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/users/(?P<user_id>[^/]*)")
def __init__(self, hs):
super(UsersRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
@defer.inlineCallbacks
def on_GET(self, request, user_id):
target_user = UserID.from_string(user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
# To allow all users to get the users list
# if not is_admin and target_user != auth_user:
# raise AuthError(403, "You are not a server admin")
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
ret = yield self.handlers.admin_handler.get_users()
defer.returnValue((200, ret))
class WhoisRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
@@ -128,8 +157,199 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
defer.returnValue((200, {}))
class ResetPasswordRestServlet(ClientV1RestServlet):
"""Post request to allow an administrator reset password for a user.
This need a user have a administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
@user:to_reset_password?access_token=admin_access_token
JsonBodyToSend:
{
"new_password": "secret"
}
Returns:
200 OK with empty object if success otherwise an error.
"""
PATTERNS = client_path_patterns("/admin/reset_password/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
super(ResetPasswordRestServlet, self).__init__(hs)
self.hs = hs
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
"""Post request to allow an administrator reset password for a user.
This need a user have a administrator access in Synapse.
"""
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
params = parse_json_object_from_request(request)
new_password = params['new_password']
if not new_password:
raise SynapseError(400, "Missing 'new_password' arg")
logger.info("new_password: %r", new_password)
yield self.auth_handler.set_password(
target_user_id, new_password, requester
)
defer.returnValue((200, {}))
class GetUsersPaginatedRestServlet(ClientV1RestServlet):
"""Get request to get specific number of users from Synapse.
This need a user have a administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
@admin:user?access_token=admin_access_token&start=0&limit=10
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = client_path_patterns("/admin/users_paginate/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
super(GetUsersPaginatedRestServlet, self).__init__(hs)
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
@defer.inlineCallbacks
def on_GET(self, request, target_user_id):
"""Get request to get specific number of users from Synapse.
This need a user have a administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
# To allow all users to get the users list
# if not is_admin and target_user != auth_user:
# raise AuthError(403, "You are not a server admin")
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
order = "name" # order by name in user table
start = request.args.get("start")[0]
limit = request.args.get("limit")[0]
if not limit:
raise SynapseError(400, "Missing 'limit' arg")
if not start:
raise SynapseError(400, "Missing 'start' arg")
logger.info("limit: %s, start: %s", limit, start)
ret = yield self.handlers.admin_handler.get_users_paginate(
order, start, limit
)
defer.returnValue((200, ret))
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
"""Post request to get specific number of users from Synapse..
This need a user have a administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
@admin:user?access_token=admin_access_token
JsonBodyToSend:
{
"start": "0",
"limit": "10
}
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
order = "name" # order by name in user table
params = parse_json_object_from_request(request)
limit = params['limit']
start = params['start']
if not limit:
raise SynapseError(400, "Missing 'limit' arg")
if not start:
raise SynapseError(400, "Missing 'start' arg")
logger.info("limit: %s, start: %s", limit, start)
ret = yield self.handlers.admin_handler.get_users_paginate(
order, start, limit
)
defer.returnValue((200, ret))
class SearchUsersRestServlet(ClientV1RestServlet):
"""Get request to search user table for specific users according to
search term.
This need a user have a administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/search_users/
@admin:user?access_token=admin_access_token&term=alice
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = client_path_patterns("/admin/search_users/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
super(SearchUsersRestServlet, self).__init__(hs)
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
@defer.inlineCallbacks
def on_GET(self, request, target_user_id):
"""Get request to search user table for specific users according to
search term.
This need a user have a administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
# To allow all users to get the users list
# if not is_admin and target_user != auth_user:
# raise AuthError(403, "You are not a server admin")
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
term = request.args.get("term")[0]
if not term:
raise SynapseError(400, "Missing 'term' arg")
logger.info("term: %s ", term)
ret = yield self.handlers.admin_handler.search_users(
term
)
defer.returnValue((200, ret))
def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
UsersRestServlet(hs).register(http_server)
ResetPasswordRestServlet(hs).register(http_server)
GetUsersPaginatedRestServlet(hs).register(http_server)
SearchUsersRestServlet(hs).register(http_server)
+4 -2
View File
@@ -46,6 +46,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
def on_PUT(self, request, user_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
user = UserID.from_string(user_id)
is_admin = yield self.auth.is_server_admin(requester.user)
content = parse_json_object_from_request(request)
@@ -55,7 +56,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
defer.returnValue((400, "Unable to parse name"))
yield self.handlers.profile_handler.set_displayname(
user, requester, new_name)
user, requester, new_name, is_admin)
defer.returnValue((200, {}))
@@ -88,6 +89,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
def on_PUT(self, request, user_id):
requester = yield self.auth.get_user_by_req(request)
user = UserID.from_string(user_id)
is_admin = yield self.auth.is_server_admin(requester.user)
content = parse_json_object_from_request(request)
try:
@@ -96,7 +98,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
defer.returnValue((400, "Unable to parse name"))
yield self.handlers.profile_handler.set_avatar_url(
user, requester, new_name)
user, requester, new_name, is_admin)
defer.returnValue((200, {}))
+6
View File
@@ -474,6 +474,7 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, room_id):
# raise RuntimeError("Guest access has been disabled")
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
content = yield self.initial_sync_handler.room_initial_sync(
@@ -608,6 +609,10 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Missing user_id key.")
target = UserID.from_string(content["user_id"])
event_content = None
if 'reason' in content and membership_action in ['kick', 'ban']:
event_content = {'reason': content['reason']}
yield self.handlers.room_member_handler.update_membership(
requester=requester,
target=target,
@@ -615,6 +620,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
action=membership_action,
txn_id=txn_id,
third_party_signed=content.get("third_party_signed", None),
content=event_content,
)
defer.returnValue((200, {}))
+76
View File
@@ -297,6 +297,82 @@ class DataStore(RoomMemberStore, RoomStore,
desc="get_user_ip_and_agents",
)
def get_users(self):
"""Function to reterive a list of users in users table.
Args:
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
return self._simple_select_list(
table="users",
keyvalues={},
retcols=[
"name",
"password_hash",
"is_guest",
"admin"
],
desc="get_users",
)
def get_users_paginate(self, order, start, limit):
"""Function to reterive a paginated list of users from
users list. This will return a json object, which contains
list of users and the total number of users in users table.
Args:
order (str): column name to order the select by this column
start (int): start number to begin the query from
limit (int): number of rows to reterive
Returns:
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
"""
is_guest = 0
i_start = (int)(start)
i_limit = (int)(limit)
return self.get_user_list_paginate(
table="users",
keyvalues={
"is_guest": is_guest
},
pagevalues=[
order,
i_limit,
i_start
],
retcols=[
"name",
"password_hash",
"is_guest",
"admin"
],
desc="get_users_paginate",
)
def search_users(self, term):
"""Function to search users list for one or more users with
the matched term.
Args:
term (str): search term
col (str): column to query term should be matched to
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
return self._simple_search_list(
table="users",
term=term,
col="name",
retcols=[
"name",
"password_hash",
"is_guest",
"admin"
],
desc="search_users",
)
def are_all_users_on_domain(txn, database_engine, domain):
sql = database_engine.convert_param_style(
+159
View File
@@ -934,6 +934,165 @@ class SQLBaseStore(object):
else:
return 0
def _simple_select_list_paginate(self, table, keyvalues, pagevalues, retcols,
desc="_simple_select_list_paginate"):
"""Executes a SELECT query on the named table with start and limit,
of row numbers, which may return zero or number of rows from start to limit,
returning the result as a list of dicts.
Args:
table (str): the table name
keyvalues (dict[str, Any] | None):
column names and values to select the rows with, or None to not
apply a WHERE clause.
retcols (iterable[str]): the names of the columns to return
order (str): order the select by this column
start (int): start number to begin the query from
limit (int): number of rows to reterive
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
return self.runInteraction(
desc,
self._simple_select_list_paginate_txn,
table, keyvalues, pagevalues, retcols
)
@classmethod
def _simple_select_list_paginate_txn(cls, txn, table, keyvalues, pagevalues, retcols):
"""Executes a SELECT query on the named table with start and limit,
of row numbers, which may return zero or number of rows from start to limit,
returning the result as a list of dicts.
Args:
txn : Transaction object
table (str): the table name
keyvalues (dict[str, T] | None):
column names and values to select the rows with, or None to not
apply a WHERE clause.
pagevalues ([]):
order (str): order the select by this column
start (int): start number to begin the query from
limit (int): number of rows to reterive
retcols (iterable[str]): the names of the columns to return
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
if keyvalues:
sql = "SELECT %s FROM %s WHERE %s ORDER BY %s" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k,) for k in keyvalues),
" ? ASC LIMIT ? OFFSET ?"
)
txn.execute(sql, keyvalues.values() + pagevalues)
else:
sql = "SELECT %s FROM %s ORDER BY %s" % (
", ".join(retcols),
table,
" ? ASC LIMIT ? OFFSET ?"
)
txn.execute(sql, pagevalues)
return cls.cursor_to_dict(txn)
@defer.inlineCallbacks
def get_user_list_paginate(self, table, keyvalues, pagevalues, retcols,
desc="get_user_list_paginate"):
"""Get a list of users from start row to a limit number of rows. This will
return a json object with users and total number of users in users list.
Args:
table (str): the table name
keyvalues (dict[str, Any] | None):
column names and values to select the rows with, or None to not
apply a WHERE clause.
pagevalues ([]):
order (str): order the select by this column
start (int): start number to begin the query from
limit (int): number of rows to reterive
retcols (iterable[str]): the names of the columns to return
Returns:
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
"""
users = yield self.runInteraction(
desc,
self._simple_select_list_paginate_txn,
table, keyvalues, pagevalues, retcols
)
count = yield self.runInteraction(
desc,
self.get_user_count_txn
)
retval = {
"users": users,
"total": count
}
defer.returnValue(retval)
def get_user_count_txn(self, txn):
"""Get a total number of registerd users in the users list.
Args:
txn : Transaction object
Returns:
defer.Deferred: resolves to int
"""
sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
txn.execute(sql_count)
count = txn.fetchone()[0]
defer.returnValue(count)
def _simple_search_list(self, table, term, col, retcols,
desc="_simple_search_list"):
"""Executes a SELECT query on the named table, which may return zero or
more rows, returning the result as a list of dicts.
Args:
table (str): the table name
term (str | None):
term for searching the table matched to a column.
col (str): column to query term should be matched to
retcols (iterable[str]): the names of the columns to return
Returns:
defer.Deferred: resolves to list[dict[str, Any]] or None
"""
return self.runInteraction(
desc,
self._simple_search_list_txn,
table, term, col, retcols
)
@classmethod
def _simple_search_list_txn(cls, txn, table, term, col, retcols):
"""Executes a SELECT query on the named table, which may return zero or
more rows, returning the result as a list of dicts.
Args:
txn : Transaction object
table (str): the table name
term (str | None):
term for searching the table matched to a column.
col (str): column to query term should be matched to
retcols (iterable[str]): the names of the columns to return
Returns:
defer.Deferred: resolves to list[dict[str, Any]] or None
"""
if term:
sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (
", ".join(retcols),
table,
col
)
termvalues = ["%%" + term + "%%"]
txn.execute(sql, termvalues)
else:
return 0
return cls.cursor_to_dict(txn)
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
+1 -1
View File
@@ -93,7 +93,7 @@ class EndToEndKeyStore(SQLBaseStore):
query_clause = "user_id = ?"
query_params.append(user_id)
if device_id:
if device_id is not None:
query_clause += " AND device_id = ?"
query_params.append(device_id)
+16 -1
View File
@@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
)
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".
Throws a StoreError if we have since purged the index for
stream_orderings from that point.
Args:
room_id (str):
stream_ordering (int):
Returns:
deferred, which resolves to a list of event_ids
"""
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
# We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a
# try and pin to a stream_ordering from before a restart
# stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change)
# provided the last_change is recent enough, we now clamp the requested
# stream_ordering to it.
if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering)
+282 -72
View File
@@ -15,6 +15,7 @@
from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
@@ -25,11 +26,46 @@ import ujson as json
logger = logging.getLogger(__name__)
DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
DEFAULT_HIGHLIGHT_ACTION = [
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
]
def _serialize_action(actions, is_highlight):
"""Custom serializer for actions. This allows us to "compress" common actions.
We use the fact that most users have the same actions for notifs (and for
highlights).
We store these default actions as the empty string rather than the full JSON.
Since the empty string isn't valid JSON there is no risk of this clashing with
any real JSON actions
"""
if is_highlight:
if actions == DEFAULT_HIGHLIGHT_ACTION:
return "" # We use empty string as the column is non-NULL
else:
if actions == DEFAULT_NOTIF_ACTION:
return ""
return json.dumps(actions)
def _deserialize_action(actions, is_highlight):
"""Custom deserializer for actions. This allows us to "compress" common actions
"""
if actions:
return json.loads(actions)
if is_highlight:
return DEFAULT_HIGHLIGHT_ACTION
else:
return DEFAULT_NOTIF_ACTION
class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, hs):
self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)
self.register_background_index_update(
@@ -47,6 +83,13 @@ class EventPushActionsStore(SQLBaseStore):
where_clause="highlight=1"
)
self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -55,15 +98,17 @@ class EventPushActionsStore(SQLBaseStore):
"""
values = []
for uid, actions in tuples:
is_highlight = 1 if _action_has_highlight(actions) else 0
values.append({
'room_id': event.room_id,
'event_id': event.event_id,
'user_id': uid,
'actions': json.dumps(actions),
'actions': _serialize_action(actions, is_highlight),
'stream_ordering': event.internal_metadata.stream_ordering,
'topological_ordering': event.depth,
'notif': 1,
'highlight': 1 if _action_has_highlight(actions) else 0,
'highlight': is_highlight,
})
for uid, __ in tuples:
@@ -77,67 +122,84 @@ class EventPushActionsStore(SQLBaseStore):
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
def _get_unread_event_push_actions_by_room(txn):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(
sql, (room_id, last_read_event_id)
)
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
topological_ordering = results[0][1]
token = RoomStreamToken(
topological_ordering, stream_ordering
)
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
notify_count = row[0] if row else 0
# Now get the number of highlights
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
highlight_count = row[0] if row else 0
return {
"notify_count": notify_count,
"highlight_count": highlight_count,
}
ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room",
_get_unread_event_push_actions_by_room
self._get_unread_counts_by_receipt_txn,
room_id, user_id, last_read_event_id
)
defer.returnValue(ret)
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(
sql, (room_id, last_read_event_id)
)
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
topological_ordering = results[0][1]
return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, topological_ordering, stream_ordering
)
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
stream_ordering):
token = RoomStreamToken(
topological_ordering, stream_ordering
)
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
notify_count = row[0] if row else 0
txn.execute("""
SELECT notif_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""", (room_id, user_id, stream_ordering,))
rows = txn.fetchall()
if rows:
notify_count += rows[0][0]
# Now get the number of highlights
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
highlight_count = row[0] if row else 0
return {
"notify_count": notify_count,
"highlight_count": highlight_count,
}
@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
def f(txn):
@@ -176,7 +238,8 @@ class EventPushActionsStore(SQLBaseStore):
# find rooms that have a read receipt in them and return the next
# push actions
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" ep.highlight "
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
@@ -217,7 +280,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts"
" ep.highlight "
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
@@ -246,7 +309,7 @@ class EventPushActionsStore(SQLBaseStore):
"event_id": row[0],
"room_id": row[1],
"stream_ordering": row[2],
"actions": json.loads(row[3]),
"actions": _deserialize_action(row[3], row[4]),
} for row in after_read_receipt + no_read_receipt
]
@@ -285,7 +348,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts"
" ep.highlight, e.received_ts"
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
@@ -327,7 +390,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts"
" ep.highlight, e.received_ts"
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
@@ -357,8 +420,8 @@ class EventPushActionsStore(SQLBaseStore):
"event_id": row[0],
"room_id": row[1],
"stream_ordering": row[2],
"actions": json.loads(row[3]),
"received_ts": row[4],
"actions": _deserialize_action(row[3], row[4]),
"received_ts": row[5],
} for row in after_read_receipt + no_read_receipt
]
@@ -392,7 +455,7 @@ class EventPushActionsStore(SQLBaseStore):
sql = (
"SELECT epa.event_id, epa.room_id,"
" epa.stream_ordering, epa.topological_ordering,"
" epa.actions, epa.profile_tag, e.received_ts"
" epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
" FROM event_push_actions epa, events e"
" WHERE epa.event_id = e.event_id"
" AND epa.user_id = ? %s"
@@ -407,7 +470,7 @@ class EventPushActionsStore(SQLBaseStore):
"get_push_actions_for_user", f
)
for pa in push_actions:
pa["actions"] = json.loads(pa["actions"])
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
defer.returnValue(push_actions)
@defer.inlineCallbacks
@@ -448,10 +511,14 @@ class EventPushActionsStore(SQLBaseStore):
)
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering):
topological_ordering, stream_ordering):
"""
Purges old, stale push actions for a user and room before a given
topological_ordering
Purges old push actions for a user and room before a given
topological_ordering.
We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.
Args:
txn: The transcation
room_id: Room ID to delete from
@@ -475,10 +542,16 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" topological_ordering < ? AND stream_ordering < ?",
" topological_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
)
txn.execute("""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
@@ -495,6 +568,14 @@ class EventPushActionsStore(SQLBaseStore):
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
@@ -534,6 +615,135 @@ class EventPushActionsStore(SQLBaseStore):
return range_end
@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True
try:
while True:
logger.info("Rotating notifications")
caught_up = yield self.runInteraction(
"_rotate_notifs",
self._rotate_notifs_txn
)
if caught_up:
break
yield sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
def _rotate_notifs_txn(self, txn):
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
"""
# We want to make sure that we only ever do this one at a time
# self.database_engine.lock_table(txn, "event_push_summary")
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
# We don't to try and rotate millions of rows at once, so we cap the
# maximum stream ordering we'll rotate before.
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
# stream_row = (old_rotate_stream_ordering + self._rotate_count,)
if stream_row:
offset_stream_ordering, = stream_row
rotate_to_stream_ordering = min(
self.stream_ordering_day_ago, offset_stream_ordering
)
caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
else:
rotate_to_stream_ordering = self.stream_ordering_day_ago
caught_up = True
logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
# We have caught up iff we were limited by `stream_ordering_day_ago`
return caught_up
def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""
txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
rows = txn.fetchall()
logger.info("Rotating notifications, handling %d rows", len(rows))
# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
# existing table.
self._simple_insert_many_txn(
txn,
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
}
for row in rows if row[4] is None
]
)
txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
)
txn.execute(
"DELETE FROM event_push_actions"
" WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
(old_rotate_stream_ordering, rotate_to_stream_ordering,)
)
logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,)
)
def _action_has_highlight(actions):
for action in actions:
+7 -1
View File
@@ -765,7 +765,6 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -784,6 +783,13 @@ class EventsStore(SQLBaseStore):
"DELETE FROM %s WHERE event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
self._simple_insert_many_txn(
txn,
+11 -3
View File
@@ -15,7 +15,7 @@
from ._base import SQLBaseStore
from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from collections import namedtuple
from twisted.internet import defer
@@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
self.presence_stream_cache.entity_has_changed,
state.user_id, stream_id,
)
self._invalidate_cache_and_stream(
txn, self._get_presence_for_user, (state.user_id,)
)
# Actually insert new rows
self._simple_insert_many_txn(
@@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
"get_all_presence_updates", get_all_presence_updates_txn
)
@defer.inlineCallbacks
@cached()
def _get_presence_for_user(self, user_id):
raise NotImplementedError()
@cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
num_args=1, inlineCallbacks=True)
def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch(
table="presence_stream",
@@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
for row in rows:
row["currently_active"] = bool(row["currently_active"])
defer.returnValue([UserPresenceState(**row) for row in rows])
defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
+1
View File
@@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore):
room_id=room_id,
user_id=user_id,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)
return True
@@ -0,0 +1,37 @@
/* Copyright 2017 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Aggregate of old notification counts that have been deleted out of the
-- main event_push_actions table. This count does not include those that were
-- highlights, as they remain in the event_push_actions table.
CREATE TABLE event_push_summary (
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notif_count BIGINT NOT NULL,
stream_ordering BIGINT NOT NULL
);
CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
-- The stream ordering up to which we have aggregated the event_push_actions
-- table into event_push_summary
CREATE TABLE event_push_summary_stream_ordering (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_ordering BIGINT NOT NULL,
CHECK (Lock='X')
);
INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
@@ -0,0 +1,39 @@
/* Copyright 2017 Vector Creations Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS pushers2 (
id BIGINT PRIMARY KEY,
user_name TEXT NOT NULL,
access_token BIGINT DEFAULT NULL,
profile_tag TEXT NOT NULL,
kind TEXT NOT NULL,
app_id TEXT NOT NULL,
app_display_name TEXT NOT NULL,
device_display_name TEXT NOT NULL,
pushkey TEXT NOT NULL,
ts BIGINT NOT NULL,
lang TEXT,
data TEXT,
last_stream_ordering INTEGER,
last_success BIGINT,
failing_since BIGINT,
UNIQUE (app_id, pushkey, user_name)
);
INSERT INTO pushers2 SELECT * FROM PUSHERS;
DROP TABLE PUSHERS;
ALTER TABLE pushers2 RENAME TO pushers;
+1 -1
View File
@@ -600,7 +600,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
return " & ".join(result + ":*" for result in results)
return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
+13 -1
View File
@@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
def get_state_ids_for_events(self, event_ids, types):
def get_state_ids_for_events(self, event_ids, types=None):
"""
Get the state dicts corresponding to a list of events
Args:
event_ids(list(str)): events whose state should be returned
types(list[(str, str)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. May be None, which
matches any key
Returns:
A deferred dict from event_id -> (type, state_key) -> state_event
"""
event_to_groups = yield self._get_state_group_for_events(
event_ids,
)
+86
View File
@@ -17,9 +17,15 @@ from twisted.internet import defer
import tests.unittest
import tests.utils
from mock import Mock
USER_ID = "@user:example.com"
PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
HIGHLIGHT = [
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
]
class EventPushActionsStoreTestCase(tests.unittest.TestCase):
@@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
yield self.store.get_unread_push_actions_for_user_in_range_for_email(
USER_ID, 0, 1000, 20
)
@defer.inlineCallbacks
def test_count_aggregation(self):
room_id = "!foo:example.com"
user_id = "@user1235:example.com"
@defer.inlineCallbacks
def _assert_counts(noitf_count, highlight_count):
counts = yield self.store.runInteraction(
"", self.store._get_unread_counts_by_pos_txn,
room_id, user_id, 0, 0
)
self.assertEquals(
counts,
{"notify_count": noitf_count, "highlight_count": highlight_count}
)
def _inject_actions(stream, action):
event = Mock()
event.room_id = room_id
event.event_id = "$test:example.com"
event.internal_metadata.stream_ordering = stream
event.depth = stream
tuples = [(user_id, action)]
return self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
event, tuples
)
def _rotate(stream):
return self.store.runInteraction(
"", self.store._rotate_notifs_before_txn, stream
)
def _mark_read(stream, depth):
return self.store.runInteraction(
"", self.store._remove_old_push_actions_before_txn,
room_id, user_id, depth, stream
)
yield _assert_counts(0, 0)
yield _inject_actions(1, PlAIN_NOTIF)
yield _assert_counts(1, 0)
yield _rotate(2)
yield _assert_counts(1, 0)
yield _inject_actions(3, PlAIN_NOTIF)
yield _assert_counts(2, 0)
yield _rotate(4)
yield _assert_counts(2, 0)
yield _inject_actions(5, PlAIN_NOTIF)
yield _mark_read(3, 3)
yield _assert_counts(1, 0)
yield _mark_read(5, 5)
yield _assert_counts(0, 0)
yield _inject_actions(6, PlAIN_NOTIF)
yield _rotate(7)
yield self.store._simple_delete(
table="event_push_actions",
keyvalues={"1": 1},
desc="",
)
yield _assert_counts(1, 0)
yield _mark_read(7, 7)
yield _assert_counts(0, 0)
yield _inject_actions(8, HIGHLIGHT)
yield _assert_counts(1, 1)
yield _rotate(9)
yield _assert_counts(1, 1)
yield _rotate(10)
yield _assert_counts(1, 1)