Compare commits
82 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8d910ff5b9 | |||
| cff886c47b | |||
| a24492c06f | |||
| b7442c3e2b | |||
| a3708a1885 | |||
| c927d6de9b | |||
| 0c4cf9372b | |||
| b5c268738b | |||
| 17673404fb | |||
| 7f026792e1 | |||
| 11940d462a | |||
| 699be7d1be | |||
| ede125b7e1 | |||
| 2fa14fd48a | |||
| 66eb0bd548 | |||
| 5aae844e60 | |||
| ec8d7603e6 | |||
| 8c87bb550e | |||
| 4aa29508af | |||
| b4017539d4 | |||
| c97d491649 | |||
| b6557f2cfe | |||
| 138e030cfe | |||
| 502ae6c663 | |||
| e6acf0c399 | |||
| 04eca2589d | |||
| 474c9aadbe | |||
| 7dcbcca68c | |||
| 3b42bb96e4 | |||
| 738000971e | |||
| fa467e62a9 | |||
| 355d62c499 | |||
| fc2f29c1d0 | |||
| ce3c8df6df | |||
| 095b45c165 | |||
| 795f8e3fe7 | |||
| d7457c7661 | |||
| 359c97f506 | |||
| 9e617cd4c2 | |||
| d0497425f8 | |||
| 808ddf0ae7 | |||
| feb15dc99f | |||
| ecd7e36047 | |||
| 6bba80241c | |||
| 3a46280ca3 | |||
| e1a12e24d2 | |||
| 6a3743b0d4 | |||
| 481f6c87e7 | |||
| df4407d665 | |||
| 96ef35c62f | |||
| 5c3cb8778a | |||
| 1beda9c8a7 | |||
| fdbd90e25d | |||
| 52cd019a54 | |||
| f20cd34858 | |||
| 9adcd3a514 | |||
| 063a1251a9 | |||
| af6da6db2d | |||
| 131c0134f5 | |||
| 83c7b8ec91 | |||
| 2849d3f29d | |||
| f57cb21952 | |||
| fac3c03087 | |||
| 4abecb7b02 | |||
| 010159365c | |||
| 717e4448c4 | |||
| e41f183aa8 | |||
| 10f7bfe897 | |||
| 772b8ebe54 | |||
| 3c9acdef4d | |||
| fe28150cdc | |||
| 69ef497253 | |||
| 1d08de6ce9 | |||
| 10d31c433b | |||
| 345afd9fdc | |||
| 7ed07066ac | |||
| 2f703b8645 | |||
| 9fff1aacca | |||
| cb842dc99f | |||
| 2238a10b42 | |||
| 2998fa57b0 | |||
| f5abaafabd |
@@ -32,7 +32,7 @@ import urlparse
|
||||
import nacl.signing
|
||||
import nacl.encoding
|
||||
|
||||
from syutil.crypto.jsonsign import verify_signed_json, SignatureVerifyException
|
||||
from signedjson.sign import verify_signed_json, SignatureVerifyException
|
||||
|
||||
CONFIG_JSON = "cmdclient_config.json"
|
||||
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
# Example log_config file for synapse. To enable, point `log_config` to it in
|
||||
# `homeserver.yaml`, and restart synapse.
|
||||
#
|
||||
# This configuration will produce similar results to the defaults within
|
||||
# synapse, but can be edited to give more flexibility.
|
||||
|
||||
version: 1
|
||||
|
||||
formatters:
|
||||
fmt:
|
||||
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
|
||||
|
||||
filters:
|
||||
context:
|
||||
(): synapse.util.logcontext.LoggingContextFilter
|
||||
request: ""
|
||||
|
||||
handlers:
|
||||
# example output to console
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
filters: [context]
|
||||
|
||||
# example output to file - to enable, edit 'root' config below.
|
||||
file:
|
||||
class: logging.handlers.RotatingFileHandler
|
||||
formatter: fmt
|
||||
filename: /var/log/synapse/homeserver.log
|
||||
maxBytes: 100000000
|
||||
backupCount: 3
|
||||
filters: [context]
|
||||
|
||||
|
||||
root:
|
||||
level: INFO
|
||||
handlers: [console] # to use file handler instead, switch to [file]
|
||||
|
||||
loggers:
|
||||
synapse:
|
||||
level: INFO
|
||||
|
||||
synapse.storage:
|
||||
level: INFO
|
||||
|
||||
# example of enabling debugging for a component:
|
||||
#
|
||||
# synapse.federation.transport.server:
|
||||
# level: DEBUG
|
||||
@@ -25,6 +25,5 @@ Configuring IP used for auth
|
||||
The ReCaptcha API requires that the IP address of the user who solved the
|
||||
captcha is sent. If the client is connecting through a proxy or load balancer,
|
||||
it may be required to use the X-Forwarded-For (XFF) header instead of the origin
|
||||
IP address. This can be configured as an option on the home server like so::
|
||||
|
||||
captcha_ip_origin_is_x_forwarded: true
|
||||
IP address. This can be configured using the x_forwarded directive in the
|
||||
listeners section of the homeserver.yaml configuration file.
|
||||
|
||||
@@ -43,9 +43,6 @@ class JoinRules(object):
|
||||
|
||||
class LoginType(object):
|
||||
PASSWORD = u"m.login.password"
|
||||
OAUTH = u"m.login.oauth2"
|
||||
EMAIL_CODE = u"m.login.email.code"
|
||||
EMAIL_URL = u"m.login.email.url"
|
||||
EMAIL_IDENTITY = u"m.login.email.identity"
|
||||
RECAPTCHA = u"m.login.recaptcha"
|
||||
DUMMY = u"m.login.dummy"
|
||||
|
||||
@@ -87,6 +87,10 @@ class SynchrotronSlavedStore(
|
||||
RoomMemberStore.__dict__["who_forgot_in_room"]
|
||||
)
|
||||
|
||||
did_forget = (
|
||||
RoomMemberStore.__dict__["did_forget"]
|
||||
)
|
||||
|
||||
# XXX: This is a bit broken because we don't persist the accepted list in a
|
||||
# way that can be replicated. This means that we don't have a way to
|
||||
# invalidate the cache correctly.
|
||||
|
||||
@@ -95,7 +95,7 @@ class TlsConfig(Config):
|
||||
# make HTTPS requests to this server will check that the TLS
|
||||
# certificates returned by this server match one of the fingerprints.
|
||||
#
|
||||
# Synapse automatically adds its the fingerprint of its own certificate
|
||||
# Synapse automatically adds the fingerprint of its own certificate
|
||||
# to the list. So if federation traffic is handle directly by synapse
|
||||
# then no modification to the list is required.
|
||||
#
|
||||
|
||||
@@ -303,18 +303,10 @@ class TransactionQueue(object):
|
||||
try:
|
||||
self.pending_transactions[destination] = 1
|
||||
|
||||
# XXX: what's this for?
|
||||
yield run_on_reactor()
|
||||
|
||||
while True:
|
||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||
|
||||
pending_edus.extend(
|
||||
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
||||
)
|
||||
|
||||
limiter = yield get_retry_limiter(
|
||||
destination,
|
||||
self.clock,
|
||||
@@ -326,6 +318,24 @@ class TransactionQueue(object):
|
||||
yield self._get_new_device_messages(destination)
|
||||
)
|
||||
|
||||
# BEGIN CRITICAL SECTION
|
||||
#
|
||||
# In order to avoid a race condition, we need to make sure that
|
||||
# the following code (from popping the queues up to the point
|
||||
# where we decide if we actually have any pending messages) is
|
||||
# atomic - otherwise new PDUs or EDUs might arrive in the
|
||||
# meantime, but not get sent because we hold the
|
||||
# pending_transactions flag.
|
||||
|
||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||
|
||||
pending_edus.extend(
|
||||
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
||||
)
|
||||
|
||||
pending_edus.extend(device_message_edus)
|
||||
if pending_presence:
|
||||
pending_edus.append(
|
||||
@@ -355,6 +365,8 @@ class TransactionQueue(object):
|
||||
)
|
||||
return
|
||||
|
||||
# END CRITICAL SECTION
|
||||
|
||||
success = yield self._send_new_transaction(
|
||||
destination, pending_pdus, pending_edus, pending_failures,
|
||||
limiter=limiter,
|
||||
|
||||
@@ -19,7 +19,6 @@ from ._base import BaseHandler
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -54,3 +53,46 @@ class AdminHandler(BaseHandler):
|
||||
}
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users(self):
|
||||
"""Function to reterive a list of users in users table.
|
||||
|
||||
Args:
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
ret = yield self.store.get_users()
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_paginate(self, order, start, limit):
|
||||
"""Function to reterive a paginated list of users from
|
||||
users list. This will return a json object, which contains
|
||||
list of users and the total number of users in users table.
|
||||
|
||||
Args:
|
||||
order (str): column name to order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
Returns:
|
||||
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
|
||||
"""
|
||||
ret = yield self.store.get_users_paginate(order, start, limit)
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def search_users(self, term):
|
||||
"""Function to search users list for one or more users with
|
||||
the matched term.
|
||||
|
||||
Args:
|
||||
term (str): search term
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
ret = yield self.store.search_users(term)
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
||||
+30
-10
@@ -12,7 +12,6 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.api import errors
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.util import stringutils
|
||||
@@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
|
||||
# Then work out if any users have since joined
|
||||
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
|
||||
|
||||
stream_ordering = RoomStreamToken.parse_stream_token(
|
||||
from_token.room_key).stream
|
||||
|
||||
possibly_changed = set(changed)
|
||||
for room_id in rooms_changed:
|
||||
# Fetch the current state at the time.
|
||||
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
|
||||
|
||||
# Fetch the current state at the time.
|
||||
try:
|
||||
event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||
room_id, stream_ordering=stream_ordering
|
||||
)
|
||||
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
|
||||
except:
|
||||
prev_state_ids = {}
|
||||
except errors.StoreError:
|
||||
# we have purged the stream_ordering index since the stream
|
||||
# ordering: treat it the same as a new room
|
||||
event_ids = []
|
||||
|
||||
current_state_ids = yield self.state.get_current_state_ids(room_id)
|
||||
|
||||
# special-case for an empty prev state: include all members
|
||||
# in the changed list
|
||||
if not event_ids:
|
||||
for key, event_id in current_state_ids.iteritems():
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
possibly_changed.add(state_key)
|
||||
continue
|
||||
|
||||
# mapping from event_id -> state_dict
|
||||
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
|
||||
|
||||
# If there has been any change in membership, include them in the
|
||||
# possibly changed list. We'll check if they are joined below,
|
||||
# and we're not toooo worried about spuriously adding users.
|
||||
for key, event_id in current_state_ids.iteritems():
|
||||
etype, state_key = key
|
||||
if etype == EventTypes.Member:
|
||||
prev_event_id = prev_state_ids.get(key, None)
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
||||
# check if this member has changed since any of the extremities
|
||||
# at the stream_ordering, and add them to the list if so.
|
||||
for state_dict in prev_state_ids.values():
|
||||
prev_event_id = state_dict.get(key, None)
|
||||
if not prev_event_id or prev_event_id != event_id:
|
||||
possibly_changed.add(state_key)
|
||||
break
|
||||
|
||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||
user_id
|
||||
@@ -307,7 +327,7 @@ class DeviceHandler(BaseHandler):
|
||||
user_id
|
||||
)
|
||||
logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
|
||||
if str(extremity) == str(prev_ids[0]):
|
||||
if extremity and prev_ids[0] and int(extremity) >= int(prev_ids[0]):
|
||||
resync = False
|
||||
|
||||
if resync:
|
||||
|
||||
@@ -363,6 +363,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_presence():
|
||||
defer.returnValue([])
|
||||
states = yield presence_handler.get_states(
|
||||
[m.user_id for m in room_members],
|
||||
as_event=True,
|
||||
|
||||
@@ -373,6 +373,7 @@ class PresenceHandler(object):
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
return
|
||||
user_id = user.to_string()
|
||||
|
||||
bump_active_time_counter.inc()
|
||||
@@ -402,6 +403,7 @@ class PresenceHandler(object):
|
||||
Useful for streams that are not associated with an actual
|
||||
client that is being used by a user.
|
||||
"""
|
||||
affect_presence = False
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
@@ -463,6 +465,7 @@ class PresenceHandler(object):
|
||||
syncing_user_ids(set(str)): The set of user_ids that are
|
||||
currently syncing on that server.
|
||||
"""
|
||||
return
|
||||
|
||||
# Grab the previous list of user_ids that were syncing on that process
|
||||
prev_syncing_user_ids = (
|
||||
@@ -531,7 +534,7 @@ class PresenceHandler(object):
|
||||
# There are things not in our in memory cache. Lets pull them out of
|
||||
# the database.
|
||||
res = yield self.store.get_presence_for_users(missing)
|
||||
states.update({state.user_id: state for state in res})
|
||||
states.update(res)
|
||||
|
||||
missing = [user_id for user_id, state in states.items() if not state]
|
||||
if missing:
|
||||
|
||||
@@ -42,7 +42,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
class RoomListHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
super(RoomListHandler, self).__init__(hs)
|
||||
self.response_cache = ResponseCache(hs)
|
||||
self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
|
||||
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
|
||||
|
||||
def get_local_public_room_list(self, limit=None, since_token=None,
|
||||
|
||||
@@ -719,7 +719,9 @@ class RoomMemberHandler(BaseHandler):
|
||||
)
|
||||
membership = member.membership if member else None
|
||||
|
||||
if membership is not None and membership != Membership.LEAVE:
|
||||
if membership is not None and membership not in [
|
||||
Membership.LEAVE, Membership.BAN
|
||||
]:
|
||||
raise SynapseError(400, "User %s in room %s" % (
|
||||
user_id, room_id
|
||||
))
|
||||
|
||||
@@ -539,7 +539,7 @@ class SyncHandler(object):
|
||||
since_token is None and
|
||||
sync_config.filter_collection.blocks_all_presence()
|
||||
)
|
||||
if not block_all_presence_data:
|
||||
if False or not block_all_presence_data:
|
||||
yield self._generate_sync_entry_for_presence(
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
)
|
||||
|
||||
@@ -218,7 +218,8 @@ class EmailPusher(object):
|
||||
)
|
||||
|
||||
def seconds_until(self, ts_msec):
|
||||
return (ts_msec - self.clock.time_msec()) / 1000
|
||||
secs = (ts_msec - self.clock.time_msec()) / 1000
|
||||
return max(secs, 0) # Ensure non-negative
|
||||
|
||||
def get_room_throttle_ms(self, room_id):
|
||||
if room_id in self.throttle_params:
|
||||
|
||||
@@ -46,6 +46,12 @@ class SlavedAccountDataStore(BaseSlavedStore):
|
||||
)
|
||||
|
||||
get_tags_for_user = TagsStore.__dict__["get_tags_for_user"]
|
||||
get_tags_for_room = (
|
||||
DataStore.get_tags_for_room.__func__
|
||||
)
|
||||
get_account_data_for_room = (
|
||||
DataStore.get_account_data_for_room.__func__
|
||||
)
|
||||
|
||||
get_updated_tags = DataStore.get_updated_tags.__func__
|
||||
get_updated_account_data_for_user = (
|
||||
|
||||
@@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore):
|
||||
get_unread_event_push_actions_by_room_for_user = (
|
||||
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
|
||||
)
|
||||
_get_unread_counts_by_receipt_txn = (
|
||||
DataStore._get_unread_counts_by_receipt_txn.__func__
|
||||
)
|
||||
_get_unread_counts_by_pos_txn = (
|
||||
DataStore._get_unread_counts_by_pos_txn.__func__
|
||||
)
|
||||
_get_state_group_for_events = (
|
||||
StateStore.__dict__["_get_state_group_for_events"]
|
||||
)
|
||||
|
||||
@@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.presence import PresenceStore
|
||||
|
||||
|
||||
class SlavedPresenceStore(BaseSlavedStore):
|
||||
@@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore):
|
||||
|
||||
_get_active_presence = DataStore._get_active_presence.__func__
|
||||
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
|
||||
get_presence_for_users = DataStore.get_presence_for_users.__func__
|
||||
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
|
||||
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
|
||||
|
||||
def get_current_presence_token(self):
|
||||
return self._presence_id_gen.get_current_token()
|
||||
|
||||
@@ -87,9 +87,17 @@ class HttpTransactionCache(object):
|
||||
|
||||
deferred = fn(*args, **kwargs)
|
||||
|
||||
# We don't add an errback to the raw deferred, so we ask ObservableDeferred
|
||||
# to swallow the error. This is fine as the error will still be reported
|
||||
# to the observers.
|
||||
# if the request fails with a Twisted failure, remove it
|
||||
# from the transaction map. This is done to ensure that we don't
|
||||
# cache transient errors like rate-limiting errors, etc.
|
||||
def remove_from_map(err):
|
||||
self.transactions.pop(txn_key, None)
|
||||
return err
|
||||
deferred.addErrback(remove_from_map)
|
||||
|
||||
# We don't add any other errbacks to the raw deferred, so we ask
|
||||
# ObservableDeferred to swallow the error. This is fine as the error will
|
||||
# still be reported to the observers.
|
||||
observable = ObservableDeferred(deferred, consumeErrors=True)
|
||||
self.transactions[txn_key] = (observable, self.clock.time_msec())
|
||||
return observable.observe()
|
||||
|
||||
@@ -17,6 +17,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import AuthError, SynapseError
|
||||
from synapse.types import UserID
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
|
||||
from .base import ClientV1RestServlet, client_path_patterns
|
||||
|
||||
@@ -25,6 +26,34 @@ import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UsersRestServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/admin/users/(?P<user_id>[^/]*)")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(UsersRestServlet, self).__init__(hs)
|
||||
self.handlers = hs.get_handlers()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
target_user = UserID.from_string(user_id)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
# To allow all users to get the users list
|
||||
# if not is_admin and target_user != auth_user:
|
||||
# raise AuthError(403, "You are not a server admin")
|
||||
|
||||
if not self.hs.is_mine(target_user):
|
||||
raise SynapseError(400, "Can only users a local user")
|
||||
|
||||
ret = yield self.handlers.admin_handler.get_users()
|
||||
|
||||
defer.returnValue((200, ret))
|
||||
|
||||
|
||||
class WhoisRestServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
|
||||
|
||||
@@ -128,8 +157,199 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
class ResetPasswordRestServlet(ClientV1RestServlet):
|
||||
"""Post request to allow an administrator reset password for a user.
|
||||
This need a user have a administrator access in Synapse.
|
||||
Example:
|
||||
http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
|
||||
@user:to_reset_password?access_token=admin_access_token
|
||||
JsonBodyToSend:
|
||||
{
|
||||
"new_password": "secret"
|
||||
}
|
||||
Returns:
|
||||
200 OK with empty object if success otherwise an error.
|
||||
"""
|
||||
PATTERNS = client_path_patterns("/admin/reset_password/(?P<target_user_id>[^/]*)")
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
super(ResetPasswordRestServlet, self).__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.auth_handler = hs.get_auth_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, target_user_id):
|
||||
"""Post request to allow an administrator reset password for a user.
|
||||
This need a user have a administrator access in Synapse.
|
||||
"""
|
||||
UserID.from_string(target_user_id)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
params = parse_json_object_from_request(request)
|
||||
new_password = params['new_password']
|
||||
if not new_password:
|
||||
raise SynapseError(400, "Missing 'new_password' arg")
|
||||
|
||||
logger.info("new_password: %r", new_password)
|
||||
|
||||
yield self.auth_handler.set_password(
|
||||
target_user_id, new_password, requester
|
||||
)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
class GetUsersPaginatedRestServlet(ClientV1RestServlet):
|
||||
"""Get request to get specific number of users from Synapse.
|
||||
This need a user have a administrator access in Synapse.
|
||||
Example:
|
||||
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
|
||||
@admin:user?access_token=admin_access_token&start=0&limit=10
|
||||
Returns:
|
||||
200 OK with json object {list[dict[str, Any]], count} or empty object.
|
||||
"""
|
||||
PATTERNS = client_path_patterns("/admin/users_paginate/(?P<target_user_id>[^/]*)")
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
super(GetUsersPaginatedRestServlet, self).__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.handlers = hs.get_handlers()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, target_user_id):
|
||||
"""Get request to get specific number of users from Synapse.
|
||||
This need a user have a administrator access in Synapse.
|
||||
"""
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
# To allow all users to get the users list
|
||||
# if not is_admin and target_user != auth_user:
|
||||
# raise AuthError(403, "You are not a server admin")
|
||||
|
||||
if not self.hs.is_mine(target_user):
|
||||
raise SynapseError(400, "Can only users a local user")
|
||||
|
||||
order = "name" # order by name in user table
|
||||
start = request.args.get("start")[0]
|
||||
limit = request.args.get("limit")[0]
|
||||
if not limit:
|
||||
raise SynapseError(400, "Missing 'limit' arg")
|
||||
if not start:
|
||||
raise SynapseError(400, "Missing 'start' arg")
|
||||
logger.info("limit: %s, start: %s", limit, start)
|
||||
|
||||
ret = yield self.handlers.admin_handler.get_users_paginate(
|
||||
order, start, limit
|
||||
)
|
||||
defer.returnValue((200, ret))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, target_user_id):
|
||||
"""Post request to get specific number of users from Synapse..
|
||||
This need a user have a administrator access in Synapse.
|
||||
Example:
|
||||
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
|
||||
@admin:user?access_token=admin_access_token
|
||||
JsonBodyToSend:
|
||||
{
|
||||
"start": "0",
|
||||
"limit": "10
|
||||
}
|
||||
Returns:
|
||||
200 OK with json object {list[dict[str, Any]], count} or empty object.
|
||||
"""
|
||||
UserID.from_string(target_user_id)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
order = "name" # order by name in user table
|
||||
params = parse_json_object_from_request(request)
|
||||
limit = params['limit']
|
||||
start = params['start']
|
||||
if not limit:
|
||||
raise SynapseError(400, "Missing 'limit' arg")
|
||||
if not start:
|
||||
raise SynapseError(400, "Missing 'start' arg")
|
||||
logger.info("limit: %s, start: %s", limit, start)
|
||||
|
||||
ret = yield self.handlers.admin_handler.get_users_paginate(
|
||||
order, start, limit
|
||||
)
|
||||
defer.returnValue((200, ret))
|
||||
|
||||
|
||||
class SearchUsersRestServlet(ClientV1RestServlet):
|
||||
"""Get request to search user table for specific users according to
|
||||
search term.
|
||||
This need a user have a administrator access in Synapse.
|
||||
Example:
|
||||
http://localhost:8008/_matrix/client/api/v1/admin/search_users/
|
||||
@admin:user?access_token=admin_access_token&term=alice
|
||||
Returns:
|
||||
200 OK with json object {list[dict[str, Any]], count} or empty object.
|
||||
"""
|
||||
PATTERNS = client_path_patterns("/admin/search_users/(?P<target_user_id>[^/]*)")
|
||||
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
super(SearchUsersRestServlet, self).__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.handlers = hs.get_handlers()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, target_user_id):
|
||||
"""Get request to search user table for specific users according to
|
||||
search term.
|
||||
This need a user have a administrator access in Synapse.
|
||||
"""
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
# To allow all users to get the users list
|
||||
# if not is_admin and target_user != auth_user:
|
||||
# raise AuthError(403, "You are not a server admin")
|
||||
|
||||
if not self.hs.is_mine(target_user):
|
||||
raise SynapseError(400, "Can only users a local user")
|
||||
|
||||
term = request.args.get("term")[0]
|
||||
if not term:
|
||||
raise SynapseError(400, "Missing 'term' arg")
|
||||
|
||||
logger.info("term: %s ", term)
|
||||
|
||||
ret = yield self.handlers.admin_handler.search_users(
|
||||
term
|
||||
)
|
||||
defer.returnValue((200, ret))
|
||||
|
||||
|
||||
def register_servlets(hs, http_server):
|
||||
WhoisRestServlet(hs).register(http_server)
|
||||
PurgeMediaCacheRestServlet(hs).register(http_server)
|
||||
DeactivateAccountRestServlet(hs).register(http_server)
|
||||
PurgeHistoryRestServlet(hs).register(http_server)
|
||||
UsersRestServlet(hs).register(http_server)
|
||||
ResetPasswordRestServlet(hs).register(http_server)
|
||||
GetUsersPaginatedRestServlet(hs).register(http_server)
|
||||
SearchUsersRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -46,6 +46,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
|
||||
def on_PUT(self, request, user_id):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user = UserID.from_string(user_id)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
@@ -55,7 +56,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
|
||||
defer.returnValue((400, "Unable to parse name"))
|
||||
|
||||
yield self.handlers.profile_handler.set_displayname(
|
||||
user, requester, new_name)
|
||||
user, requester, new_name, is_admin)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
@@ -88,6 +89,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
|
||||
def on_PUT(self, request, user_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
user = UserID.from_string(user_id)
|
||||
is_admin = yield self.auth.is_server_admin(requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
try:
|
||||
@@ -96,7 +98,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
|
||||
defer.returnValue((400, "Unable to parse name"))
|
||||
|
||||
yield self.handlers.profile_handler.set_avatar_url(
|
||||
user, requester, new_name)
|
||||
user, requester, new_name, is_admin)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
@@ -474,6 +474,7 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, room_id):
|
||||
# raise RuntimeError("Guest access has been disabled")
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
pagination_config = PaginationConfig.from_request(request)
|
||||
content = yield self.initial_sync_handler.room_initial_sync(
|
||||
@@ -608,6 +609,10 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
|
||||
raise SynapseError(400, "Missing user_id key.")
|
||||
target = UserID.from_string(content["user_id"])
|
||||
|
||||
event_content = None
|
||||
if 'reason' in content and membership_action in ['kick', 'ban']:
|
||||
event_content = {'reason': content['reason']}
|
||||
|
||||
yield self.handlers.room_member_handler.update_membership(
|
||||
requester=requester,
|
||||
target=target,
|
||||
@@ -615,6 +620,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
|
||||
action=membership_action,
|
||||
txn_id=txn_id,
|
||||
third_party_signed=content.get("third_party_signed", None),
|
||||
content=event_content,
|
||||
)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
@@ -297,6 +297,82 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
desc="get_user_ip_and_agents",
|
||||
)
|
||||
|
||||
def get_users(self):
|
||||
"""Function to reterive a list of users in users table.
|
||||
|
||||
Args:
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self._simple_select_list(
|
||||
table="users",
|
||||
keyvalues={},
|
||||
retcols=[
|
||||
"name",
|
||||
"password_hash",
|
||||
"is_guest",
|
||||
"admin"
|
||||
],
|
||||
desc="get_users",
|
||||
)
|
||||
|
||||
def get_users_paginate(self, order, start, limit):
|
||||
"""Function to reterive a paginated list of users from
|
||||
users list. This will return a json object, which contains
|
||||
list of users and the total number of users in users table.
|
||||
|
||||
Args:
|
||||
order (str): column name to order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
Returns:
|
||||
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
|
||||
"""
|
||||
is_guest = 0
|
||||
i_start = (int)(start)
|
||||
i_limit = (int)(limit)
|
||||
return self.get_user_list_paginate(
|
||||
table="users",
|
||||
keyvalues={
|
||||
"is_guest": is_guest
|
||||
},
|
||||
pagevalues=[
|
||||
order,
|
||||
i_limit,
|
||||
i_start
|
||||
],
|
||||
retcols=[
|
||||
"name",
|
||||
"password_hash",
|
||||
"is_guest",
|
||||
"admin"
|
||||
],
|
||||
desc="get_users_paginate",
|
||||
)
|
||||
|
||||
def search_users(self, term):
|
||||
"""Function to search users list for one or more users with
|
||||
the matched term.
|
||||
|
||||
Args:
|
||||
term (str): search term
|
||||
col (str): column to query term should be matched to
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self._simple_search_list(
|
||||
table="users",
|
||||
term=term,
|
||||
col="name",
|
||||
retcols=[
|
||||
"name",
|
||||
"password_hash",
|
||||
"is_guest",
|
||||
"admin"
|
||||
],
|
||||
desc="search_users",
|
||||
)
|
||||
|
||||
|
||||
def are_all_users_on_domain(txn, database_engine, domain):
|
||||
sql = database_engine.convert_param_style(
|
||||
|
||||
@@ -934,6 +934,165 @@ class SQLBaseStore(object):
|
||||
else:
|
||||
return 0
|
||||
|
||||
def _simple_select_list_paginate(self, table, keyvalues, pagevalues, retcols,
|
||||
desc="_simple_select_list_paginate"):
|
||||
"""Executes a SELECT query on the named table with start and limit,
|
||||
of row numbers, which may return zero or number of rows from start to limit,
|
||||
returning the result as a list of dicts.
|
||||
|
||||
Args:
|
||||
table (str): the table name
|
||||
keyvalues (dict[str, Any] | None):
|
||||
column names and values to select the rows with, or None to not
|
||||
apply a WHERE clause.
|
||||
retcols (iterable[str]): the names of the columns to return
|
||||
order (str): order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self.runInteraction(
|
||||
desc,
|
||||
self._simple_select_list_paginate_txn,
|
||||
table, keyvalues, pagevalues, retcols
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _simple_select_list_paginate_txn(cls, txn, table, keyvalues, pagevalues, retcols):
|
||||
"""Executes a SELECT query on the named table with start and limit,
|
||||
of row numbers, which may return zero or number of rows from start to limit,
|
||||
returning the result as a list of dicts.
|
||||
|
||||
Args:
|
||||
txn : Transaction object
|
||||
table (str): the table name
|
||||
keyvalues (dict[str, T] | None):
|
||||
column names and values to select the rows with, or None to not
|
||||
apply a WHERE clause.
|
||||
pagevalues ([]):
|
||||
order (str): order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
retcols (iterable[str]): the names of the columns to return
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
|
||||
"""
|
||||
if keyvalues:
|
||||
sql = "SELECT %s FROM %s WHERE %s ORDER BY %s" % (
|
||||
", ".join(retcols),
|
||||
table,
|
||||
" AND ".join("%s = ?" % (k,) for k in keyvalues),
|
||||
" ? ASC LIMIT ? OFFSET ?"
|
||||
)
|
||||
txn.execute(sql, keyvalues.values() + pagevalues)
|
||||
else:
|
||||
sql = "SELECT %s FROM %s ORDER BY %s" % (
|
||||
", ".join(retcols),
|
||||
table,
|
||||
" ? ASC LIMIT ? OFFSET ?"
|
||||
)
|
||||
txn.execute(sql, pagevalues)
|
||||
|
||||
return cls.cursor_to_dict(txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_list_paginate(self, table, keyvalues, pagevalues, retcols,
|
||||
desc="get_user_list_paginate"):
|
||||
"""Get a list of users from start row to a limit number of rows. This will
|
||||
return a json object with users and total number of users in users list.
|
||||
|
||||
Args:
|
||||
table (str): the table name
|
||||
keyvalues (dict[str, Any] | None):
|
||||
column names and values to select the rows with, or None to not
|
||||
apply a WHERE clause.
|
||||
pagevalues ([]):
|
||||
order (str): order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
retcols (iterable[str]): the names of the columns to return
|
||||
Returns:
|
||||
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
|
||||
"""
|
||||
users = yield self.runInteraction(
|
||||
desc,
|
||||
self._simple_select_list_paginate_txn,
|
||||
table, keyvalues, pagevalues, retcols
|
||||
)
|
||||
count = yield self.runInteraction(
|
||||
desc,
|
||||
self.get_user_count_txn
|
||||
)
|
||||
retval = {
|
||||
"users": users,
|
||||
"total": count
|
||||
}
|
||||
defer.returnValue(retval)
|
||||
|
||||
def get_user_count_txn(self, txn):
|
||||
"""Get a total number of registerd users in the users list.
|
||||
|
||||
Args:
|
||||
txn : Transaction object
|
||||
Returns:
|
||||
defer.Deferred: resolves to int
|
||||
"""
|
||||
sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
|
||||
txn.execute(sql_count)
|
||||
count = txn.fetchone()[0]
|
||||
defer.returnValue(count)
|
||||
|
||||
def _simple_search_list(self, table, term, col, retcols,
|
||||
desc="_simple_search_list"):
|
||||
"""Executes a SELECT query on the named table, which may return zero or
|
||||
more rows, returning the result as a list of dicts.
|
||||
|
||||
Args:
|
||||
table (str): the table name
|
||||
term (str | None):
|
||||
term for searching the table matched to a column.
|
||||
col (str): column to query term should be matched to
|
||||
retcols (iterable[str]): the names of the columns to return
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]] or None
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
desc,
|
||||
self._simple_search_list_txn,
|
||||
table, term, col, retcols
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _simple_search_list_txn(cls, txn, table, term, col, retcols):
|
||||
"""Executes a SELECT query on the named table, which may return zero or
|
||||
more rows, returning the result as a list of dicts.
|
||||
|
||||
Args:
|
||||
txn : Transaction object
|
||||
table (str): the table name
|
||||
term (str | None):
|
||||
term for searching the table matched to a column.
|
||||
col (str): column to query term should be matched to
|
||||
retcols (iterable[str]): the names of the columns to return
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]] or None
|
||||
"""
|
||||
if term:
|
||||
sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (
|
||||
", ".join(retcols),
|
||||
table,
|
||||
col
|
||||
)
|
||||
termvalues = ["%%" + term + "%%"]
|
||||
txn.execute(sql, termvalues)
|
||||
else:
|
||||
return 0
|
||||
|
||||
return cls.cursor_to_dict(txn)
|
||||
|
||||
|
||||
class _RollbackButIsFineException(Exception):
|
||||
""" This exception is used to rollback a transaction without implying
|
||||
|
||||
@@ -93,7 +93,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
||||
query_clause = "user_id = ?"
|
||||
query_params.append(user_id)
|
||||
|
||||
if device_id:
|
||||
if device_id is not None:
|
||||
query_clause += " AND device_id = ?"
|
||||
query_params.append(device_id)
|
||||
|
||||
|
||||
@@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
|
||||
"""For a given room_id and stream_ordering, return the forward
|
||||
extremeties of the room at that point in "time".
|
||||
|
||||
Throws a StoreError if we have since purged the index for
|
||||
stream_orderings from that point.
|
||||
|
||||
Args:
|
||||
room_id (str):
|
||||
stream_ordering (int):
|
||||
|
||||
Returns:
|
||||
deferred, which resolves to a list of event_ids
|
||||
"""
|
||||
# We want to make the cache more effective, so we clamp to the last
|
||||
# change before the given ordering.
|
||||
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
|
||||
|
||||
# We don't always have a full stream_to_exterm_id table, e.g. after
|
||||
# the upgrade that introduced it, so we make sure we never ask for a
|
||||
# try and pin to a stream_ordering from before a restart
|
||||
# stream_ordering from before a restart
|
||||
last_change = max(self._stream_order_on_start, last_change)
|
||||
|
||||
# provided the last_change is recent enough, we now clamp the requested
|
||||
# stream_ordering to it.
|
||||
if last_change > self.stream_ordering_month_ago:
|
||||
stream_ordering = min(last_change, stream_ordering)
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from twisted.internet import defer
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.types import RoomStreamToken
|
||||
from .stream import lower_bound
|
||||
@@ -25,11 +26,46 @@ import ujson as json
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
|
||||
DEFAULT_HIGHLIGHT_ACTION = [
|
||||
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
|
||||
]
|
||||
|
||||
|
||||
def _serialize_action(actions, is_highlight):
|
||||
"""Custom serializer for actions. This allows us to "compress" common actions.
|
||||
|
||||
We use the fact that most users have the same actions for notifs (and for
|
||||
highlights).
|
||||
We store these default actions as the empty string rather than the full JSON.
|
||||
Since the empty string isn't valid JSON there is no risk of this clashing with
|
||||
any real JSON actions
|
||||
"""
|
||||
if is_highlight:
|
||||
if actions == DEFAULT_HIGHLIGHT_ACTION:
|
||||
return "" # We use empty string as the column is non-NULL
|
||||
else:
|
||||
if actions == DEFAULT_NOTIF_ACTION:
|
||||
return ""
|
||||
return json.dumps(actions)
|
||||
|
||||
|
||||
def _deserialize_action(actions, is_highlight):
|
||||
"""Custom deserializer for actions. This allows us to "compress" common actions
|
||||
"""
|
||||
if actions:
|
||||
return json.loads(actions)
|
||||
|
||||
if is_highlight:
|
||||
return DEFAULT_HIGHLIGHT_ACTION
|
||||
else:
|
||||
return DEFAULT_NOTIF_ACTION
|
||||
|
||||
|
||||
class EventPushActionsStore(SQLBaseStore):
|
||||
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
||||
|
||||
def __init__(self, hs):
|
||||
self.stream_ordering_month_ago = None
|
||||
super(EventPushActionsStore, self).__init__(hs)
|
||||
|
||||
self.register_background_index_update(
|
||||
@@ -47,6 +83,13 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
where_clause="highlight=1"
|
||||
)
|
||||
|
||||
self._doing_notif_rotation = False
|
||||
self._rotate_notif_loop = self._clock.looping_call(
|
||||
self._rotate_notifs, 30 * 60 * 1000
|
||||
)
|
||||
self._rotate_delay = 3
|
||||
self._rotate_count = 10000
|
||||
|
||||
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
|
||||
"""
|
||||
Args:
|
||||
@@ -55,15 +98,17 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
"""
|
||||
values = []
|
||||
for uid, actions in tuples:
|
||||
is_highlight = 1 if _action_has_highlight(actions) else 0
|
||||
|
||||
values.append({
|
||||
'room_id': event.room_id,
|
||||
'event_id': event.event_id,
|
||||
'user_id': uid,
|
||||
'actions': json.dumps(actions),
|
||||
'actions': _serialize_action(actions, is_highlight),
|
||||
'stream_ordering': event.internal_metadata.stream_ordering,
|
||||
'topological_ordering': event.depth,
|
||||
'notif': 1,
|
||||
'highlight': 1 if _action_has_highlight(actions) else 0,
|
||||
'highlight': is_highlight,
|
||||
})
|
||||
|
||||
for uid, __ in tuples:
|
||||
@@ -77,67 +122,84 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
def get_unread_event_push_actions_by_room_for_user(
|
||||
self, room_id, user_id, last_read_event_id
|
||||
):
|
||||
def _get_unread_event_push_actions_by_room(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, topological_ordering"
|
||||
" FROM events"
|
||||
" WHERE room_id = ? AND event_id = ?"
|
||||
)
|
||||
txn.execute(
|
||||
sql, (room_id, last_read_event_id)
|
||||
)
|
||||
results = txn.fetchall()
|
||||
if len(results) == 0:
|
||||
return {"notify_count": 0, "highlight_count": 0}
|
||||
|
||||
stream_ordering = results[0][0]
|
||||
topological_ordering = results[0][1]
|
||||
token = RoomStreamToken(
|
||||
topological_ordering, stream_ordering
|
||||
)
|
||||
|
||||
# First get number of notifications.
|
||||
# We don't need to put a notif=1 clause as all rows always have
|
||||
# notif=1
|
||||
sql = (
|
||||
"SELECT count(*)"
|
||||
" FROM event_push_actions ea"
|
||||
" WHERE"
|
||||
" user_id = ?"
|
||||
" AND room_id = ?"
|
||||
" AND %s"
|
||||
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
||||
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
row = txn.fetchone()
|
||||
notify_count = row[0] if row else 0
|
||||
|
||||
# Now get the number of highlights
|
||||
sql = (
|
||||
"SELECT count(*)"
|
||||
" FROM event_push_actions ea"
|
||||
" WHERE"
|
||||
" highlight = 1"
|
||||
" AND user_id = ?"
|
||||
" AND room_id = ?"
|
||||
" AND %s"
|
||||
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
||||
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
row = txn.fetchone()
|
||||
highlight_count = row[0] if row else 0
|
||||
|
||||
return {
|
||||
"notify_count": notify_count,
|
||||
"highlight_count": highlight_count,
|
||||
}
|
||||
|
||||
ret = yield self.runInteraction(
|
||||
"get_unread_event_push_actions_by_room",
|
||||
_get_unread_event_push_actions_by_room
|
||||
self._get_unread_counts_by_receipt_txn,
|
||||
room_id, user_id, last_read_event_id
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
|
||||
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
|
||||
last_read_event_id):
|
||||
sql = (
|
||||
"SELECT stream_ordering, topological_ordering"
|
||||
" FROM events"
|
||||
" WHERE room_id = ? AND event_id = ?"
|
||||
)
|
||||
txn.execute(
|
||||
sql, (room_id, last_read_event_id)
|
||||
)
|
||||
results = txn.fetchall()
|
||||
if len(results) == 0:
|
||||
return {"notify_count": 0, "highlight_count": 0}
|
||||
|
||||
stream_ordering = results[0][0]
|
||||
topological_ordering = results[0][1]
|
||||
|
||||
return self._get_unread_counts_by_pos_txn(
|
||||
txn, room_id, user_id, topological_ordering, stream_ordering
|
||||
)
|
||||
|
||||
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
|
||||
stream_ordering):
|
||||
token = RoomStreamToken(
|
||||
topological_ordering, stream_ordering
|
||||
)
|
||||
|
||||
# First get number of notifications.
|
||||
# We don't need to put a notif=1 clause as all rows always have
|
||||
# notif=1
|
||||
sql = (
|
||||
"SELECT count(*)"
|
||||
" FROM event_push_actions ea"
|
||||
" WHERE"
|
||||
" user_id = ?"
|
||||
" AND room_id = ?"
|
||||
" AND %s"
|
||||
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
||||
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
row = txn.fetchone()
|
||||
notify_count = row[0] if row else 0
|
||||
|
||||
txn.execute("""
|
||||
SELECT notif_count FROM event_push_summary
|
||||
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
|
||||
""", (room_id, user_id, stream_ordering,))
|
||||
rows = txn.fetchall()
|
||||
if rows:
|
||||
notify_count += rows[0][0]
|
||||
|
||||
# Now get the number of highlights
|
||||
sql = (
|
||||
"SELECT count(*)"
|
||||
" FROM event_push_actions ea"
|
||||
" WHERE"
|
||||
" highlight = 1"
|
||||
" AND user_id = ?"
|
||||
" AND room_id = ?"
|
||||
" AND %s"
|
||||
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
||||
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
row = txn.fetchone()
|
||||
highlight_count = row[0] if row else 0
|
||||
|
||||
return {
|
||||
"notify_count": notify_count,
|
||||
"highlight_count": highlight_count,
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
|
||||
def f(txn):
|
||||
@@ -176,7 +238,8 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
# find rooms that have a read receipt in them and return the next
|
||||
# push actions
|
||||
sql = (
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
|
||||
" ep.highlight "
|
||||
" FROM ("
|
||||
" SELECT room_id,"
|
||||
" MAX(topological_ordering) as topological_ordering,"
|
||||
@@ -217,7 +280,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
def get_no_receipt(txn):
|
||||
sql = (
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
|
||||
" e.received_ts"
|
||||
" ep.highlight "
|
||||
" FROM event_push_actions AS ep"
|
||||
" INNER JOIN events AS e USING (room_id, event_id)"
|
||||
" WHERE"
|
||||
@@ -246,7 +309,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
"event_id": row[0],
|
||||
"room_id": row[1],
|
||||
"stream_ordering": row[2],
|
||||
"actions": json.loads(row[3]),
|
||||
"actions": _deserialize_action(row[3], row[4]),
|
||||
} for row in after_read_receipt + no_read_receipt
|
||||
]
|
||||
|
||||
@@ -285,7 +348,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
def get_after_receipt(txn):
|
||||
sql = (
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
|
||||
" e.received_ts"
|
||||
" ep.highlight, e.received_ts"
|
||||
" FROM ("
|
||||
" SELECT room_id,"
|
||||
" MAX(topological_ordering) as topological_ordering,"
|
||||
@@ -327,7 +390,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
def get_no_receipt(txn):
|
||||
sql = (
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
|
||||
" e.received_ts"
|
||||
" ep.highlight, e.received_ts"
|
||||
" FROM event_push_actions AS ep"
|
||||
" INNER JOIN events AS e USING (room_id, event_id)"
|
||||
" WHERE"
|
||||
@@ -357,8 +420,8 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
"event_id": row[0],
|
||||
"room_id": row[1],
|
||||
"stream_ordering": row[2],
|
||||
"actions": json.loads(row[3]),
|
||||
"received_ts": row[4],
|
||||
"actions": _deserialize_action(row[3], row[4]),
|
||||
"received_ts": row[5],
|
||||
} for row in after_read_receipt + no_read_receipt
|
||||
]
|
||||
|
||||
@@ -392,7 +455,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
sql = (
|
||||
"SELECT epa.event_id, epa.room_id,"
|
||||
" epa.stream_ordering, epa.topological_ordering,"
|
||||
" epa.actions, epa.profile_tag, e.received_ts"
|
||||
" epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
|
||||
" FROM event_push_actions epa, events e"
|
||||
" WHERE epa.event_id = e.event_id"
|
||||
" AND epa.user_id = ? %s"
|
||||
@@ -407,7 +470,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
"get_push_actions_for_user", f
|
||||
)
|
||||
for pa in push_actions:
|
||||
pa["actions"] = json.loads(pa["actions"])
|
||||
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
|
||||
defer.returnValue(push_actions)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -448,10 +511,14 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
|
||||
topological_ordering):
|
||||
topological_ordering, stream_ordering):
|
||||
"""
|
||||
Purges old, stale push actions for a user and room before a given
|
||||
topological_ordering
|
||||
Purges old push actions for a user and room before a given
|
||||
topological_ordering.
|
||||
|
||||
We however keep a months worth of highlighted notifications, so that
|
||||
users can still get a list of recent highlights.
|
||||
|
||||
Args:
|
||||
txn: The transcation
|
||||
room_id: Room ID to delete from
|
||||
@@ -475,10 +542,16 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
txn.execute(
|
||||
"DELETE FROM event_push_actions "
|
||||
" WHERE user_id = ? AND room_id = ? AND "
|
||||
" topological_ordering < ? AND stream_ordering < ?",
|
||||
" topological_ordering <= ?"
|
||||
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
|
||||
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
|
||||
)
|
||||
|
||||
txn.execute("""
|
||||
DELETE FROM event_push_summary
|
||||
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
|
||||
""", (room_id, user_id, stream_ordering))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _find_stream_orderings_for_times(self):
|
||||
yield self.runInteraction(
|
||||
@@ -495,6 +568,14 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
"Found stream ordering 1 month ago: it's %d",
|
||||
self.stream_ordering_month_ago
|
||||
)
|
||||
logger.info("Searching for stream ordering 1 day ago")
|
||||
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
|
||||
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
|
||||
)
|
||||
logger.info(
|
||||
"Found stream ordering 1 day ago: it's %d",
|
||||
self.stream_ordering_day_ago
|
||||
)
|
||||
|
||||
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
|
||||
"""
|
||||
@@ -534,6 +615,135 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
|
||||
return range_end
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _rotate_notifs(self):
|
||||
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
|
||||
return
|
||||
self._doing_notif_rotation = True
|
||||
|
||||
try:
|
||||
while True:
|
||||
logger.info("Rotating notifications")
|
||||
|
||||
caught_up = yield self.runInteraction(
|
||||
"_rotate_notifs",
|
||||
self._rotate_notifs_txn
|
||||
)
|
||||
if caught_up:
|
||||
break
|
||||
yield sleep(self._rotate_delay)
|
||||
finally:
|
||||
self._doing_notif_rotation = False
|
||||
|
||||
def _rotate_notifs_txn(self, txn):
|
||||
"""Archives older notifications into event_push_summary. Returns whether
|
||||
the archiving process has caught up or not.
|
||||
"""
|
||||
|
||||
# We want to make sure that we only ever do this one at a time
|
||||
# self.database_engine.lock_table(txn, "event_push_summary")
|
||||
|
||||
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="event_push_summary_stream_ordering",
|
||||
keyvalues={},
|
||||
retcol="stream_ordering",
|
||||
)
|
||||
|
||||
# We don't to try and rotate millions of rows at once, so we cap the
|
||||
# maximum stream ordering we'll rotate before.
|
||||
txn.execute("""
|
||||
SELECT stream_ordering FROM event_push_actions
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
|
||||
""", (old_rotate_stream_ordering, self._rotate_count))
|
||||
stream_row = txn.fetchone()
|
||||
# stream_row = (old_rotate_stream_ordering + self._rotate_count,)
|
||||
if stream_row:
|
||||
offset_stream_ordering, = stream_row
|
||||
rotate_to_stream_ordering = min(
|
||||
self.stream_ordering_day_ago, offset_stream_ordering
|
||||
)
|
||||
caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
|
||||
else:
|
||||
rotate_to_stream_ordering = self.stream_ordering_day_ago
|
||||
caught_up = True
|
||||
|
||||
logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
|
||||
|
||||
self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
|
||||
|
||||
# We have caught up iff we were limited by `stream_ordering_day_ago`
|
||||
return caught_up
|
||||
|
||||
def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
|
||||
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="event_push_summary_stream_ordering",
|
||||
keyvalues={},
|
||||
retcol="stream_ordering",
|
||||
)
|
||||
|
||||
# Calculate the new counts that should be upserted into event_push_summary
|
||||
sql = """
|
||||
SELECT user_id, room_id,
|
||||
coalesce(old.notif_count, 0) + upd.notif_count,
|
||||
upd.stream_ordering,
|
||||
old.user_id
|
||||
FROM (
|
||||
SELECT user_id, room_id, count(*) as notif_count,
|
||||
max(stream_ordering) as stream_ordering
|
||||
FROM event_push_actions
|
||||
WHERE ? <= stream_ordering AND stream_ordering < ?
|
||||
AND highlight = 0
|
||||
GROUP BY user_id, room_id
|
||||
) AS upd
|
||||
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
|
||||
"""
|
||||
|
||||
txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
|
||||
rows = txn.fetchall()
|
||||
|
||||
logger.info("Rotating notifications, handling %d rows", len(rows))
|
||||
|
||||
# If the `old.user_id` above is NULL then we know there isn't already an
|
||||
# entry in the table, so we simply insert it. Otherwise we update the
|
||||
# existing table.
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="event_push_summary",
|
||||
values=[
|
||||
{
|
||||
"user_id": row[0],
|
||||
"room_id": row[1],
|
||||
"notif_count": row[2],
|
||||
"stream_ordering": row[3],
|
||||
}
|
||||
for row in rows if row[4] is None
|
||||
]
|
||||
)
|
||||
|
||||
txn.executemany(
|
||||
"""
|
||||
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
|
||||
WHERE user_id = ? AND room_id = ?
|
||||
""",
|
||||
((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
"DELETE FROM event_push_actions"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
|
||||
(old_rotate_stream_ordering, rotate_to_stream_ordering,)
|
||||
)
|
||||
|
||||
logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
|
||||
|
||||
txn.execute(
|
||||
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
|
||||
(rotate_to_stream_ordering,)
|
||||
)
|
||||
|
||||
|
||||
def _action_has_highlight(actions):
|
||||
for action in actions:
|
||||
|
||||
@@ -765,7 +765,6 @@ class EventsStore(SQLBaseStore):
|
||||
"event_edge_hashes",
|
||||
"event_edges",
|
||||
"event_forward_extremities",
|
||||
"event_push_actions",
|
||||
"event_reference_hashes",
|
||||
"event_search",
|
||||
"event_signatures",
|
||||
@@ -784,6 +783,13 @@ class EventsStore(SQLBaseStore):
|
||||
"DELETE FROM %s WHERE event_id = ?" % (table,),
|
||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||
)
|
||||
for table in (
|
||||
"event_push_actions",
|
||||
):
|
||||
txn.executemany(
|
||||
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
|
||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.api.constants import PresenceState
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
|
||||
|
||||
from collections import namedtuple
|
||||
from twisted.internet import defer
|
||||
@@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
|
||||
self.presence_stream_cache.entity_has_changed,
|
||||
state.user_id, stream_id,
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self._get_presence_for_user, (state.user_id,)
|
||||
)
|
||||
|
||||
# Actually insert new rows
|
||||
self._simple_insert_many_txn(
|
||||
@@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
|
||||
"get_all_presence_updates", get_all_presence_updates_txn
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@cached()
|
||||
def _get_presence_for_user(self, user_id):
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
|
||||
num_args=1, inlineCallbacks=True)
|
||||
def get_presence_for_users(self, user_ids):
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="presence_stream",
|
||||
@@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
|
||||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
defer.returnValue([UserPresenceState(**row) for row in rows])
|
||||
defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
|
||||
|
||||
def get_current_presence_token(self):
|
||||
return self._presence_id_gen.get_current_token()
|
||||
|
||||
@@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore):
|
||||
room_id=room_id,
|
||||
user_id=user_id,
|
||||
topological_ordering=topological_ordering,
|
||||
stream_ordering=stream_ordering,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
/* Copyright 2017 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Aggregate of old notification counts that have been deleted out of the
|
||||
-- main event_push_actions table. This count does not include those that were
|
||||
-- highlights, as they remain in the event_push_actions table.
|
||||
CREATE TABLE event_push_summary (
|
||||
user_id TEXT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
notif_count BIGINT NOT NULL,
|
||||
stream_ordering BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
|
||||
|
||||
|
||||
-- The stream ordering up to which we have aggregated the event_push_actions
|
||||
-- table into event_push_summary
|
||||
CREATE TABLE event_push_summary_stream_ordering (
|
||||
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
|
||||
stream_ordering BIGINT NOT NULL,
|
||||
CHECK (Lock='X')
|
||||
);
|
||||
|
||||
INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
|
||||
@@ -0,0 +1,39 @@
|
||||
/* Copyright 2017 Vector Creations Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE TABLE IF NOT EXISTS pushers2 (
|
||||
id BIGINT PRIMARY KEY,
|
||||
user_name TEXT NOT NULL,
|
||||
access_token BIGINT DEFAULT NULL,
|
||||
profile_tag TEXT NOT NULL,
|
||||
kind TEXT NOT NULL,
|
||||
app_id TEXT NOT NULL,
|
||||
app_display_name TEXT NOT NULL,
|
||||
device_display_name TEXT NOT NULL,
|
||||
pushkey TEXT NOT NULL,
|
||||
ts BIGINT NOT NULL,
|
||||
lang TEXT,
|
||||
data TEXT,
|
||||
last_stream_ordering INTEGER,
|
||||
last_success BIGINT,
|
||||
failing_since BIGINT,
|
||||
UNIQUE (app_id, pushkey, user_name)
|
||||
);
|
||||
|
||||
INSERT INTO pushers2 SELECT * FROM PUSHERS;
|
||||
|
||||
DROP TABLE PUSHERS;
|
||||
|
||||
ALTER TABLE pushers2 RENAME TO pushers;
|
||||
@@ -600,7 +600,7 @@ def _parse_query(database_engine, search_term):
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
return " & ".join(result + ":*" for result in results)
|
||||
return " & ".join(result for result in results)
|
||||
elif isinstance(database_engine, Sqlite3Engine):
|
||||
return " & ".join(result + "*" for result in results)
|
||||
else:
|
||||
|
||||
@@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
|
||||
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_ids_for_events(self, event_ids, types):
|
||||
def get_state_ids_for_events(self, event_ids, types=None):
|
||||
"""
|
||||
Get the state dicts corresponding to a list of events
|
||||
|
||||
Args:
|
||||
event_ids(list(str)): events whose state should be returned
|
||||
types(list[(str, str)]|None): List of (type, state_key) tuples
|
||||
which are used to filter the state fetched. May be None, which
|
||||
matches any key
|
||||
|
||||
Returns:
|
||||
A deferred dict from event_id -> (type, state_key) -> state_event
|
||||
"""
|
||||
event_to_groups = yield self._get_state_group_for_events(
|
||||
event_ids,
|
||||
)
|
||||
|
||||
@@ -17,9 +17,15 @@ from twisted.internet import defer
|
||||
|
||||
import tests.unittest
|
||||
import tests.utils
|
||||
from mock import Mock
|
||||
|
||||
USER_ID = "@user:example.com"
|
||||
|
||||
PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
|
||||
HIGHLIGHT = [
|
||||
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
|
||||
]
|
||||
|
||||
|
||||
class EventPushActionsStoreTestCase(tests.unittest.TestCase):
|
||||
|
||||
@@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
|
||||
yield self.store.get_unread_push_actions_for_user_in_range_for_email(
|
||||
USER_ID, 0, 1000, 20
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_count_aggregation(self):
|
||||
room_id = "!foo:example.com"
|
||||
user_id = "@user1235:example.com"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _assert_counts(noitf_count, highlight_count):
|
||||
counts = yield self.store.runInteraction(
|
||||
"", self.store._get_unread_counts_by_pos_txn,
|
||||
room_id, user_id, 0, 0
|
||||
)
|
||||
self.assertEquals(
|
||||
counts,
|
||||
{"notify_count": noitf_count, "highlight_count": highlight_count}
|
||||
)
|
||||
|
||||
def _inject_actions(stream, action):
|
||||
event = Mock()
|
||||
event.room_id = room_id
|
||||
event.event_id = "$test:example.com"
|
||||
event.internal_metadata.stream_ordering = stream
|
||||
event.depth = stream
|
||||
|
||||
tuples = [(user_id, action)]
|
||||
|
||||
return self.store.runInteraction(
|
||||
"", self.store._set_push_actions_for_event_and_users_txn,
|
||||
event, tuples
|
||||
)
|
||||
|
||||
def _rotate(stream):
|
||||
return self.store.runInteraction(
|
||||
"", self.store._rotate_notifs_before_txn, stream
|
||||
)
|
||||
|
||||
def _mark_read(stream, depth):
|
||||
return self.store.runInteraction(
|
||||
"", self.store._remove_old_push_actions_before_txn,
|
||||
room_id, user_id, depth, stream
|
||||
)
|
||||
|
||||
yield _assert_counts(0, 0)
|
||||
yield _inject_actions(1, PlAIN_NOTIF)
|
||||
yield _assert_counts(1, 0)
|
||||
yield _rotate(2)
|
||||
yield _assert_counts(1, 0)
|
||||
|
||||
yield _inject_actions(3, PlAIN_NOTIF)
|
||||
yield _assert_counts(2, 0)
|
||||
yield _rotate(4)
|
||||
yield _assert_counts(2, 0)
|
||||
|
||||
yield _inject_actions(5, PlAIN_NOTIF)
|
||||
yield _mark_read(3, 3)
|
||||
yield _assert_counts(1, 0)
|
||||
|
||||
yield _mark_read(5, 5)
|
||||
yield _assert_counts(0, 0)
|
||||
|
||||
yield _inject_actions(6, PlAIN_NOTIF)
|
||||
yield _rotate(7)
|
||||
|
||||
yield self.store._simple_delete(
|
||||
table="event_push_actions",
|
||||
keyvalues={"1": 1},
|
||||
desc="",
|
||||
)
|
||||
|
||||
yield _assert_counts(1, 0)
|
||||
|
||||
yield _mark_read(7, 7)
|
||||
yield _assert_counts(0, 0)
|
||||
|
||||
yield _inject_actions(8, HIGHLIGHT)
|
||||
yield _assert_counts(1, 1)
|
||||
yield _rotate(9)
|
||||
yield _assert_counts(1, 1)
|
||||
yield _rotate(10)
|
||||
yield _assert_counts(1, 1)
|
||||
|
||||
Reference in New Issue
Block a user