1
0

Compare commits

...

94 Commits

Author SHA1 Message Date
Erik Johnston f0ed2e408f Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-19 14:50:55 +01:00
Erik Johnston ab4ee2e524 Merge pull request #2236 from matrix-org/erikj/invalidation
Fix invalidation of get_users_with_read_receipts_in_room
2017-05-19 14:50:23 +01:00
Erik Johnston 58ebb96cce Fix invalidation of get_users_with_read_receipts_in_room 2017-05-19 14:38:50 +01:00
Erik Johnston 3da426357c Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-19 13:43:01 +01:00
Erik Johnston 99713dc7d3 Merge pull request #2234 from matrix-org/erikj/fix_push
Store ActionGenerator in HomeServer
2017-05-19 13:42:49 +01:00
Erik Johnston 1c1c0257f4 Move invalidation cb to its own structure 2017-05-19 11:44:11 +01:00
Erik Johnston cafe659f72 Store ActionGenerator in HomeServer 2017-05-19 10:09:56 +01:00
Erik Johnston 5b83e958e8 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-18 17:48:53 +01:00
Erik Johnston 72ed8196b3 Don't push users who have left 2017-05-18 17:48:36 +01:00
Erik Johnston 0e242aebc0 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-18 17:18:10 +01:00
Erik Johnston 107ac7ac96 Increase size of push rule caches 2017-05-18 17:17:53 +01:00
Erik Johnston e8ac65c4c9 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-18 16:51:40 +01:00
Erik Johnston 234772db6d Merge pull request #2233 from matrix-org/erikj/faster_as_check
Make get_if_app_services_interested_in_user faster
2017-05-18 16:51:18 +01:00
Erik Johnston 760625acba Make get_if_app_services_interested_in_user faster 2017-05-18 16:34:44 +01:00
Erik Johnston c57789d138 Remove size of push get_rules cache 2017-05-18 16:17:23 +01:00
Erik Johnston fbec8847be Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-18 16:04:59 +01:00
Erik Johnston f33df30732 Merge branch 'master' of github.com:matrix-org/synapse into develop 2017-05-18 13:56:37 +01:00
Erik Johnston 6e381180ae Merge pull request #2177 from matrix-org/erikj/faster_push_rules
Make calculating push actions faster
2017-05-18 11:46:18 +01:00
Erik Johnston 056ba9b795 Add comment 2017-05-18 11:45:56 +01:00
Erik Johnston 689b454674 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-18 10:03:21 +01:00
Erik Johnston 88664afe14 Merge pull request #2231 from aaronraimist/patch-1
Correct a typo in UPGRADE.rst
2017-05-18 10:00:35 +01:00
Aaron Raimist f98efea9b1 Correct a typo in UPGRADE.rst 2017-05-17 21:41:48 -05:00
Erik Johnston d9e3a4b5db Merge pull request #2230 from matrix-org/erikj/speed_up_get_state
Make get_state_groups_from_groups faster.
2017-05-17 17:23:04 +01:00
Erik Johnston 66d8ffabbd Faster push rule calculation via push specific cache
We add a push rule specific cache that ensures that we can reuse
calculated push rules appropriately when a user join/leaves.
2017-05-17 16:55:40 +01:00
Erik Johnston ace23463c5 Merge pull request #2216 from slipeer/app_services_interested_in_user
Fix users claimed non-exclusively by an app service don't get notific…
2017-05-17 16:28:50 +01:00
Erik Johnston bbfe4e996c Make get_state_groups_from_groups faster.
Most of the time was spent copying a dict to filter out sentinel values
that indicated that keys did not exist in the dict. The sentinel values
were added to ensure that we cached the non-existence of keys.

