Compare commits
94 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f0ed2e408f | |||
| ab4ee2e524 | |||
| 58ebb96cce | |||
| 3da426357c | |||
| 99713dc7d3 | |||
| 1c1c0257f4 | |||
| cafe659f72 | |||
| 5b83e958e8 | |||
| 72ed8196b3 | |||
| 0e242aebc0 | |||
| 107ac7ac96 | |||
| e8ac65c4c9 | |||
| 234772db6d | |||
| 760625acba | |||
| c57789d138 | |||
| fbec8847be | |||
| f33df30732 | |||
| 6e381180ae | |||
| 056ba9b795 | |||
| 689b454674 | |||
| 88664afe14 | |||
| f98efea9b1 | |||
| d9e3a4b5db | |||
| 66d8ffabbd | |||
| ace23463c5 | |||
| bbfe4e996c | |||
| 9f430fa07f | |||
| 120eefdc19 | |||
| 328378f9cb | |||
| b517ae2d03 | |||
| ce32d1eed2 | |||
| 0d8a2ee02b | |||
| 25d3f28a98 | |||
| 17ddec014a | |||
| fb4c0ada1f | |||
| 85999aade0 | |||
| dfc7bf2e84 | |||
| 02928332d5 | |||
| 0ebd376a53 | |||
| 05e62e0478 | |||
| 97359ca4ec | |||
| 037aede1ee | |||
| 7ceacaaa6e | |||
| 0cbdfcbb75 | |||
| 088d52ba6c | |||
| d728648b65 | |||
| 023600d20c | |||
| 076bc0510b | |||
| 2effa32140 | |||
| d7dbc56c71 | |||
| 1ec9f7db27 | |||
| 8224121502 | |||
| c042bcbe0f | |||
| 5bf6bcb850 | |||
| bbc0dbeec0 | |||
| b6b1382be1 | |||
| 224dc4f6a9 | |||
| f90eb60ae9 | |||
| a0d6987991 | |||
| bf575ae20e | |||
| 929b005999 | |||
| 01bcf01927 | |||
| ec9c8fc6cf | |||
| f2c9e51d28 | |||
| 8a6196c6c8 | |||
| 8d910ff5b9 | |||
| cff886c47b | |||
| a24492c06f | |||
| ede125b7e1 | |||
| c97d491649 | |||
| 3b42bb96e4 | |||
| 738000971e | |||
| 96ef35c62f | |||
| 83c7b8ec91 | |||
| f57cb21952 | |||
| 4abecb7b02 | |||
| 010159365c | |||
| 717e4448c4 | |||
| e41f183aa8 | |||
| 10f7bfe897 | |||
| 772b8ebe54 | |||
| 3c9acdef4d | |||
| fe28150cdc | |||
| 69ef497253 | |||
| 1d08de6ce9 | |||
| 10d31c433b | |||
| 345afd9fdc | |||
| 7ed07066ac | |||
| 2f703b8645 | |||
| 9fff1aacca | |||
| cb842dc99f | |||
| 2238a10b42 | |||
| 2998fa57b0 | |||
| f5abaafabd |
+1
-1
@@ -33,7 +33,7 @@ To check whether your update was sucessfull, run:
|
||||
|
||||
.. code:: bash
|
||||
|
||||
# replace your.server.domain with ther domain of your synaspe homeserver
|
||||
# replace your.server.domain with ther domain of your synapse homeserver
|
||||
curl https://<your.server.domain>/_matrix/federation/v1/version
|
||||
|
||||
So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
|
||||
|
||||
@@ -121,6 +121,7 @@ class SynchrotronPresence(object):
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
return
|
||||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
|
||||
@@ -241,6 +241,16 @@ class ApplicationService(object):
|
||||
def is_exclusive_room(self, room_id):
|
||||
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
|
||||
|
||||
def get_exlusive_user_regexes(self):
|
||||
"""Get the list of regexes used to determine if a user is exclusively
|
||||
registered by the AS
|
||||
"""
|
||||
return [
|
||||
regex_obj["regex"]
|
||||
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
|
||||
if regex_obj["exclusive"]
|
||||
]
|
||||
|
||||
def is_rate_limited(self):
|
||||
return self.rate_limited
|
||||
|
||||
|
||||
@@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
|
||||
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
from synapse.util.distributor import user_joined_room
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -75,6 +74,7 @@ class FederationHandler(BaseHandler):
|
||||
self.state_handler = hs.get_state_handler()
|
||||
self.server_name = hs.hostname
|
||||
self.keyring = hs.get_keyring()
|
||||
self.action_generator = hs.get_action_generator()
|
||||
|
||||
self.replication_layer.set_handler(self)
|
||||
|
||||
@@ -1389,8 +1389,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
if not event.internal_metadata.is_outlier():
|
||||
action_generator = ActionGenerator(self.hs)
|
||||
yield action_generator.handle_push_actions_for_event(
|
||||
yield self.action_generator.handle_push_actions_for_event(
|
||||
event, context
|
||||
)
|
||||
|
||||
|
||||
@@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_presence():
|
||||
defer.returnValue([])
|
||||
states = yield presence_handler.get_states(
|
||||
[m.user_id for m in room_members],
|
||||
as_event=True,
|
||||
|
||||
@@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
from synapse.types import (
|
||||
UserID, RoomAlias, RoomStreamToken,
|
||||
)
|
||||
@@ -54,6 +53,8 @@ class MessageHandler(BaseHandler):
|
||||
# This is to stop us from diverging history *too* much.
|
||||
self.limiter = Limiter(max_count=5)
|
||||
|
||||
self.action_generator = hs.get_action_generator()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def purge_history(self, room_id, event_id):
|
||||
event = yield self.store.get_event(event_id)
|
||||
@@ -590,8 +591,7 @@ class MessageHandler(BaseHandler):
|
||||
"Changing the room create event is forbidden",
|
||||
)
|
||||
|
||||
action_generator = ActionGenerator(self.hs)
|
||||
yield action_generator.handle_push_actions_for_event(
|
||||
yield self.action_generator.handle_push_actions_for_event(
|
||||
event, context
|
||||
)
|
||||
|
||||
|
||||
@@ -372,6 +372,7 @@ class PresenceHandler(object):
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
return
|
||||
user_id = user.to_string()
|
||||
|
||||
bump_active_time_counter.inc()
|
||||
@@ -401,6 +402,7 @@ class PresenceHandler(object):
|
||||
Useful for streams that are not associated with an actual
|
||||
client that is being used by a user.
|
||||
"""
|
||||
affect_presence = False
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
@@ -462,6 +464,7 @@ class PresenceHandler(object):
|
||||
syncing_user_ids(set(str)): The set of user_ids that are
|
||||
currently syncing on that server.
|
||||
"""
|
||||
return
|
||||
|
||||
# Grab the previous list of user_ids that were syncing on that process
|
||||
prev_syncing_user_ids = (
|
||||
|
||||
@@ -43,7 +43,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
class RoomListHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
super(RoomListHandler, self).__init__(hs)
|
||||
self.response_cache = ResponseCache(hs)
|
||||
self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
|
||||
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
|
||||
|
||||
def get_local_public_room_list(self, limit=None, since_token=None,
|
||||
|
||||
@@ -539,7 +539,7 @@ class SyncHandler(object):
|
||||
since_token is None and
|
||||
sync_config.filter_collection.blocks_all_presence()
|
||||
)
|
||||
if not block_all_presence_data:
|
||||
if False or not block_all_presence_data:
|
||||
yield self._generate_sync_entry_for_presence(
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from .bulk_push_rule_evaluator import evaluator_for_event
|
||||
from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
|
||||
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -24,11 +24,12 @@ import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ActionGenerator:
|
||||
class ActionGenerator(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastore()
|
||||
self.bulk_evaluator = BulkPushRuleEvaluator(hs)
|
||||
# really we want to get all user ids and all profile tags too,
|
||||
# since we want the actions for each profile tag for every user and
|
||||
# also actions for a client with no profile tag for each user.
|
||||
@@ -38,16 +39,11 @@ class ActionGenerator:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_push_actions_for_event(self, event, context):
|
||||
with Measure(self.clock, "evaluator_for_event"):
|
||||
bulk_evaluator = yield evaluator_for_event(
|
||||
event, self.hs, self.store, context
|
||||
)
|
||||
|
||||
with Measure(self.clock, "action_for_event_by_user"):
|
||||
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
||||
actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
|
||||
event, context
|
||||
)
|
||||
|
||||
context.push_actions = [
|
||||
(uid, actions) for uid, actions in actions_by_user.items()
|
||||
(uid, actions) for uid, actions in actions_by_user.iteritems()
|
||||
]
|
||||
|
||||
@@ -19,60 +19,83 @@ from twisted.internet import defer
|
||||
|
||||
from .push_rule_evaluator import PushRuleEvaluatorForEvent
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.visibility import filter_events_for_clients_context
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.util.async import Linearizer
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def evaluator_for_event(event, hs, store, context):
|
||||
rules_by_user = yield store.bulk_get_push_rules_for_room(
|
||||
event, context
|
||||
)
|
||||
|
||||
# if this event is an invite event, we may need to run rules for the user
|
||||
# who's been invited, otherwise they won't get told they've been invited
|
||||
if event.type == 'm.room.member' and event.content['membership'] == 'invite':
|
||||
invited_user = event.state_key
|
||||
if invited_user and hs.is_mine_id(invited_user):
|
||||
has_pusher = yield store.user_has_pusher(invited_user)
|
||||
if has_pusher:
|
||||
rules_by_user = dict(rules_by_user)
|
||||
rules_by_user[invited_user] = yield store.get_push_rules_for_user(
|
||||
invited_user
|
||||
)
|
||||
|
||||
defer.returnValue(BulkPushRuleEvaluator(
|
||||
event.room_id, rules_by_user, store
|
||||
))
|
||||
rules_by_room = {}
|
||||
|
||||
|
||||
class BulkPushRuleEvaluator:
|
||||
class BulkPushRuleEvaluator(object):
|
||||
"""Calculates the outcome of push rules for an event for all users in the
|
||||
room at once.
|
||||
"""
|
||||
Runs push rules for all users in a room.
|
||||
This is faster than running PushRuleEvaluator for each user because it
|
||||
fetches all the rules for all the users in one (batched) db query
|
||||
rather than doing multiple queries per-user. It currently uses
|
||||
the same logic to run the actual rules, but could be optimised further
|
||||
(see https://matrix.org/jira/browse/SYN-562)
|
||||
"""
|
||||
def __init__(self, room_id, rules_by_user, store):
|
||||
self.room_id = room_id
|
||||
self.rules_by_user = rules_by_user
|
||||
self.store = store
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_rules_for_event(self, event, context):
|
||||
"""This gets the rules for all users in the room at the time of the event,
|
||||
as well as the push rules for the invitee if the event is an invite.
|
||||
|
||||
Returns:
|
||||
dict of user_id -> push_rules
|
||||
"""
|
||||
room_id = event.room_id
|
||||
rules_for_room = self._get_rules_for_room(room_id)
|
||||
|
||||
rules_by_user = yield rules_for_room.get_rules(context)
|
||||
|
||||
# if this event is an invite event, we may need to run rules for the user
|
||||
# who's been invited, otherwise they won't get told they've been invited
|
||||
if event.type == 'm.room.member' and event.content['membership'] == 'invite':
|
||||
invited = event.state_key
|
||||
if invited and self.hs.is_mine_id(invited):
|
||||
has_pusher = yield self.store.user_has_pusher(invited)
|
||||
if has_pusher:
|
||||
rules_by_user = dict(rules_by_user)
|
||||
rules_by_user[invited] = yield self.store.get_push_rules_for_user(
|
||||
invited
|
||||
)
|
||||
|
||||
defer.returnValue(rules_by_user)
|
||||
|
||||
@cached()
|
||||
def _get_rules_for_room(self, room_id):
|
||||
"""Get the current RulesForRoom object for the given room id
|
||||
|
||||
Returns:
|
||||
RulesForRoom
|
||||
"""
|
||||
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
|
||||
# before any lookup methods get called on it as otherwise there may be
|
||||
# a race if invalidate_all gets called (which assumes its in the cache)
|
||||
return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def action_for_event_by_user(self, event, context):
|
||||
"""Given an event and context, evaluate the push rules and return
|
||||
the results
|
||||
|
||||
Returns:
|
||||
dict of user_id -> action
|
||||
"""
|
||||
rules_by_user = yield self._get_rules_for_event(event, context)
|
||||
actions_by_user = {}
|
||||
|
||||
# None of these users can be peeking since this list of users comes
|
||||
# from the set of users in the room, so we know for sure they're all
|
||||
# actually in the room.
|
||||
user_tuples = [
|
||||
(u, False) for u in self.rules_by_user.keys()
|
||||
]
|
||||
user_tuples = [(u, False) for u in rules_by_user]
|
||||
|
||||
filtered_by_user = yield filter_events_for_clients_context(
|
||||
self.store, user_tuples, [event], {event.event_id: context}
|
||||
@@ -86,7 +109,7 @@ class BulkPushRuleEvaluator:
|
||||
|
||||
condition_cache = {}
|
||||
|
||||
for uid, rules in self.rules_by_user.items():
|
||||
for uid, rules in rules_by_user.iteritems():
|
||||
display_name = None
|
||||
profile_info = room_members.get(uid)
|
||||
if profile_info:
|
||||
@@ -138,3 +161,198 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class RulesForRoom(object):
|
||||
"""Caches push rules for users in a room.
|
||||
|
||||
This efficiently handles users joining/leaving the room by not invalidating
|
||||
the entire cache for the room.
|
||||
"""
|
||||
|
||||
def __init__(self, hs, room_id, rules_for_room_cache):
|
||||
"""
|
||||
Args:
|
||||
hs (HomeServer)
|
||||
room_id (str)
|
||||
rules_for_room_cache(Cache): The cache object that caches these
|
||||
RoomsForUser objects.
|
||||
"""
|
||||
self.room_id = room_id
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
self.linearizer = Linearizer(name="rules_for_room")
|
||||
|
||||
self.member_map = {} # event_id -> (user_id, state)
|
||||
self.rules_by_user = {} # user_id -> rules
|
||||
|
||||
# The last state group we updated the caches for. If the state_group of
|
||||
# a new event comes along, we know that we can just return the cached
|
||||
# result.
|
||||
# On invalidation of the rules themselves (if the user changes them),
|
||||
# we invalidate everything and set state_group to `object()`
|
||||
self.state_group = object()
|
||||
|
||||
# A sequence number to keep track of when we're allowed to update the
|
||||
# cache. We bump the sequence number when we invalidate the cache. If
|
||||
# the sequence number changes while we're calculating stuff we should
|
||||
# not update the cache with it.
|
||||
self.sequence = 0
|
||||
|
||||
# We need to be clever on the invalidating caches callbacks, as
|
||||
# otherwise the invalidation callback holds a reference to the object,
|
||||
# potentially causing it to leak.
|
||||
# To get around this we pass a function that on invalidations looks ups
|
||||
# the RoomsForUser entry in the cache, rather than keeping a reference
|
||||
# to self around in the callback.
|
||||
self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_rules(self, context):
|
||||
"""Given an event context return the rules for all users who are
|
||||
currently in the room.
|
||||
"""
|
||||
state_group = context.state_group
|
||||
|
||||
with (yield self.linearizer.queue(())):
|
||||
if state_group and self.state_group == state_group:
|
||||
defer.returnValue(self.rules_by_user)
|
||||
|
||||
ret_rules_by_user = {}
|
||||
missing_member_event_ids = {}
|
||||
if state_group and self.state_group == context.prev_group:
|
||||
# If we have a simple delta then we can reuse most of the previous
|
||||
# results.
|
||||
ret_rules_by_user = self.rules_by_user
|
||||
current_state_ids = context.delta_ids
|
||||
else:
|
||||
current_state_ids = context.current_state_ids
|
||||
|
||||
# Loop through to see which member events we've seen and have rules
|
||||
# for and which we need to fetch
|
||||
for key, event_id in current_state_ids.iteritems():
|
||||
if key[0] != EventTypes.Member:
|
||||
continue
|
||||
|
||||
res = self.member_map.get(event_id, None)
|
||||
if res:
|
||||
user_id, state = res
|
||||
if state == Membership.JOIN:
|
||||
rules = self.rules_by_user.get(user_id, None)
|
||||
if rules:
|
||||
ret_rules_by_user[user_id] = rules
|
||||
continue
|
||||
|
||||
user_id = key[1]
|
||||
if not self.is_mine_id(user_id):
|
||||
continue
|
||||
|
||||
if self.store.get_if_app_services_interested_in_user(user_id):
|
||||
continue
|
||||
|
||||
# If a user has left a room we remove their push rule. If they
|
||||
# joined then we readd it later in _update_rules_with_member_event_ids
|
||||
ret_rules_by_user.pop(user_id, None)
|
||||
missing_member_event_ids[user_id] = event_id
|
||||
|
||||
if missing_member_event_ids:
|
||||
# If we have some memebr events we haven't seen, look them up
|
||||
# and fetch push rules for them if appropriate.
|
||||
yield self._update_rules_with_member_event_ids(
|
||||
ret_rules_by_user, missing_member_event_ids, state_group
|
||||
)
|
||||
|
||||
defer.returnValue(ret_rules_by_user)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids,
|
||||
state_group):
|
||||
"""Update the partially filled rules_by_user dict by fetching rules for
|
||||
any newly joined users in the `member_event_ids` list.
|
||||
|
||||
Args:
|
||||
ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets
|
||||
updated with any new rules.
|
||||
member_event_ids (list): List of event ids for membership events that
|
||||
have happened since the last time we filled rules_by_user
|
||||
state_group: The state group we are currently computing push rules
|
||||
for. Used when updating the cache.
|
||||
"""
|
||||
sequence = self.sequence
|
||||
|
||||
rows = yield self.store._simple_select_many_batch(
|
||||
table="room_memberships",
|
||||
column="event_id",
|
||||
iterable=member_event_ids.values(),
|
||||
retcols=('user_id', 'membership', 'event_id'),
|
||||
keyvalues={},
|
||||
batch_size=500,
|
||||
desc="_get_rules_for_member_event_ids",
|
||||
)
|
||||
|
||||
members = {
|
||||
row["event_id"]: (row["user_id"], row["membership"])
|
||||
for row in rows
|
||||
}
|
||||
|
||||
interested_in_user_ids = set(
|
||||
user_id for user_id, membership in members.itervalues()
|
||||
if membership == Membership.JOIN
|
||||
)
|
||||
|
||||
if_users_with_pushers = yield self.store.get_if_users_have_pushers(
|
||||
interested_in_user_ids,
|
||||
on_invalidate=self.invalidate_all_cb,
|
||||
)
|
||||
|
||||
user_ids = set(
|
||||
uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
|
||||
)
|
||||
|
||||
users_with_receipts = yield self.store.get_users_with_read_receipts_in_room(
|
||||
self.room_id, on_invalidate=self.invalidate_all_cb,
|
||||
)
|
||||
|
||||
# any users with pushers must be ours: they have pushers
|
||||
for uid in users_with_receipts:
|
||||
if uid in interested_in_user_ids:
|
||||
user_ids.add(uid)
|
||||
|
||||
rules_by_user = yield self.store.bulk_get_push_rules(
|
||||
user_ids, on_invalidate=self.invalidate_all_cb,
|
||||
)
|
||||
|
||||
ret_rules_by_user.update(
|
||||
item for item in rules_by_user.iteritems() if item[0] is not None
|
||||
)
|
||||
|
||||
self.update_cache(sequence, members, ret_rules_by_user, state_group)
|
||||
|
||||
def invalidate_all(self):
|
||||
# Note: Don't hand this function directly to an invalidation callback
|
||||
# as it keeps a reference to self and will stop this instance from being
|
||||
# GC'd if it gets dropped from the rules_to_user cache. Instead use
|
||||
# `self.invalidate_all_cb`
|
||||
self.sequence += 1
|
||||
self.state_group = object()
|
||||
self.member_map = {}
|
||||
self.rules_by_user = {}
|
||||
|
||||
def update_cache(self, sequence, members, rules_by_user, state_group):
|
||||
if sequence == self.sequence:
|
||||
self.member_map.update(members)
|
||||
self.rules_by_user = rules_by_user
|
||||
self.state_group = state_group
|
||||
|
||||
|
||||
class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
|
||||
# We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
|
||||
# which namedtuple does for us (i.e. two _CacheContext are the same if
|
||||
# their caches and keys match). This is important in particular to
|
||||
# dedupe when we add callbacks to lru cache nodes, otherwise the number
|
||||
# of callbacks would grow.
|
||||
def __call__(self):
|
||||
rules = self.cache.get(self.room_id, None, update_metrics=False)
|
||||
if rules:
|
||||
rules.invalidate_all()
|
||||
|
||||
@@ -219,7 +219,7 @@ class EmailPusher(object):
|
||||
|
||||
def seconds_until(self, ts_msec):
|
||||
secs = (ts_msec - self.clock.time_msec()) / 1000
|
||||
return max(secs, 0)
|
||||
return max(secs, 0) # Ensure non-negative
|
||||
|
||||
def get_room_throttle_ms(self, room_id):
|
||||
if room_id in self.throttle_params:
|
||||
|
||||
@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
|
||||
except:
|
||||
raise SynapseError(400, "Unable to parse state")
|
||||
|
||||
yield self.presence_handler.set_state(user, state)
|
||||
# yield self.presence_handler.set_state(user, state)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
@@ -481,6 +481,7 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, room_id):
|
||||
# raise RuntimeError("Guest access has been disabled")
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
pagination_config = PaginationConfig.from_request(request)
|
||||
content = yield self.initial_sync_handler.room_initial_sync(
|
||||
|
||||
@@ -52,6 +52,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler
|
||||
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.notifier import Notifier
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
from synapse.push.pusherpool import PusherPool
|
||||
from synapse.rest.media.v1.media_repository import MediaRepository
|
||||
from synapse.state import StateHandler
|
||||
@@ -135,6 +136,7 @@ class HomeServer(object):
|
||||
'macaroon_generator',
|
||||
'tcp_replication',
|
||||
'read_marker_handler',
|
||||
'action_generator',
|
||||
]
|
||||
|
||||
def __init__(self, hostname, **kwargs):
|
||||
@@ -299,6 +301,9 @@ class HomeServer(object):
|
||||
def build_tcp_replication(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def build_action_generator(self):
|
||||
return ActionGenerator(self)
|
||||
|
||||
def remove_pusher(self, app_id, push_key, user_id):
|
||||
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import re
|
||||
import simplejson as json
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -36,16 +37,31 @@ class ApplicationServiceStore(SQLBaseStore):
|
||||
hs.config.app_service_config_files
|
||||
)
|
||||
|
||||
# We precompie a regex constructed from all the regexes that the AS's
|
||||
# have registered for exclusive users.
|
||||
exclusive_user_regexes = [
|
||||
regex.pattern
|
||||
for service in self.services_cache
|
||||
for regex in service.get_exlusive_user_regexes()
|
||||
]
|
||||
if exclusive_user_regexes:
|
||||
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
|
||||
self.exclusive_user_regex = re.compile(exclusive_user_regex)
|
||||
else:
|
||||
# We handle this case specially otherwise the constructed regex
|
||||
# will always match
|
||||
self.exclusive_user_regex = None
|
||||
|
||||
def get_app_services(self):
|
||||
return self.services_cache
|
||||
|
||||
def get_if_app_services_interested_in_user(self, user_id):
|
||||
"""Check if the user is one associated with an app service
|
||||
"""Check if the user is one associated with an app service (exclusively)
|
||||
"""
|
||||
for service in self.services_cache:
|
||||
if service.is_interested_in_user(user_id):
|
||||
return True
|
||||
return False
|
||||
if self.exclusive_user_regex:
|
||||
return bool(self.exclusive_user_regex.match(user_id))
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_app_service_by_user_id(self, user_id):
|
||||
"""Retrieve an application service from their user ID.
|
||||
|
||||
@@ -87,6 +87,8 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
self._rotate_notif_loop = self._clock.looping_call(
|
||||
self._rotate_notifs, 30 * 60 * 1000
|
||||
)
|
||||
self._rotate_delay = 3
|
||||
self._rotate_count = 10000
|
||||
|
||||
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
|
||||
"""
|
||||
@@ -629,7 +631,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
)
|
||||
if caught_up:
|
||||
break
|
||||
yield sleep(5)
|
||||
yield sleep(self._rotate_delay)
|
||||
finally:
|
||||
self._doing_notif_rotation = False
|
||||
|
||||
@@ -650,9 +652,10 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
txn.execute("""
|
||||
SELECT stream_ordering FROM event_push_actions
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
|
||||
""", (old_rotate_stream_ordering,))
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
|
||||
""", (old_rotate_stream_ordering, self._rotate_count))
|
||||
stream_row = txn.fetchone()
|
||||
# stream_row = (old_rotate_stream_ordering + self._rotate_count,)
|
||||
if stream_row:
|
||||
offset_stream_ordering, = stream_row
|
||||
rotate_to_stream_ordering = min(
|
||||
|
||||
@@ -972,7 +972,6 @@ class EventsStore(SQLBaseStore):
|
||||
"event_edge_hashes",
|
||||
"event_edges",
|
||||
"event_forward_extremities",
|
||||
"event_push_actions",
|
||||
"event_reference_hashes",
|
||||
"event_search",
|
||||
"event_signatures",
|
||||
|
||||
@@ -49,7 +49,7 @@ def _load_rules(rawrules, enabled_map):
|
||||
|
||||
|
||||
class PushRuleStore(SQLBaseStore):
|
||||
@cachedInlineCallbacks()
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
def get_push_rules_for_user(self, user_id):
|
||||
rows = yield self._simple_select_list(
|
||||
table="push_rules",
|
||||
@@ -73,7 +73,7 @@ class PushRuleStore(SQLBaseStore):
|
||||
|
||||
defer.returnValue(rules)
|
||||
|
||||
@cachedInlineCallbacks()
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
def get_push_rules_enabled_for_user(self, user_id):
|
||||
results = yield self._simple_select_list(
|
||||
table="push_rules_enable",
|
||||
|
||||
@@ -45,7 +45,9 @@ class ReceiptsStore(SQLBaseStore):
|
||||
return
|
||||
|
||||
# Returns an ObservableDeferred
|
||||
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
|
||||
res = self.get_users_with_read_receipts_in_room.cache.get(
|
||||
room_id, None, update_metrics=False,
|
||||
)
|
||||
|
||||
if res:
|
||||
if isinstance(res, defer.Deferred) and res.called:
|
||||
|
||||
@@ -600,7 +600,7 @@ def _parse_query(database_engine, search_term):
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
return " & ".join(result + ":*" for result in results)
|
||||
return " & ".join(result for result in results)
|
||||
elif isinstance(database_engine, Sqlite3Engine):
|
||||
return " & ".join(result + "*" for result in results)
|
||||
else:
|
||||
|
||||
+11
-29
@@ -563,20 +563,22 @@ class StateStore(SQLBaseStore):
|
||||
where a `state_key` of `None` matches all state_keys for the
|
||||
`type`.
|
||||
"""
|
||||
is_all, state_dict_ids = self._state_group_cache.get(group)
|
||||
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
|
||||
|
||||
type_to_key = {}
|
||||
missing_types = set()
|
||||
|
||||
for typ, state_key in types:
|
||||
key = (typ, state_key)
|
||||
if state_key is None:
|
||||
type_to_key[typ] = None
|
||||
missing_types.add((typ, state_key))
|
||||
missing_types.add(key)
|
||||
else:
|
||||
if type_to_key.get(typ, object()) is not None:
|
||||
type_to_key.setdefault(typ, set()).add(state_key)
|
||||
|
||||
if (typ, state_key) not in state_dict_ids:
|
||||
missing_types.add((typ, state_key))
|
||||
if key not in state_dict_ids and key not in known_absent:
|
||||
missing_types.add(key)
|
||||
|
||||
sentinel = object()
|
||||
|
||||
@@ -590,7 +592,7 @@ class StateStore(SQLBaseStore):
|
||||
return True
|
||||
return False
|
||||
|
||||
got_all = not (missing_types or types is None)
|
||||
got_all = is_all or not missing_types
|
||||
|
||||
return {
|
||||
k: v for k, v in state_dict_ids.iteritems()
|
||||
@@ -607,7 +609,7 @@ class StateStore(SQLBaseStore):
|
||||
Args:
|
||||
group: The state group to lookup
|
||||
"""
|
||||
is_all, state_dict_ids = self._state_group_cache.get(group)
|
||||
is_all, _, state_dict_ids = self._state_group_cache.get(group)
|
||||
|
||||
return state_dict_ids, is_all
|
||||
|
||||
@@ -624,7 +626,7 @@ class StateStore(SQLBaseStore):
|
||||
missing_groups = []
|
||||
if types is not None:
|
||||
for group in set(groups):
|
||||
state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
|
||||
state_dict_ids, _, got_all = self._get_some_state_from_cache(
|
||||
group, types
|
||||
)
|
||||
results[group] = state_dict_ids
|
||||
@@ -653,19 +655,7 @@ class StateStore(SQLBaseStore):
|
||||
# Now we want to update the cache with all the things we fetched
|
||||
# from the database.
|
||||
for group, group_state_dict in group_to_state_dict.iteritems():
|
||||
if types:
|
||||
# We delibrately put key -> None mappings into the cache to
|
||||
# cache absence of the key, on the assumption that if we've
|
||||
# explicitly asked for some types then we will probably ask
|
||||
# for them again.
|
||||
state_dict = {
|
||||
(intern_string(etype), intern_string(state_key)): None
|
||||
for (etype, state_key) in types
|
||||
}
|
||||
state_dict.update(results[group])
|
||||
results[group] = state_dict
|
||||
else:
|
||||
state_dict = results[group]
|
||||
state_dict = results[group]
|
||||
|
||||
state_dict.update(
|
||||
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
|
||||
@@ -677,17 +667,9 @@ class StateStore(SQLBaseStore):
|
||||
key=group,
|
||||
value=state_dict,
|
||||
full=(types is None),
|
||||
known_absent=types,
|
||||
)
|
||||
|
||||
# Remove all the entries with None values. The None values were just
|
||||
# used for bookkeeping in the cache.
|
||||
for group, state_dict in results.iteritems():
|
||||
results[group] = {
|
||||
key: event_id
|
||||
for key, event_id in state_dict.iteritems()
|
||||
if event_id
|
||||
}
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
def get_next_state_group(self):
|
||||
|
||||
@@ -23,7 +23,17 @@ import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))):
|
||||
class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))):
|
||||
"""Returned when getting an entry from the cache
|
||||
|
||||
Attributes:
|
||||
full (bool): Whether the cache has the full or dict or just some keys.
|
||||
If not full then not all requested keys will necessarily be present
|
||||
in `value`
|
||||
known_absent (set): Keys that were looked up in the dict and were not
|
||||
there.
|
||||
value (dict): The full or partial dict value
|
||||
"""
|
||||
def __len__(self):
|
||||
return len(self.value)
|
||||
|
||||
@@ -58,21 +68,31 @@ class DictionaryCache(object):
|
||||
)
|
||||
|
||||
def get(self, key, dict_keys=None):
|
||||
"""Fetch an entry out of the cache
|
||||
|
||||
Args:
|
||||
key
|
||||
dict_key(list): If given a set of keys then return only those keys
|
||||
that exist in the cache.
|
||||
|
||||
Returns:
|
||||
DictionaryEntry
|
||||
"""
|
||||
entry = self.cache.get(key, self.sentinel)
|
||||
if entry is not self.sentinel:
|
||||
self.metrics.inc_hits()
|
||||
|
||||
if dict_keys is None:
|
||||
return DictionaryEntry(entry.full, dict(entry.value))
|
||||
return DictionaryEntry(entry.full, entry.known_absent, dict(entry.value))
|
||||
else:
|
||||
return DictionaryEntry(entry.full, {
|
||||
return DictionaryEntry(entry.full, entry.known_absent, {
|
||||
k: entry.value[k]
|
||||
for k in dict_keys
|
||||
if k in entry.value
|
||||
})
|
||||
|
||||
self.metrics.inc_misses()
|
||||
return DictionaryEntry(False, {})
|
||||
return DictionaryEntry(False, set(), {})
|
||||
|
||||
def invalidate(self, key):
|
||||
self.check_thread()
|
||||
@@ -87,19 +107,34 @@ class DictionaryCache(object):
|
||||
self.sequence += 1
|
||||
self.cache.clear()
|
||||
|
||||
def update(self, sequence, key, value, full=False):
|
||||
def update(self, sequence, key, value, full=False, known_absent=None):
|
||||
"""Updates the entry in the cache
|
||||
|
||||
Args:
|
||||
sequence
|
||||
key
|
||||
value (dict): The value to update the cache with.
|
||||
full (bool): Whether the given value is the full dict, or just a
|
||||
partial subset there of. If not full then any existing entries
|
||||
for the key will be updated.
|
||||
known_absent (set): Set of keys that we know don't exist in the full
|
||||
dict.
|
||||
"""
|
||||
self.check_thread()
|
||||
if self.sequence == sequence:
|
||||
# Only update the cache if the caches sequence number matches the
|
||||
# number that the cache had before the SELECT was started (SYN-369)
|
||||
if known_absent is None:
|
||||
known_absent = set()
|
||||
if full:
|
||||
self._insert(key, value)
|
||||
self._insert(key, value, known_absent)
|
||||
else:
|
||||
self._update_or_insert(key, value)
|
||||
self._update_or_insert(key, value, known_absent)
|
||||
|
||||
def _update_or_insert(self, key, value):
|
||||
entry = self.cache.setdefault(key, DictionaryEntry(False, {}))
|
||||
def _update_or_insert(self, key, value, known_absent):
|
||||
entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
|
||||
entry.value.update(value)
|
||||
entry.known_absent.update(known_absent)
|
||||
|
||||
def _insert(self, key, value):
|
||||
self.cache[key] = DictionaryEntry(True, value)
|
||||
def _insert(self, key, value, known_absent):
|
||||
self.cache[key] = DictionaryEntry(True, known_absent, value)
|
||||
|
||||
@@ -28,7 +28,7 @@ class DictCacheTestCase(unittest.TestCase):
|
||||
key = "test_simple_cache_hit_full"
|
||||
|
||||
v = self.cache.get(key)
|
||||
self.assertEqual((False, {}), v)
|
||||
self.assertEqual((False, set(), {}), v)
|
||||
|
||||
seq = self.cache.sequence
|
||||
test_value = {"test": "test_simple_cache_hit_full"}
|
||||
|
||||
Reference in New Issue
Block a user