From dbc0dfd2d5f4b09b1151070d55e2826736324f38 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 May 2017 14:28:34 +0100 Subject: [PATCH 01/13] Remove unused options --- synapse/state.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 02fee47f39..536e2dc65f 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -239,16 +239,9 @@ class StateHandler(object): defer.returnValue(context) logger.debug("calling resolve_state_groups from compute_event_context") - if event.is_state(): - entry = yield self.resolve_state_groups( - event.room_id, [e for e, _ in event.prev_events], - event_type=event.type, - state_key=event.state_key, - ) - else: - entry = yield self.resolve_state_groups( - event.room_id, [e for e, _ in event.prev_events], - ) + entry = yield self.resolve_state_groups( + event.room_id, [e for e, _ in event.prev_events], + ) curr_state = entry.state @@ -284,7 +277,7 @@ class StateHandler(object): @defer.inlineCallbacks @log_function - def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""): + def resolve_state_groups(self, room_id, event_ids): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. From 2b03751c3cf0e64833afd36a1d393a5ce1d67e8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 May 2017 14:47:39 +0100 Subject: [PATCH 02/13] Don't return weird prev_group --- synapse/state.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 536e2dc65f..3f93f9e27f 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -195,12 +195,12 @@ class StateHandler(object): Returns: synapse.events.snapshot.EventContext: """ - context = EventContext() if event.internal_metadata.is_outlier(): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. + context = EventContext() if old_state: context.prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state @@ -219,6 +219,7 @@ class StateHandler(object): defer.returnValue(context) if old_state: + context = EventContext() context.prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } @@ -245,6 +246,7 @@ class StateHandler(object): curr_state = entry.state + context = EventContext() context.prev_state_ids = curr_state if event.is_state(): context.state_group = self.store.get_next_state_group() @@ -257,11 +259,14 @@ class StateHandler(object): context.current_state_ids = dict(context.prev_state_ids) context.current_state_ids[key] = event.event_id - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids - if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids[key] = event.event_id + if entry.state_group: + context.prev_group = entry.state_group + context.delta_ids = { + key: event.event_id + } + elif entry.prev_group: + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids else: if entry.state_group is None: entry.state_group = self.store.get_next_state_group() @@ -305,8 +310,8 @@ class StateHandler(object): defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, - prev_group=name, - delta_ids={}, + prev_group=None, + delta_ids=None, )) with (yield self.resolve_linearizer.queue(group_names)): From dfbda5e0250adfa762f5491c7efb2666866db034 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 May 2017 17:08:41 +0100 Subject: [PATCH 03/13] Faster cache for get_joined_hosts --- synapse/federation/transaction_queue.py | 2 + synapse/replication/slave/storage/events.py | 2 + synapse/state.py | 16 ++-- synapse/storage/roommember.py | 93 ++++++++++++++++----- synapse/storage/state.py | 33 ++++++++ 5 files changed, 117 insertions(+), 29 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a15198e05d..4c25ef1106 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -187,6 +187,8 @@ class TransactionQueue(object): prev_id for prev_id, _ in event.prev_events ], ) + destinations = set(destinations) + logger.info("destinations: %r", destinations) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index fcaf58b93b..6cd3a843df 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) + get_state_group_delta = DataStore.get_state_group_delta.__func__ + _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( diff --git a/synapse/state.py b/synapse/state.py index 3f93f9e27f..95dbe02e50 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -170,9 +170,7 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_user_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_users = yield self.store.get_joined_users_from_state( - room_id, entry.state_id, entry.state - ) + joined_users = yield self.store.get_joined_users_from_state(room_id, entry) defer.returnValue(joined_users) @defer.inlineCallbacks @@ -181,9 +179,9 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_hosts_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_hosts = yield self.store.get_joined_hosts( - room_id, entry.state_id, entry.state - ) + logger.info("State: %r", entry.state_group) + joined_hosts = yield self.store.get_joined_hosts(room_id, entry) + logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @defer.inlineCallbacks @@ -307,11 +305,13 @@ class StateHandler(object): if len(group_names) == 1: name, state_list = state_groups_ids.items().pop() + prev_group, delta_ids = yield self.store.get_state_group_delta(name) + defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, - prev_group=None, - delta_ids=None, + prev_group=prev_group, + delta_ids=delta_ids, )) with (yield self.resolve_linearizer.queue(group_names)): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0829ae5bee..8c4a5f9f2e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,6 +18,7 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore +from synapse.util.async import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.stringutils import to_ascii @@ -392,7 +393,8 @@ class RoomMemberStore(SQLBaseStore): context=context, ) - def get_joined_users_from_state(self, room_id, state_group, state_ids): + def get_joined_users_from_state(self, room_id, state_entry): + state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -401,7 +403,7 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_users_from_context( - room_id, state_group, state_ids, + room_id, state_group, state_entry.state, context=state_entry, ) @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, @@ -534,7 +536,8 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(False) - def get_joined_hosts(self, room_id, state_group, state_ids): + def get_joined_hosts(self, room_id, state_entry): + state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -543,33 +546,21 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_hosts( - room_id, state_group, state_ids + room_id, state_group, state_entry.state, state_entry=state_entry, ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) - def _get_joined_hosts(self, room_id, state_group, current_state_ids): + # @defer.inlineCallbacks + def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry): # We don't use `state_group`, its there so that we can cache based # on it. However, its important that its never None, since two current_state's # with a state_group of None are likely to be different. # See bulk_get_push_rules_for_room for how we work around this. assert state_group is not None - joined_hosts = set() - for etype, state_key in current_state_ids: - if etype == EventTypes.Member: - try: - host = get_domain_from_id(state_key) - except: - logger.warn("state_key not user_id: %s", state_key) - continue - - if host in joined_hosts: - continue - - event_id = current_state_ids[(etype, state_key)] - event = yield self.get_event(event_id, allow_none=True) - if event and event.content["membership"] == Membership.JOIN: - joined_hosts.add(intern_string(host)) + cache = self._get_joined_hosts_cache(room_id) + joined_hosts = yield cache.get_destinations(state_entry) + logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @@ -647,3 +638,63 @@ class RoomMemberStore(SQLBaseStore): yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME) defer.returnValue(result) + + @cached(max_entries=10000, iterable=True) + def _get_joined_hosts_cache(self, room_id): + return _JoinedHostsCache(self, room_id) + + +class _JoinedHostsCache(object): + def __init__(self, store, room_id): + self.store = store + self.room_id = room_id + + self.hosts_to_joined_users = {} + + self.state_group = object() + + self.linearizer = Linearizer("_JoinedHostsCache") + + self._len = 0 + + @defer.inlineCallbacks + def get_destinations(self, state_entry): + if state_entry.state_group == self.state_group: + defer.returnValue(frozenset(self.hosts_to_joined_users)) + + with (yield self.linearizer.queue(())): + if state_entry.state_group == self.state_group: + pass + elif state_entry.prev_group == self.state_group: + for (typ, state_key), event_id in state_entry.delta_ids.iteritems(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = self.hosts_to_joined_users.setdefault(host, set()) + + event = yield self.store.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + self.hosts_to_joined_users.pop(host, None) + else: + joined_users = yield self.store.get_joined_users_from_state( + self.room_id, state_entry, + ) + + self.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + self.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + self.state_group = state_entry.state_group + self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) + defer.returnValue(frozenset(self.hosts_to_joined_users)) + + def __len__(self): + return self._len diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a7c3d401d4..01474ff5ff 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -98,6 +98,39 @@ class StateStore(SQLBaseStore): _get_current_state_ids_txn, ) + def get_state_group_delta(self, state_group): + def _get_state_group_delta_txn(txn): + prev_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + }, + retcol="prev_state_group", + allow_none=True, + ) + + if not prev_group: + return None, None + + delta_ids = self._simple_select_list_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + }, + retcols=("type", "state_key", "event_id",) + ) + + return prev_group, { + (row["type"], row["state_key"]): row["event_id"] + for row in delta_ids + } + return self.runInteraction( + "get_state_group_delta", + _get_state_group_delta_txn, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: From 23da6383609f9b16e9ea6efd19096516621408be Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2017 10:02:04 +0100 Subject: [PATCH 04/13] Fix typing tests --- synapse/handlers/typing.py | 12 +++++------- tests/test_state.py | 2 ++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b7818af5c..82dedbbc99 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -89,7 +89,7 @@ class TypingHandler(object): until = self._member_typing_until.get(member, None) if not until or until <= now: logger.info("Timing out typing for: %s", member.user_id) - preserve_fn(self._stopped_typing)(member) + self._stopped_typing(member) continue # Check if we need to resend a keep alive over federation for this @@ -147,7 +147,7 @@ class TypingHandler(object): # No point sending another notification defer.returnValue(None) - yield self._push_update( + self._push_update( member=member, typing=True, ) @@ -171,7 +171,7 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=target_user_id) - yield self._stopped_typing(member) + self._stopped_typing(member) @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -180,7 +180,6 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=user_id) yield self._stopped_typing(member) - @defer.inlineCallbacks def _stopped_typing(self, member): if member.user_id not in self._room_typing.get(member.room_id, set()): # No point @@ -189,16 +188,15 @@ class TypingHandler(object): self._member_typing_until.pop(member, None) self._member_last_federation_poke.pop(member, None) - yield self._push_update( + self._push_update( member=member, typing=False, ) - @defer.inlineCallbacks def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - yield self._push_remote(member, typing) + preserve_fn(self._push_remote)(member, typing) self._push_update_local( member=member, diff --git a/tests/test_state.py b/tests/test_state.py index 6454f994e3..feb84f3d48 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -143,6 +143,7 @@ class StateTestCase(unittest.TestCase): "add_event_hashes", "get_events", "get_next_state_group", + "get_state_group_delta", ] ) hs = Mock(spec_set=[ @@ -154,6 +155,7 @@ class StateTestCase(unittest.TestCase): hs.get_auth.return_value = Auth(hs) self.store.get_next_state_group.side_effect = Mock + self.store.get_state_group_delta.return_value = (None, None) self.state = StateHandler(hs) self.event_id = 0 From 619e8ecd0c123602c20eb3a4ce45bdefb79d5900 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 26 May 2017 10:46:03 +0100 Subject: [PATCH 05/13] Handle None state group correctly --- synapse/state.py | 6 +++--- synapse/storage/roommember.py | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 95dbe02e50..5fbe0a0977 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -364,11 +364,11 @@ class StateHandler(object): prev_group = None delta_ids = None - for old_group, old_ids in state_groups_ids.items(): - if not set(new_state.iterkeys()) - set(old_ids.iterkeys()): + for old_group, old_ids in state_groups_ids.iteritems(): + if not set(new_state) - set(old_ids): n_delta_ids = { k: v - for k, v in new_state.items() + for k, v in new_state.iteritems() if old_ids.get(k) != v } if not delta_ids or len(n_delta_ids) < len(delta_ids): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8c4a5f9f2e..0e9e71f600 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -692,7 +692,10 @@ class _JoinedHostsCache(object): host = intern_string(get_domain_from_id(user_id)) self.hosts_to_joined_users.setdefault(host, set()).add(user_id) - self.state_group = state_entry.state_group + if state_entry.state_group: + self.state_group = state_entry.state_group + else: + self.state_group = object() self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) defer.returnValue(frozenset(self.hosts_to_joined_users)) From 6f83c4537cd6b0eeef2cfa735df085bf70228131 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 10:18:44 +0100 Subject: [PATCH 06/13] Increase size of IP cache --- synapse/storage/client_ips.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 747d2df622..014ab635b7 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -20,6 +20,8 @@ from twisted.internet import defer from ._base import Cache from . import background_updates +import os + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -28,12 +30,15 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, - max_entries=5000, + max_entries=50000 * CACHE_SIZE_FACTOR, ) super(ClientIpStore, self).__init__(hs) From 65f0513a3306d21aa5e6959b21642ba15dbdcad5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:02:38 +0100 Subject: [PATCH 07/13] Split up device_lists_outbound_pokes table for faster updates. --- synapse/storage/devices.py | 82 ++++++------------- .../schema/delta/42/device_list_last_id.sql | 33 ++++++++ 2 files changed, 57 insertions(+), 58 deletions(-) create mode 100644 synapse/storage/schema/delta/42/device_list_last_id.sql diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d9936c88bb..77b02c8a28 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,10 +37,6 @@ class DeviceStore(SQLBaseStore): max_entries=10000, ) - self._clock.looping_call( - self._prune_old_outbound_device_pokes, 60 * 60 * 1000 - ) - self.register_background_index_update( "device_lists_stream_idx", index_name="device_lists_stream_user_id", @@ -368,7 +364,7 @@ class DeviceStore(SQLBaseStore): prev_sent_id_sql = """ SELECT coalesce(max(stream_id), 0) as stream_id - FROM device_lists_outbound_pokes + FROM device_lists_outbound_last_success WHERE destination = ? AND user_id = ? AND stream_id <= ? """ @@ -510,32 +506,43 @@ class DeviceStore(SQLBaseStore): ) def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): - # First we DELETE all rows such that only the latest row for each - # (destination, user_id is left. We do this by selecting first and - # deleting. + # We update the device_lists_outbound_last_success with the successfully + # poked users. We do the join to see which users need to be inserted and + # which updated. sql = """ - SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? + SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + FROM device_lists_outbound_pokes as o + LEFT JOIN device_lists_outbound_last_success as s + USING (destination, user_id) + WHERE destination = ? AND o.stream_id <= ? GROUP BY user_id - HAVING count(*) > 1 """ txn.execute(sql, (destination, stream_id,)) rows = txn.fetchall() sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND user_id = ? AND stream_id < ? + UPDATE device_lists_outbound_last_success + SET stream_id = ? + WHERE destination = ? AND user_id = ? """ txn.executemany( - sql, ((destination, row[0], row[1],) for row in rows) + sql, ((row[1], destination, row[0],) for row in rows if row[2]) ) - # Mark everything that is left as sent sql = """ - UPDATE device_lists_outbound_pokes SET sent = ? + INSERT INTO device_lists_outbound_last_success + (destination, user_id, stream_id) VALUES (?, ?, ?) + """ + txn.executemany( + sql, ((destination, row[0], row[1],) for row in rows if not row[2]) + ) + + # Delete all sent outbound pokes + sql = """ + DELETE FROM device_lists_outbound_pokes WHERE destination = ? AND stream_id <= ? """ - txn.execute(sql, (True, destination, stream_id,)) + txn.execute(sql, (destination, stream_id,)) @defer.inlineCallbacks def get_user_whose_devices_changed(self, from_key): @@ -634,44 +641,3 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - - def _prune_old_outbound_device_pokes(self): - """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. We keep one entry per - (destination, user_id) tuple to ensure that the prev_ids remain correct - if the server does come back. - """ - yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 - - def _prune_txn(txn): - select_sql = """ - SELECT destination, user_id, max(stream_id) as stream_id - FROM device_lists_outbound_pokes - GROUP BY destination, user_id - HAVING min(ts) < ? AND count(*) > 1 - """ - - txn.execute(select_sql, (yesterday,)) - rows = txn.fetchall() - - if not rows: - return - - delete_sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? - """ - - txn.executemany( - delete_sql, - ( - (yesterday, row[0], row[1], row[2]) - for row in rows - ) - ) - - logger.info("Pruned %d device list outbound pokes", txn.rowcount) - - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn - ) diff --git a/synapse/storage/schema/delta/42/device_list_last_id.sql b/synapse/storage/schema/delta/42/device_list_last_id.sql new file mode 100644 index 0000000000..9ab8c14fa3 --- /dev/null +++ b/synapse/storage/schema/delta/42/device_list_last_id.sql @@ -0,0 +1,33 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Table of last stream_id that we sent to destination for user_id. This is +-- used to fill out the `prev_id` fields of outbound device list updates. +CREATE TABLE device_lists_outbound_last_success ( + destination TEXT NOT NULL, + user_id TEXT NOT NULL, + stream_id BIGINT NOT NULL +); + +INSERT INTO device_lists_outbound_last_success + SELECT destination, user_id, coalesce(max(stream_id), 0) as stream_id + FROM device_lists_outbound_pokes + WHERE sent = (1 = 1) -- sqlite doesn't have inbuilt boolean values + GROUP BY destination, user_id; + +CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success( + destination, user_id, stream_id +); From 6e2a7ee1bc7f576592573de2300a3a2e0f82e001 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:05:17 +0100 Subject: [PATCH 08/13] Remove spurious log lines --- synapse/federation/transaction_queue.py | 1 - synapse/state.py | 2 -- synapse/storage/roommember.py | 1 - 3 files changed, 4 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4c25ef1106..003eaba893 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -188,7 +188,6 @@ class TransactionQueue(object): ], ) destinations = set(destinations) - logger.info("destinations: %r", destinations) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server diff --git a/synapse/state.py b/synapse/state.py index 5fbe0a0977..d1b1a70a91 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -179,9 +179,7 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_hosts_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - logger.info("State: %r", entry.state_group) joined_hosts = yield self.store.get_joined_hosts(room_id, entry) - logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @defer.inlineCallbacks diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0e9e71f600..7155bfdc69 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -560,7 +560,6 @@ class RoomMemberStore(SQLBaseStore): cache = self._get_joined_hosts_cache(room_id) joined_hosts = yield cache.get_destinations(state_entry) - logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) From 6ba21bf2b8207e714d0d523379727bcf2c14684d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:08:36 +0100 Subject: [PATCH 09/13] Comments --- synapse/storage/roommember.py | 9 +++++++++ synapse/storage/state.py | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7155bfdc69..8656455f6e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -644,6 +644,10 @@ class RoomMemberStore(SQLBaseStore): class _JoinedHostsCache(object): + """Cache for joined hosts in a room that is optimised to handle updates + via state deltas. + """ + def __init__(self, store, room_id): self.store = store self.room_id = room_id @@ -658,6 +662,11 @@ class _JoinedHostsCache(object): @defer.inlineCallbacks def get_destinations(self, state_entry): + """Get set of destinations for a state entry + + Args: + state_entry(synapse.state._StateCacheEntry) + """ if state_entry.state_group == self.state_group: defer.returnValue(frozenset(self.hosts_to_joined_users)) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 01474ff5ff..c3eecbe824 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -99,6 +99,12 @@ class StateStore(SQLBaseStore): ) def get_state_group_delta(self, state_group): + """Given a state group try to return a previous group and a delta between + the old and the new. + + Returns: + (prev_group, delta_ids), where both may be None. + """ def _get_state_group_delta_txn(txn): prev_group = self._simple_select_one_onecol_txn( txn, From 1a81a1898e3ce1cae7e4a96183b6f404231bceab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:16:56 +0100 Subject: [PATCH 10/13] Keep pruning background task --- synapse/storage/devices.py | 45 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 77b02c8a28..83b1d2eeba 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,6 +37,10 @@ class DeviceStore(SQLBaseStore): max_entries=10000, ) + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) + self.register_background_index_update( "device_lists_stream_idx", index_name="device_lists_stream_user_id", @@ -641,3 +645,44 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + + def _prune_old_outbound_device_pokes(self): + """Delete old entries out of the device_lists_outbound_pokes to ensure + that we don't fill up due to dead servers. We keep one entry per + (destination, user_id) tuple to ensure that the prev_ids remain correct + if the server does come back. + """ + yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 + + def _prune_txn(txn): + select_sql = """ + SELECT destination, user_id, max(stream_id) as stream_id + FROM device_lists_outbound_pokes + GROUP BY destination, user_id + HAVING min(ts) < ? AND count(*) > 1 + """ + + txn.execute(select_sql, (yesterday,)) + rows = txn.fetchall() + + if not rows: + return + + delete_sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? + """ + + txn.executemany( + delete_sql, + ( + (yesterday, row[0], row[1], row[2]) + for row in rows + ) + ) + + logger.info("Pruned %d device list outbound pokes", txn.rowcount) + + return self.runInteraction( + "_prune_old_outbound_device_pokes", _prune_txn + ) From 64ed74c01efa035a2c9d95c97b1bf1b1f1c83ff6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:20:47 +0100 Subject: [PATCH 11/13] When pruning, delete from device_lists_outbound_last_success --- synapse/storage/devices.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 83b1d2eeba..bb27fd1f70 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -681,6 +681,14 @@ class DeviceStore(SQLBaseStore): ) ) + # Since we've deleted unsent deltas, we need to remove the entry + # of last successful sent so that the prev_ids are correctly set. + sql = """ + DELETE FROM device_lists_outbound_last_success + WHERE destination = ? AND user_id = ? + """ + txn.executemany(sql, ((row[0], row[1]) for row in rows)) + logger.info("Pruned %d device list outbound pokes", txn.rowcount) return self.runInteraction( From 2f34ad31ac1e5bdc1ddc38ee3f5149bcc4c6e663 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 11:50:44 +0100 Subject: [PATCH 12/13] Add some logging to user directory --- synapse/handlers/user_directory.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0182cf86d6..4e568de8c4 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -130,8 +130,15 @@ class UserDirectoyHandler(object): # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() + logger.info("Doing initial update of user directory. %d rooms", len(room_ids)) + num_processed_rooms = 1 + for room_id in room_ids: + logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) yield self._handle_intial_room(room_id) + num_processed_rooms += 1 + + logger.info("Processed all rooms.") self.initially_handled_users = None From ecdd2a36583ef822cbdf0d9aba4a3b2007713cb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Jun 2017 12:02:53 +0100 Subject: [PATCH 13/13] Don't start user_directory handling on workers --- synapse/handlers/user_directory.py | 3 +++ synapse/notifier.py | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4e568de8c4..43eb1c78e9 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -46,6 +46,9 @@ class UserDirectoyHandler(object): self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + + self.notifier.add_replication_callback(self.notify_new_event) # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already diff --git a/synapse/notifier.py b/synapse/notifier.py index 6b1709d700..385208b574 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -167,7 +167,6 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() - self.user_directory_handler = hs.get_user_directory_handler() if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() @@ -255,8 +254,6 @@ class Notifier(object): room_stream_id ) - preserve_fn(self.user_directory_handler.notify_new_event)() - if self.federation_sender: preserve_fn(self.federation_sender.notify_new_events)( room_stream_id