By updating DictionaryCache to keep track of which keys were known to
not exist itself we can remove a dictionary copy.
2017-05-17 15:12:15 +01:00
Erik Johnston 9f430fa07f Merge branch 'release-v0.21.0' of github.com:matrix-org/synapse into develop 2017-05-17 13:28:46 +01:00
Erik Johnston 120eefdc19 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-15 11:36:34 +01:00
Slipeer 328378f9cb Fix users claimed non-exclusively by an app service don't get notifications #2211 2017-05-11 11:42:08 +03:00
Erik Johnston b517ae2d03 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-11 09:04:03 +01:00
Erik Johnston ce32d1eed2 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-09 09:58:20 +01:00
Erik Johnston 0d8a2ee02b Revert "Remove unused import"
This reverts commit ab37bef83b.
2017-05-05 11:23:33 +01:00
Erik Johnston 25d3f28a98 Revert "We don't care about forgotten rooms"
This reverts commit ad8b316939.
2017-05-05 11:23:33 +01:00
Erik Johnston 17ddec014a Revert "Speed up filtering of a single event in push"
This reverts commit 421fdf7460.
2017-05-05 11:23:32 +01:00
Erik Johnston fb4c0ada1f Disable presence
This reverts commit 0ebd376a53 and
disables presence a bit more
2017-05-05 11:03:37 +01:00
Erik Johnston 85999aade0 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-05 11:00:45 +01:00
Erik Johnston dfc7bf2e84 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-03 10:54:24 +01:00
Erik Johnston 02928332d5 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-05-03 09:49:25 +01:00
hera 0ebd376a53 Reenable presence 2017-05-03 08:48:47 +00:00
Erik Johnston 05e62e0478 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-26 14:03:15 +01:00
Erik Johnston 97359ca4ec Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-25 11:18:47 +01:00
Erik Johnston 037aede1ee Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-24 14:36:14 +01:00
Erik Johnston 7ceacaaa6e Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-12 13:59:50 +01:00
Erik Johnston 0cbdfcbb75 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-11 09:44:37 +01:00
Erik Johnston 088d52ba6c Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-10 14:44:05 +01:00
Erik Johnston d728648b65 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-05 14:24:41 +01:00
Erik Johnston 023600d20c Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-05 10:10:07 +01:00
Erik Johnston 076bc0510b Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-04 15:47:19 +01:00
Erik Johnston 2effa32140 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-04-04 11:20:46 +01:00
Erik Johnston d7dbc56c71 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-31 10:11:07 +01:00
Erik Johnston 1ec9f7db27 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-27 18:39:25 +01:00
Erik Johnston 8224121502 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-24 13:59:35 +00:00
Erik Johnston c042bcbe0f Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-21 13:11:19 +00:00
Erik Johnston 5bf6bcb850 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-21 13:09:38 +00:00
Erik Johnston bbc0dbeec0 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-16 13:05:27 +00:00
Erik Johnston b6b1382be1 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-15 16:08:46 +00:00
Erik Johnston 224dc4f6a9 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-15 11:30:41 +00:00
Erik Johnston f90eb60ae9 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-15 10:00:10 +00:00
Erik Johnston a0d6987991 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-13 10:06:28 +00:00
Erik Johnston bf575ae20e Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-09 10:40:54 +00:00
Erik Johnston 929b005999 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-03-09 10:40:47 +00:00
Erik Johnston 01bcf01927 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-28 16:22:54 +00:00
Erik Johnston ec9c8fc6cf Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-28 15:21:35 +00:00
Erik Johnston f2c9e51d28 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-28 14:11:07 +00:00
Erik Johnston 8a6196c6c8 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-27 16:40:11 +00:00
Matrix 8d910ff5b9 Local changes 2017-02-24 16:01:57 +00:00
Erik Johnston cff886c47b Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-21 14:01:45 +00:00
Erik Johnston a24492c06f Don't limit count 2017-02-21 14:01:33 +00:00
Erik Johnston ede125b7e1 Fix up notif rotation 2017-02-18 14:41:31 +00:00
Erik Johnston c97d491649 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-16 15:12:52 +00:00
Erik Johnston 3b42bb96e4 Merge commit 'd7457c7661fa3b28427b21f44252c3abbee45ef8' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-15 10:17:01 +00:00
Erik Johnston 738000971e Merge branch 'develop' into matrix-org-hotfixes 2017-02-15 09:54:09 +00:00
Erik Johnston 96ef35c62f Merge branch 'master' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-09 13:50:38 +00:00
Erik Johnston 83c7b8ec91 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-02 19:26:05 +00:00
Erik Johnston f57cb21952 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-02 09:30:20 +00:00
Erik Johnston 4abecb7b02 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-02-01 14:33:28 +00:00
Erik Johnston 010159365c Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-31 11:20:51 +00:00
Erik Johnston 717e4448c4 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-30 14:36:46 +00:00
Erik Johnston e41f183aa8 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-20 14:00:31 +00:00
Erik Johnston 10f7bfe897 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-11 10:57:42 +00:00
Erik Johnston 772b8ebe54 PEP8 2017-01-10 16:38:00 +00:00
Erik Johnston 3c9acdef4d Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-10 16:31:06 +00:00
Erik Johnston fe28150cdc Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2017-01-10 16:16:17 +00:00
Mark Haines 69ef497253 Log which files we saved attachments to in the media_repository 2017-01-10 14:29:17 +00:00
Matrix 1d08de6ce9 Merge branch 'release-v0.18.7' into matrix-org-hotfixes 2017-01-07 04:03:15 +00:00
Matrix 10d31c433b Merge branch 'release-v0.18.7' into matrix-org-hotfixes 2017-01-07 03:12:35 +00:00
Mark Haines 345afd9fdc Merge tag 'v0.18.6-rc3' into matrix-org-hotfixes
v0.18.6-rc3
2017-01-05 14:19:36 +00:00
Matrix 7ed07066ac Merge branch 'markjh/linearizer_logging' into matrix-org-hotfixes 2017-01-01 13:55:13 +00:00
Mark Haines 2f703b8645 Merge branch 'markjh/fix_get_missing' into matrix-org-hotfixes 2016-12-30 18:47:10 +00:00
Mark Haines 9fff1aacca Merge remote-tracking branch 'origin/release-v0.18.6' into matrix-org-hotfixes 2016-12-30 14:04:16 +00:00
Mark Haines cb842dc99f Deleting from event_push_actions needs to use an index 2016-12-29 16:54:03 +00:00
Mark Haines 2238a10b42 Merge branch 'release-v0.18.6' into matrix-org-hotfixes 2016-12-29 16:26:52 +00:00
Matthew Hodgson 2998fa57b0 merge uncommitted changes from matrix.org into hotfixes-v0.18.5 2016-12-20 12:14:08 +00:00
Richard van der Hoff f5abaafabd uncommitted changes on matrix.org
These might be important? who knows?
2016-12-20 11:49:25 +00:00
24 changed files with 383 additions and 112 deletions
+1 -1
View File
@@ -33,7 +33,7 @@ To check whether your update was sucessfull, run:
.. code:: bash
# replace your.server.domain with ther domain of your synaspe homeserver
# replace your.server.domain with ther domain of your synapse homeserver
curl https://<your.server.domain>/_matrix/federation/v1/version
So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
+1
View File
@@ -121,6 +121,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
+10
View File
@@ -241,6 +241,16 @@ class ApplicationService(object):
def is_exclusive_room(self, room_id):
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
def get_exlusive_user_regexes(self):
"""Get the list of regexes used to determine if a user is exclusively
registered by the AS
"""
return [
regex_obj["regex"]
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
if regex_obj["exclusive"]
]
def is_rate_limited(self):
return self.rate_limited
+2 -3
View File
@@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
from synapse.util.retryutils import NotRetryingDestination
from synapse.push.action_generator import ActionGenerator
from synapse.util.distributor import user_joined_room
from twisted.internet import defer
@@ -75,6 +74,7 @@ class FederationHandler(BaseHandler):
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
self.replication_layer.set_handler(self)
@@ -1389,8 +1389,7 @@ class FederationHandler(BaseHandler):
)
if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
yield self.action_generator.handle_push_actions_for_event(
event, context
)
+1
View File
@@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
+3 -3
View File
@@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.push.action_generator import ActionGenerator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
@@ -54,6 +53,8 @@ class MessageHandler(BaseHandler):
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
self.action_generator = hs.get_action_generator()
@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
event = yield self.store.get_event(event_id)
@@ -590,8 +591,7 @@ class MessageHandler(BaseHandler):
"Changing the room create event is forbidden",
)
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
yield self.action_generator.handle_push_actions_for_event(
event, context
)
+3
View File
@@ -372,6 +372,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -401,6 +402,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -462,6 +464,7 @@ class PresenceHandler(object):
syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server.
"""
return
# Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = (
+1 -1
View File
@@ -43,7 +43,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
+1 -1
View File
@@ -539,7 +539,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if False or not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
+5 -9
View File
@@ -15,7 +15,7 @@
from twisted.internet import defer
from .bulk_push_rule_evaluator import evaluator_for_event
from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
from synapse.util.metrics import Measure
@@ -24,11 +24,12 @@ import logging
logger = logging.getLogger(__name__)
class ActionGenerator:
class ActionGenerator(object):
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.bulk_evaluator = BulkPushRuleEvaluator(hs)
# really we want to get all user ids and all profile tags too,
# since we want the actions for each profile tag for every user and
# also actions for a client with no profile tag for each user.
@@ -38,16 +39,11 @@ class ActionGenerator:
@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "evaluator_for_event"):
bulk_evaluator = yield evaluator_for_event(
event, self.hs, self.store, context
)
with Measure(self.clock, "action_for_event_by_user"):
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
event, context
)
context.push_actions = [
(uid, actions) for uid, actions in actions_by_user.items()
(uid, actions) for uid, actions in actions_by_user.iteritems()
]
+256 -38
View File
@@ -19,60 +19,83 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.api.constants import EventTypes
from synapse.visibility import filter_events_for_clients_context
from synapse.api.constants import EventTypes, Membership
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
from collections import namedtuple
logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def evaluator_for_event(event, hs, store, context):
rules_by_user = yield store.bulk_get_push_rules_for_room(
event, context
)
# if this event is an invite event, we may need to run rules for the user
# who's been invited, otherwise they won't get told they've been invited
if event.type == 'm.room.member' and event.content['membership'] == 'invite':
invited_user = event.state_key
if invited_user and hs.is_mine_id(invited_user):
has_pusher = yield store.user_has_pusher(invited_user)
if has_pusher:
rules_by_user = dict(rules_by_user)
rules_by_user[invited_user] = yield store.get_push_rules_for_user(
invited_user
)
defer.returnValue(BulkPushRuleEvaluator(
event.room_id, rules_by_user, store
))
rules_by_room = {}
class BulkPushRuleEvaluator:
class BulkPushRuleEvaluator(object):
"""Calculates the outcome of push rules for an event for all users in the
room at once.
"""
Runs push rules for all users in a room.
This is faster than running PushRuleEvaluator for each user because it
fetches all the rules for all the users in one (batched) db query
rather than doing multiple queries per-user. It currently uses
the same logic to run the actual rules, but could be optimised further
(see https://matrix.org/jira/browse/SYN-562)
"""
def __init__(self, room_id, rules_by_user, store):
self.room_id = room_id
self.rules_by_user = rules_by_user
self.store = store
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@defer.inlineCallbacks
def _get_rules_for_event(self, event, context):
"""This gets the rules for all users in the room at the time of the event,
as well as the push rules for the invitee if the event is an invite.
Returns:
dict of user_id -> push_rules
"""
room_id = event.room_id
rules_for_room = self._get_rules_for_room(room_id)
rules_by_user = yield rules_for_room.get_rules(context)
# if this event is an invite event, we may need to run rules for the user
# who's been invited, otherwise they won't get told they've been invited
if event.type == 'm.room.member' and event.content['membership'] == 'invite':
invited = event.state_key
if invited and self.hs.is_mine_id(invited):
has_pusher = yield self.store.user_has_pusher(invited)
if has_pusher:
rules_by_user = dict(rules_by_user)
rules_by_user[invited] = yield self.store.get_push_rules_for_user(
invited
)
defer.returnValue(rules_by_user)
@cached()
def _get_rules_for_room(self, room_id):
"""Get the current RulesForRoom object for the given room id
Returns:
RulesForRoom
"""
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
"""Given an event and context, evaluate the push rules and return
the results
Returns:
dict of user_id -> action
"""
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
# None of these users can be peeking since this list of users comes
# from the set of users in the room, so we know for sure they're all
# actually in the room.
user_tuples = [
(u, False) for u in self.rules_by_user.keys()
]
user_tuples = [(u, False) for u in rules_by_user]
filtered_by_user = yield filter_events_for_clients_context(
self.store, user_tuples, [event], {event.event_id: context}
@@ -86,7 +109,7 @@ class BulkPushRuleEvaluator:
condition_cache = {}
for uid, rules in self.rules_by_user.items():
for uid, rules in rules_by_user.iteritems():
display_name = None
profile_info = room_members.get(uid)
if profile_info:
@@ -138,3 +161,198 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache):
return False
return True
class RulesForRoom(object):
"""Caches push rules for users in a room.
This efficiently handles users joining/leaving the room by not invalidating
the entire cache for the room.
"""
def __init__(self, hs, room_id, rules_for_room_cache):
"""
Args:
hs (HomeServer)
room_id (str)
rules_for_room_cache(Cache): The cache object that caches these
RoomsForUser objects.
"""
self.room_id = room_id
self.is_mine_id = hs.is_mine_id
self.store = hs.get_datastore()
self.linearizer = Linearizer(name="rules_for_room")
self.member_map = {} # event_id -> (user_id, state)
self.rules_by_user = {} # user_id -> rules
# The last state group we updated the caches for. If the state_group of
# a new event comes along, we know that we can just return the cached
# result.
# On invalidation of the rules themselves (if the user changes them),
# we invalidate everything and set state_group to `object()`
self.state_group = object()
# A sequence number to keep track of when we're allowed to update the
# cache. We bump the sequence number when we invalidate the cache. If
# the sequence number changes while we're calculating stuff we should
# not update the cache with it.
self.sequence = 0
# We need to be clever on the invalidating caches callbacks, as
# otherwise the invalidation callback holds a reference to the object,
# potentially causing it to leak.
# To get around this we pass a function that on invalidations looks ups
# the RoomsForUser entry in the cache, rather than keeping a reference
# to self around in the callback.
self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
@defer.inlineCallbacks
def get_rules(self, context):
"""Given an event context return the rules for all users who are
currently in the room.
"""
state_group = context.state_group
with (yield self.linearizer.queue(())):
if state_group and self.state_group == state_group:
defer.returnValue(self.rules_by_user)
ret_rules_by_user = {}
missing_member_event_ids = {}
if state_group and self.state_group == context.prev_group:
# If we have a simple delta then we can reuse most of the previous
# results.
ret_rules_by_user = self.rules_by_user
current_state_ids = context.delta_ids
else:
current_state_ids = context.current_state_ids
# Loop through to see which member events we've seen and have rules
# for and which we need to fetch
for key, event_id in current_state_ids.iteritems():
if key[0] != EventTypes.Member:
continue
res = self.member_map.get(event_id, None)
if res:
user_id, state = res
if state == Membership.JOIN:
rules = self.rules_by_user.get(user_id, None)
if rules:
ret_rules_by_user[user_id] = rules
continue
user_id = key[1]
if not self.is_mine_id(user_id):
continue
if self.store.get_if_app_services_interested_in_user(user_id):
continue
# If a user has left a room we remove their push rule. If they
# joined then we readd it later in _update_rules_with_member_event_ids
ret_rules_by_user.pop(user_id, None)
missing_member_event_ids[user_id] = event_id
if missing_member_event_ids:
# If we have some memebr events we haven't seen, look them up
# and fetch push rules for them if appropriate.
yield self._update_rules_with_member_event_ids(
ret_rules_by_user, missing_member_event_ids, state_group
)
defer.returnValue(ret_rules_by_user)
@defer.inlineCallbacks
def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids,
state_group):
"""Update the partially filled rules_by_user dict by fetching rules for
any newly joined users in the `member_event_ids` list.
Args:
ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets
updated with any new rules.
member_event_ids (list): List of event ids for membership events that
have happened since the last time we filled rules_by_user
state_group: The state group we are currently computing push rules
for. Used when updating the cache.
"""
sequence = self.sequence
rows = yield self.store._simple_select_many_batch(
table="room_memberships",
column="event_id",
iterable=member_event_ids.values(),
retcols=('user_id', 'membership', 'event_id'),
keyvalues={},
batch_size=500,
desc="_get_rules_for_member_event_ids",
)
members = {
row["event_id"]: (row["user_id"], row["membership"])
for row in rows
}
interested_in_user_ids = set(
user_id for user_id, membership in members.itervalues()
if membership == Membership.JOIN
)
if_users_with_pushers = yield self.store.get_if_users_have_pushers(
interested_in_user_ids,
on_invalidate=self.invalidate_all_cb,
)
user_ids = set(
uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
)
users_with_receipts = yield self.store.get_users_with_read_receipts_in_room(
self.room_id, on_invalidate=self.invalidate_all_cb,
)
# any users with pushers must be ours: they have pushers
for uid in users_with_receipts:
if uid in interested_in_user_ids:
user_ids.add(uid)
rules_by_user = yield self.store.bulk_get_push_rules(
user_ids, on_invalidate=self.invalidate_all_cb,
)
ret_rules_by_user.update(
item for item in rules_by_user.iteritems() if item[0] is not None
)
self.update_cache(sequence, members, ret_rules_by_user, state_group)
def invalidate_all(self):
# Note: Don't hand this function directly to an invalidation callback
# as it keeps a reference to self and will stop this instance from being
# GC'd if it gets dropped from the rules_to_user cache. Instead use
# `self.invalidate_all_cb`
self.sequence += 1
self.state_group = object()
self.member_map = {}
self.rules_by_user = {}
def update_cache(self, sequence, members, rules_by_user, state_group):
if sequence == self.sequence:
self.member_map.update(members)
self.rules_by_user = rules_by_user
self.state_group = state_group
class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
# We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
# which namedtuple does for us (i.e. two _CacheContext are the same if
# their caches and keys match). This is important in particular to
# dedupe when we add callbacks to lru cache nodes, otherwise the number
# of callbacks would grow.
def __call__(self):
rules = self.cache.get(self.room_id, None, update_metrics=False)
if rules:
rules.invalidate_all()
+1 -1
View File
@@ -219,7 +219,7 @@ class EmailPusher(object):
def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000
return max(secs, 0)
return max(secs, 0) # Ensure non-negative
def get_room_throttle_ms(self, room_id):
if room_id in self.throttle_params:
+1 -1
View File
@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except:
raise SynapseError(400, "Unable to parse state")
yield self.presence_handler.set_state(user, state)
# yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))
+1
View File
@@ -481,6 +481,7 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, room_id):
# raise RuntimeError("Guest access has been disabled")
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
content = yield self.initial_sync_handler.room_initial_sync(
+5
View File
@@ -52,6 +52,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool
from synapse.rest.media.v1.media_repository import MediaRepository
from synapse.state import StateHandler
@@ -135,6 +136,7 @@ class HomeServer(object):
'macaroon_generator',
'tcp_replication',
'read_marker_handler',
'action_generator',
]
def __init__(self, hostname, **kwargs):
@@ -299,6 +301,9 @@ class HomeServer(object):
def build_tcp_replication(self):
raise NotImplementedError()
def build_action_generator(self):
return ActionGenerator(self)
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
+21 -5
View File
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import simplejson as json
from twisted.internet import defer
@@ -36,16 +37,31 @@ class ApplicationServiceStore(SQLBaseStore):
hs.config.app_service_config_files
)
# We precompie a regex constructed from all the regexes that the AS's
# have registered for exclusive users.
exclusive_user_regexes = [
regex.pattern
for service in self.services_cache
for regex in service.get_exlusive_user_regexes()
]
if exclusive_user_regexes:
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
self.exclusive_user_regex = re.compile(exclusive_user_regex)
else:
# We handle this case specially otherwise the constructed regex
# will always match
self.exclusive_user_regex = None
def get_app_services(self):
return self.services_cache
def get_if_app_services_interested_in_user(self, user_id):
"""Check if the user is one associated with an app service
"""Check if the user is one associated with an app service (exclusively)
"""
for service in self.services_cache:
if service.is_interested_in_user(user_id):
return True
return False
if self.exclusive_user_regex:
return bool(self.exclusive_user_regex.match(user_id))
else:
return False
def get_app_service_by_user_id(self, user_id):
"""Retrieve an application service from their user ID.
+6 -3
View File
@@ -87,6 +87,8 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
@@ -629,7 +631,7 @@ class EventPushActionsStore(SQLBaseStore):
)
if caught_up:
break
yield sleep(5)
yield sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@@ -650,9 +652,10 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""", (old_rotate_stream_ordering,))
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
# stream_row = (old_rotate_stream_ordering + self._rotate_count,)
if stream_row:
offset_stream_ordering, = stream_row
rotate_to_stream_ordering = min(
-1
View File
@@ -972,7 +972,6 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
+2 -2
View File
@@ -49,7 +49,7 @@ def _load_rules(rawrules, enabled_map):
class PushRuleStore(SQLBaseStore):
@cachedInlineCallbacks()
@cachedInlineCallbacks(max_entries=5000)
def get_push_rules_for_user(self, user_id):
rows = yield self._simple_select_list(
table="push_rules",
@@ -73,7 +73,7 @@ class PushRuleStore(SQLBaseStore):
defer.returnValue(rules)
@cachedInlineCallbacks()
@cachedInlineCallbacks(max_entries=5000)
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
+3 -1
View File
@@ -45,7 +45,9 @@ class ReceiptsStore(SQLBaseStore):
return
# Returns an ObservableDeferred
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
res = self.get_users_with_read_receipts_in_room.cache.get(
room_id, None, update_metrics=False,
)
if res:
if isinstance(res, defer.Deferred) and res.called:
+1 -1
View File
@@ -600,7 +600,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
return " & ".join(result + ":*" for result in results)
return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
+11 -29
View File
@@ -563,20 +563,22 @@ class StateStore(SQLBaseStore):
where a `state_key` of `None` matches all state_keys for the
`type`.
"""
is_all, state_dict_ids = self._state_group_cache.get(group)
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
missing_types = set()
for typ, state_key in types:
key = (typ, state_key)
if state_key is None:
type_to_key[typ] = None
missing_types.add((typ, state_key))
missing_types.add(key)
else:
if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key)
if (typ, state_key) not in state_dict_ids:
missing_types.add((typ, state_key))
if key not in state_dict_ids and key not in known_absent:
missing_types.add(key)
sentinel = object()
@@ -590,7 +592,7 @@ class StateStore(SQLBaseStore):
return True
return False
got_all = not (missing_types or types is None)
got_all = is_all or not missing_types
return {
k: v for k, v in state_dict_ids.iteritems()
@@ -607,7 +609,7 @@ class StateStore(SQLBaseStore):
Args:
group: The state group to lookup
"""
is_all, state_dict_ids = self._state_group_cache.get(group)
is_all, _, state_dict_ids = self._state_group_cache.get(group)
return state_dict_ids, is_all
@@ -624,7 +626,7 @@ class StateStore(SQLBaseStore):
missing_groups = []
if types is not None:
for group in set(groups):
state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
state_dict_ids, _, got_all = self._get_some_state_from_cache(
group, types
)
results[group] = state_dict_ids
@@ -653,19 +655,7 @@ class StateStore(SQLBaseStore):
# Now we want to update the cache with all the things we fetched
# from the database.
for group, group_state_dict in group_to_state_dict.iteritems():
if types:
# We delibrately put key -> None mappings into the cache to
# cache absence of the key, on the assumption that if we've
# explicitly asked for some types then we will probably ask
# for them again.
state_dict = {
(intern_string(etype), intern_string(state_key)): None
for (etype, state_key) in types
}
state_dict.update(results[group])
results[group] = state_dict
else:
state_dict = results[group]
state_dict = results[group]
state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
@@ -677,17 +667,9 @@ class StateStore(SQLBaseStore):
key=group,
value=state_dict,
full=(types is None),
known_absent=types,
)
# Remove all the entries with None values. The None values were just
# used for bookkeeping in the cache.
for group, state_dict in results.iteritems():
results[group] = {
key: event_id
for key, event_id in state_dict.iteritems()
if event_id
}
defer.returnValue(results)
def get_next_state_group(self):
+46 -11
View File
@@ -23,7 +23,17 @@ import logging
logger = logging.getLogger(__name__)
class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))):
class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))):
"""Returned when getting an entry from the cache
Attributes:
full (bool): Whether the cache has the full or dict or just some keys.
If not full then not all requested keys will necessarily be present
in `value`
known_absent (set): Keys that were looked up in the dict and were not
there.
value (dict): The full or partial dict value
"""
def __len__(self):
return len(self.value)
@@ -58,21 +68,31 @@ class DictionaryCache(object):
)
def get(self, key, dict_keys=None):
"""Fetch an entry out of the cache
Args:
key
dict_key(list): If given a set of keys then return only those keys
that exist in the cache.
Returns:
DictionaryEntry
"""
entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel:
self.metrics.inc_hits()
if dict_keys is None:
return DictionaryEntry(entry.full, dict(entry.value))
return DictionaryEntry(entry.full, entry.known_absent, dict(entry.value))
else:
return DictionaryEntry(entry.full, {
return DictionaryEntry(entry.full, entry.known_absent, {
k: entry.value[k]
for k in dict_keys
if k in entry.value
})
self.metrics.inc_misses()
return DictionaryEntry(False, {})
return DictionaryEntry(False, set(), {})
def invalidate(self, key):
self.check_thread()
@@ -87,19 +107,34 @@ class DictionaryCache(object):
self.sequence += 1
self.cache.clear()
def update(self, sequence, key, value, full=False):
def update(self, sequence, key, value, full=False, known_absent=None):
"""Updates the entry in the cache
Args:
sequence
key
value (dict): The value to update the cache with.
full (bool): Whether the given value is the full dict, or just a
partial subset there of. If not full then any existing entries
for the key will be updated.
known_absent (set): Set of keys that we know don't exist in the full
dict.
"""
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
if known_absent is None:
known_absent = set()
if full:
self._insert(key, value)
self._insert(key, value, known_absent)
else:
self._update_or_insert(key, value)
self._update_or_insert(key, value, known_absent)
def _update_or_insert(self, key, value):
entry = self.cache.setdefault(key, DictionaryEntry(False, {}))
def _update_or_insert(self, key, value, known_absent):
entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
entry.value.update(value)
entry.known_absent.update(known_absent)
def _insert(self, key, value):
self.cache[key] = DictionaryEntry(True, value)
def _insert(self, key, value, known_absent):
self.cache[key] = DictionaryEntry(True, known_absent, value)
+1 -1
View File
@@ -28,7 +28,7 @@ class DictCacheTestCase(unittest.TestCase):
key = "test_simple_cache_hit_full"
v = self.cache.get(key)
self.assertEqual((False, {}), v)
self.assertEqual((False, set(), {}), v)
seq = self.cache.sequence
test_value = {"test": "test_simple_cache_hit_full"}