From ba502fb89a4c57c57de81669bfaa5ef02b4af904 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 14 Jun 2017 02:23:06 +0100 Subject: [PATCH 01/11] add notes on running out of FDs --- README.rst | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/README.rst b/README.rst index 35141ac71b..12f0c0c51a 100644 --- a/README.rst +++ b/README.rst @@ -528,6 +528,30 @@ fix try re-installing from PyPI or directly from # Install from github pip install --user https://github.com/pyca/pynacl/tarball/master +Running out of File Handles +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If synapse runs out of filehandles, it typically fails badly - live-locking +at 100% CPU, and/or failing to accept new TCP connections (blocking the +connecting client). Matrix currently can legitimately use a lot of file handles, +thanks to busy rooms like #matrix:matrix.org containing hundreds of participating +servers. The first time a server talks in a room it will try to connect +simultaneously to all participating servers, which could exhaust the available +file descriptors between DNS queries & HTTPS sockets, especially if DNS is slow +to respond. (We need to improve the routing algorithm used to be better than +full mesh, but as of June 2017 this hasn't happened yet). + +If you hit this failure mode, we recommend increasing the maximum number of +open file handles to be at least 4096 (assuming a default of 1024 or 256). +This is typically done by editing ``/etc/security/limits.conf`` + +Separately, Synapse may leak file handles if inbound HTTP requests get stuck +during processing - e.g. blocked behind a lock or talking to a remote server etc. +This is best diagnosed by matching up the 'Received request' and 'Processed request' +log lines and looking for any 'Processed request' lines which take more than +a few seconds to execute. Please let us know at #matrix-dev:matrix.org if +you see this failure mode so we can help debug it, however. + ArchLinux ~~~~~~~~~ From 617304b2cf72460006a7eda26dba9fef14642305 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Jun 2017 19:37:17 +0100 Subject: [PATCH 02/11] Fix phone home stats --- synapse/app/homeserver.py | 49 ++++++--------------- synapse/storage/__init__.py | 26 ++++++----- synapse/storage/events.py | 88 ++++++++++++------------------------- 3 files changed, 56 insertions(+), 107 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3457402596..c9a2f148d7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,7 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d from synapse.server import HomeServer -from twisted.internet import reactor, task, defer +from twisted.internet import reactor, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File @@ -53,7 +53,7 @@ from synapse.api.urls import ( from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.metrics import register_memory_metrics, get_metrics_for +from synapse.metrics import register_memory_metrics from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer @@ -398,7 +398,8 @@ def run(hs): ThreadPool._worker = profile(ThreadPool._worker) reactor.run = profile(reactor.run) - start_time = hs.get_clock().time() + clock = hs.get_clock() + start_time = clock.time() stats = {} @@ -410,41 +411,14 @@ def run(hs): if uptime < 0: uptime = 0 - # If the stats directory is empty then this is the first time we've - # reported stats. - first_time = not stats - stats["homeserver"] = hs.config.server_name stats["timestamp"] = now stats["uptime_seconds"] = uptime stats["total_users"] = yield hs.get_datastore().count_all_users() - - room_count = yield hs.get_datastore().get_room_count() - stats["total_room_count"] = room_count - stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() - daily_messages = yield hs.get_datastore().count_daily_messages() - if daily_messages is not None: - stats["daily_messages"] = daily_messages - else: - stats.pop("daily_messages", None) - - if first_time: - # Add callbacks to report the synapse stats as metrics whenever - # prometheus requests them, typically every 30s. - # As some of the stats are expensive to calculate we only update - # them when synapse phones home to matrix.org every 24 hours. - metrics = get_metrics_for("synapse.usage") - metrics.add_callback("timestamp", lambda: stats["timestamp"]) - metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"]) - metrics.add_callback("total_users", lambda: stats["total_users"]) - metrics.add_callback("total_room_count", lambda: stats["total_room_count"]) - metrics.add_callback( - "daily_active_users", lambda: stats["daily_active_users"] - ) - metrics.add_callback( - "daily_messages", lambda: stats.get("daily_messages", 0) - ) + stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() + daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() + stats["daily_sent_messages"] = daily_sent_messages logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: @@ -456,9 +430,12 @@ def run(hs): logger.warn("Error reporting stats: %s", e) if hs.config.report_stats: - phone_home_task = task.LoopingCall(phone_stats_home) - logger.info("Scheduling stats reporting for 24 hour intervals") - phone_home_task.start(60 * 60 * 24, now=False) + logger.info("Scheduling stats reporting for 3 hour intervals") + clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) + + # We wait 5 minutes to send the first set of stats as the server can + # be quite busy the first few minutes + clock.call_later(5 * 60, phone_stats_home) def in_thread(): # Uncomment to enable tracing of log context changes. diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d604e7668f..2970df138b 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -231,7 +231,7 @@ class DataStore(RoomMemberStore, RoomStore, cur.close() self.find_stream_orderings_looping_call = self._clock.looping_call( - self._find_stream_orderings_for_times, 60 * 60 * 1000 + self._find_stream_orderings_for_times, 10 * 60 * 1000 ) self._stream_order_on_start = self.get_room_max_stream_ordering() @@ -272,17 +272,19 @@ class DataStore(RoomMemberStore, RoomStore, Counts the number of users who used this homeserver in the last 24 hours. """ def _count_users(txn): - txn.execute( - "SELECT COUNT(DISTINCT user_id) AS users" - " FROM user_ips" - " WHERE last_seen > ?", - # This is close enough to a day for our purposes. - (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),) - ) - rows = self.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24), + + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT user_id FROM user_ips + WHERE last_seen > ? + GROUP BY user_id + ) u + """ + + txn.execute(sql, (yesterday,)) + count, = txn.fetchone() + return count ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c4aeb48800..8e7ae73a7d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -38,7 +38,6 @@ from functools import wraps import synapse.metrics import logging -import math import ujson as json # these are only included to make the type annotations work @@ -1599,68 +1598,39 @@ class EventsStore(SQLBaseStore): call to this function, it will return None. """ def _count_messages(txn): - now = self.hs.get_clock().time() - - txn.execute( - "SELECT reported_stream_token, reported_time FROM stats_reporting" - ) - last_reported = self.cursor_to_dict(txn) - - txn.execute( - "SELECT stream_ordering" - " FROM events" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ) - now_reporting = self.cursor_to_dict(txn) - if not now_reporting: - logger.info("Calculating daily messages skipped; no now_reporting") - return None - now_reporting = now_reporting[0]["stream_ordering"] - - txn.execute("DELETE FROM stats_reporting") - txn.execute( - "INSERT INTO stats_reporting" - " (reported_stream_token, reported_time)" - " VALUES (?, ?)", - (now_reporting, now,) - ) - - if not last_reported: - logger.info("Calculating daily messages skipped; no last_reported") - return None - - # Close enough to correct for our purposes. - yesterday = (now - 24 * 60 * 60) - since_yesterday_seconds = yesterday - last_reported[0]["reported_time"] - any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60 - if any_since_yesterday: - logger.info( - "Calculating daily messages skipped; since_yesterday_seconds: %d" % - (since_yesterday_seconds,) - ) - return None - - txn.execute( - "SELECT COUNT(*) as messages" - " FROM events NATURAL JOIN event_json" - " WHERE json like '%m.room.message%'" - " AND stream_ordering > ?" - " AND stream_ordering <= ?", - ( - last_reported[0]["reported_stream_token"], - now_reporting, - ) - ) - rows = self.cursor_to_dict(txn) - if not rows: - logger.info("Calculating daily messages skipped; messages count missing") - return None - return rows[0]["messages"] + sql = """ + SELECT COALESCE(COUNT(*), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count ret = yield self.runInteraction("count_messages", _count_messages) defer.returnValue(ret) + @defer.inlineCallbacks + def count_daily_sent_messages(self): + def _count_messages(txn): + # This is good enough as if you have silly characters in your own + # hostname then thats your own fault. + like_clause = "%:" + self.hs.hostname + + sql = """ + SELECT COALESCE(COUNT(*), 0) FROM events + WHERE type = 'm.room.message' + AND sender LIKE ? + AND stream_ordering > ? + """ + + txn.execute(sql, (like_clause, self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_daily_sent_messages", _count_messages) + defer.returnValue(ret) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] From 93e7a38370cfadb9dc65e18b16e7d76c05546e48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:30:54 +0100 Subject: [PATCH 03/11] Remove unhelpful test --- tests/storage/test_events.py | 115 ----------------------------------- 1 file changed, 115 deletions(-) delete mode 100644 tests/storage/test_events.py diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py deleted file mode 100644 index 14443b53bc..0000000000 --- a/tests/storage/test_events.py +++ /dev/null @@ -1,115 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from mock import Mock -from synapse.types import RoomID, UserID - -from tests import unittest -from twisted.internet import defer -from tests.storage.event_injector import EventInjector - -from tests.utils import setup_test_homeserver - - -class EventsStoreTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - self.hs = yield setup_test_homeserver( - resource_for_federation=Mock(), - http_client=None, - ) - self.store = self.hs.get_datastore() - self.db_pool = self.hs.get_db_pool() - self.message_handler = self.hs.get_handlers().message_handler - self.event_injector = EventInjector(self.hs) - - @defer.inlineCallbacks - def test_count_daily_messages(self): - yield self.db_pool.runQuery("DELETE FROM stats_reporting") - - self.hs.clock.now = 100 - - # Never reported before, and nothing which could be reported - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - count = yield self.db_pool.runQuery("SELECT COUNT(*) FROM stats_reporting") - self.assertEqual([(0,)], count) - - # Create something to report - room = RoomID.from_string("!abc123:test") - user = UserID.from_string("@raccoonlover:test") - yield self.event_injector.create_room(room, user) - - self.base_event = yield self._get_last_stream_token() - - yield self.event_injector.inject_message(room, user, "Raccoons are really cute") - - # Never reported before, something could be reported, but isn't because - # it isn't old enough. - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - yield self._assert_stats_reporting(1, self.hs.clock.now) - - # Already reported yesterday, two new events from today. - yield self.event_injector.inject_message(room, user, "Yeah they are!") - yield self.event_injector.inject_message(room, user, "Incredibly!") - self.hs.clock.now += 60 * 60 * 24 - count = yield self.store.count_daily_messages() - self.assertEqual(2, count) # 2 since yesterday - yield self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever - - # Last reported too recently. - yield self.event_injector.inject_message(room, user, "Who could disagree?") - self.hs.clock.now += 60 * 60 * 22 - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - yield self._assert_stats_reporting(4, self.hs.clock.now) - - # Last reported too long ago - yield self.event_injector.inject_message(room, user, "No one.") - self.hs.clock.now += 60 * 60 * 26 - count = yield self.store.count_daily_messages() - self.assertIsNone(count) - yield self._assert_stats_reporting(5, self.hs.clock.now) - - # And now let's actually report something - yield self.event_injector.inject_message(room, user, "Indeed.") - yield self.event_injector.inject_message(room, user, "Indeed.") - yield self.event_injector.inject_message(room, user, "Indeed.") - # A little over 24 hours is fine :) - self.hs.clock.now += (60 * 60 * 24) + 50 - count = yield self.store.count_daily_messages() - self.assertEqual(3, count) - yield self._assert_stats_reporting(8, self.hs.clock.now) - - @defer.inlineCallbacks - def _get_last_stream_token(self): - rows = yield self.db_pool.runQuery( - "SELECT stream_ordering" - " FROM events" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ) - if not rows: - defer.returnValue(0) - else: - defer.returnValue(rows[0][0]) - - @defer.inlineCallbacks - def _assert_stats_reporting(self, messages, time): - rows = yield self.db_pool.runQuery( - "SELECT reported_stream_token, reported_time FROM stats_reporting" - ) - self.assertEqual([(self.base_event + messages, time,)], rows) From 4b461a69311fc96ce13ed75ebe388fd6718286d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:39:39 +0100 Subject: [PATCH 04/11] Add some more stats --- synapse/app/homeserver.py | 7 +++++++ synapse/storage/events.py | 15 +++++++++++++++ synapse/storage/registration.py | 13 +++++++++++++ 3 files changed, 35 insertions(+) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index c9a2f148d7..6af8259be0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -415,8 +415,15 @@ def run(hs): stats["timestamp"] = now stats["uptime_seconds"] = uptime stats["total_users"] = yield hs.get_datastore().count_all_users() + stats["total_users"] = yield hs.get_datastore().count_nonbridged_users() + + room_count = yield hs.get_datastore().get_room_count() + stats["total_room_count"] = room_count + stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() + stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms() stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() + daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() stats["daily_sent_messages"] = daily_sent_messages diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 8e7ae73a7d..f29d71589d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1631,6 +1631,21 @@ class EventsStore(SQLBaseStore): ret = yield self.runInteraction("count_daily_sent_messages", _count_messages) defer.returnValue(ret) + @defer.inlineCallbacks + def count_daily_active_rooms(self): + def _count(txn): + sql = """ + SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_daily_active_rooms", _count) + defer.returnValue(ret) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index ec2c52ab93..20acd58fcf 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -437,6 +437,19 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + @defer.inlineCallbacks + def count_nonbridged_users(self): + def _count_users(txn): + txn.execute(""" + SELECT COALESCE(COUNT(*), 0) FROM users + WHERE appservice_id IS NULL + """) + count, = txn.fetchone() + return count + + ret = yield self.runInteraction("count_users", _count_users) + defer.returnValue(ret) + @defer.inlineCallbacks def find_next_generated_user_id_localpart(self): """ From ebcd55d641345467d583836ce1f06ffe5d36f98d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:45:48 +0100 Subject: [PATCH 05/11] Add DB schema for tracking users who share rooms --- synapse/storage/prepare_database.py | 2 +- .../storage/schema/delta/43/user_share.sql | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/43/user_share.sql diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index eaba699e29..72b670b83b 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 42 +SCHEMA_VERSION = 43 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql new file mode 100644 index 0000000000..f552b6eb7b --- /dev/null +++ b/synapse/storage/schema/delta/43/user_share.sql @@ -0,0 +1,32 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Table keeping track of who shares a room with who. We only keep track +-- of this for local users, so `user_id` is local users only (but we do keep track +-- of which remote users share a room) +CREATE TABLE users_who_share_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + share_private BOOLEAN NOT NULL -- is the shared room private? i.e. they share a private room +); + + +CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id); +CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id); + + +-- Make sure that we popualte the table initially +UPDATE user_directory_stream_pos SET stream_id = NULL; From 72613bc3798d34a7bf93defd6624b84669078e2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 09:59:04 +0100 Subject: [PATCH 06/11] Implement initial population of users who share rooms table --- synapse/handlers/user_directory.py | 78 +++++++++++++++++- synapse/storage/user_directory.py | 124 +++++++++++++++++++++++++++-- 2 files changed, 193 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index f4451e5dfb..581c078bb2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,12 +14,12 @@ # limitations under the License. import logging - from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure +from synapse.util.async import sleep logger = logging.getLogger(__name__) @@ -41,12 +41,15 @@ class UserDirectoyHandler(object): one public room. """ + INITIAL_SLEEP_MS = 50 + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.notifier.add_replication_callback(self.notify_new_event) @@ -55,6 +58,9 @@ class UserDirectoyHandler(object): self.initially_handled_users = set() self.initially_handled_users_in_public = set() + self.initially_handled_users_share = set() + self.initially_handled_users_share_private_room = set() + # The current position in the current_state_delta stream self.pos = None @@ -140,10 +146,14 @@ class UserDirectoyHandler(object): logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) yield self._handle_intial_room(room_id) num_processed_rooms += 1 + yield sleep(self.INITIAL_SLEEP_MS / 1000.) logger.info("Processed all rooms.") self.initially_handled_users = None + self.initially_handled_users_in_public = None + self.initially_handled_users_share = None + self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @@ -158,7 +168,8 @@ class UserDirectoyHandler(object): is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users + user_ids = set(users_with_profile) + unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( room_id, { @@ -175,6 +186,69 @@ class UserDirectoyHandler(object): ) self.initially_handled_users_in_public != unhandled_users + # We now go and figure out the new users who share rooms with user entries + # We sleep aggressively here as otherwise it can starve resources. + # We also batch up inserts/updates, but try to avoid too many at once. + to_insert = set() + to_update = set() + count = 0 + for user_id in user_ids: + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + if not self.is_mine_id(user_id): + count += 1 + continue + + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + if count % 100 == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + count += 1 + + user_set = (user_id, other_user_id) + + if user_set in self.initially_handled_users_share_private_room: + continue + + if user_set in self.initially_handled_users_share: + if is_public: + continue + to_update.add(user_set) + else: + to_insert.add(user_set) + + if is_public: + self.initially_handled_users_share.add(user_set) + else: + self.initially_handled_users_share_private_room.add(user_set) + + if len(to_insert) > 100: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if len(to_update) > 100: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 137aca2881..0123e28f99 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -16,16 +16,19 @@ from twisted.internet import defer from ._base import SQLBaseStore + from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id import re +import logging + +logger = logging.getLogger(__name__) class UserDirectoryStore(SQLBaseStore): - @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): """Check if the room is either world_readable or publically joinable @@ -281,14 +284,118 @@ class UserDirectoryStore(SQLBaseStore): desc="get_users_in_dir_due_to_room", ) + @defer.inlineCallbacks def get_all_rooms(self): - """Get all room_ids we've ever known about + """Get all room_ids we've ever known about, in ascending order of "size" """ - return self._simple_select_onecol( - table="current_state_events", - keyvalues={}, - retcol="DISTINCT room_id", - desc="get_all_rooms", + sql = """ + SELECT room_id FROM current_state_events + GROUP BY room_id + ORDER BY count(*) ASC + """ + rows = yield self._execute("get_all_rooms", None, sql) + defer.returnValue([room_id for room_id, in rows]) + + def add_users_who_share_room(self, room_id, share_private, user_id_tuples): + """Insert entries into the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _add_users_who_share_room_txn(txn): + self._simple_insert_many_txn( + txn, + table="users_who_share_rooms", + values=[ + { + "user_id": user_id, + "other_user_id": other_user_id, + "room_id": room_id, + "share_private": share_private, + } + for user_id, other_user_id in user_id_tuples + ], + ) + for user_id, other_user_id in user_id_tuples: + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + return self.runInteraction( + "add_users_who_share_room", _add_users_who_share_room_txn + ) + + def update_users_who_share_room(self, room_id, share_private, user_id_sets): + """Updates entries in the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _update_users_who_share_room_txn(txn): + sql = """ + UPDATE users_who_share_rooms + SET room_id = ?, share_private = ? + WHERE user_id = ? AND other_user_id = ? + """ + txn.executemany( + sql, + ( + (room_id, share_private, uid, oid) + for uid, oid in user_id_sets + ) + ) + for user_id, other_user_id in user_id_sets: + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + return self.runInteraction( + "update_users_who_share_room", _update_users_who_share_room_txn + ) + + def remove_user_who_share_room(self, user_id, other_user_id): + """Deletes entries in the users_who_share_rooms table. The first + user should be a local user. + + Args: + room_id (str) + share_private (bool): Is the room private + user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + """ + def _remove_user_who_share_room_txn(txn): + self._simple_delete_txn( + txn, + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + "other_user_id": other_user_id, + }, + ) + txn.call_after( + self.get_users_who_share_room_from_dir.invalidate, + (user_id,), + ) + txn.call_after( + self.get_if_users_share_a_room.invalidate, + (user_id, other_user_id), + ) + + return self.runInteraction( + "remove_user_who_share_room", _remove_user_who_share_room_txn ) def delete_all_from_user_dir(self): @@ -298,8 +405,11 @@ class UserDirectoryStore(SQLBaseStore): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM users_in_pubic_room") + txn.execute("DELETE FROM users_who_share_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) txn.call_after(self.get_user_in_public_room.invalidate_all) + txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) + txn.call_after(self.get_if_users_share_a_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn ) From 4564b05483c8568b5435cfc527a73a6a7696fa61 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 10:15:00 +0100 Subject: [PATCH 07/11] Implement updating users who share rooms on the fly --- synapse/handlers/user_directory.py | 148 ++++++++++++++++++++++++----- synapse/storage/user_directory.py | 111 +++++++++++++++++++++- 2 files changed, 235 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 581c078bb2..aa8af95177 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -390,12 +390,77 @@ class UserDirectoyHandler(object): room_id ) - if not is_public: - return + if is_public: + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) - row = yield self.store.get_user_in_public_room(user_id) - if not row: - yield self.store.add_users_to_public_room(room_id, [user_id]) + # Now we update users who share rooms with users. We do this by getting + # all the current users in the room and seeing which aren't already + # marked in the database as sharing with `user_id` + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + to_insert = set() + to_update = set() + + # First, if they're our user then we need to update for every user + if self.is_mine_id(user_id): + # Returns a map of other_user_id -> shared_private. We only need + # to update mappings if for users that either don't share a room + # already (aren't in the map) or, if the room is private, those that + # only share a public room. + user_ids_shared = yield self.store.get_users_who_share_room_from_dir( + user_id + ) + + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + shared_is_private = user_ids_shared.get(other_user_id) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((user_id, other_user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((user_id, other_user_id)) + + # Next we need to update for every local user in the room + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + if self.is_mine_id(other_user_id): + shared_is_private = yield self.store.get_if_users_share_a_room( + other_user_id, user_id, + ) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((other_user_id, user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((other_user_id, user_id)) + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -413,32 +478,29 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) update_user_in_public = row and row["room_id"] == room_id - if not update_user_in_public and not update_user_dir: - return + if (update_user_in_public or update_user_dir): + # XXX: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + if (not update_user_in_public and not update_user_dir): + break - # XXX: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - if not update_user_in_public and not update_user_dir: - break + is_in_room = yield self.store.is_host_joined( + j_room_id, self.server_name, + ) - is_in_room = yield self.store.is_host_joined( - j_room_id, self.server_name, - ) + if not is_in_room: + continue - if not is_in_room: - continue + if update_user_dir: + update_user_dir = False + yield self.store.update_user_in_user_dir(user_id, j_room_id) - if update_user_dir: - update_user_dir = False - yield self.store.update_user_in_user_dir(user_id, j_room_id) - - if update_user_in_public: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( j_room_id ) - if is_public: + if update_user_in_public and is_public: yield self.store.update_user_in_public_user_list(user_id, j_room_id) update_user_in_public = False @@ -447,6 +509,46 @@ class UserDirectoyHandler(object): elif update_user_in_public: yield self.store.remove_from_user_in_public_room(user_id) + # Now handle users_who_share_rooms. + + # Get a list of user tuples that were in the DB due to this room and + # users (this includes tuples where the other user matches `user_id`) + user_tuples = yield self.store.get_users_in_share_dir_with_room_id( + user_id, room_id, + ) + + for user_id, other_user_id in user_tuples: + # For each user tuple get a list of rooms that they still share, + # trying to find a private room, and update the entry in the DB + rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id) + + # If they dont share a room anymore, remove the mapping + if not rooms: + yield self.store.remove_user_who_share_room( + user_id, other_user_id, + ) + continue + + found_public_share = None + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + found_public_share = j_room_id + else: + found_public_share = None + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + break + + if found_public_share: + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + @defer.inlineCallbacks def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): """Check member event changes for any profile changes and update the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 0123e28f99..2a17cbc9e9 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -273,17 +273,38 @@ class UserDirectoryStore(SQLBaseStore): desc="get_users_in_public_due_to_room", ) + @defer.inlineCallbacks def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id """ - return self._simple_select_onecol( + user_ids_dir = yield self._simple_select_onecol( table="user_directory", keyvalues={"room_id": room_id}, retcol="user_id", desc="get_users_in_dir_due_to_room", ) + user_ids_pub = yield self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids_share = yield self._simple_select_onecol( + table="users_who_share_rooms", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids = set(user_ids_dir) + user_ids.update(user_ids_pub) + user_ids.update(user_ids_share) + + defer.returnValue(user_ids) + @defer.inlineCallbacks def get_all_rooms(self): """Get all room_ids we've ever known about, in ascending order of "size" @@ -398,6 +419,94 @@ class UserDirectoryStore(SQLBaseStore): "remove_user_who_share_room", _remove_user_who_share_room_txn ) + @cached(max_entries=500000) + def get_if_users_share_a_room(self, user_id, other_user_id): + """Gets if users share a room. + + Args: + user_id (str): Must be a local user_id + other_user_id (str) + + Returns: + bool|None: None if they don't share a room, otherwise whether they + share a private room or not. + """ + return self._simple_select_one_onecol( + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + "other_user_id": other_user_id, + }, + retcol="share_private", + allow_none=True, + ) + + @cachedInlineCallbacks(max_entries=500000, iterable=True) + def get_users_who_share_room_from_dir(self, user_id): + """Returns the set of users who share a room with `user_id` + + Args: + user_id(str): Must be a local user + + Returns: + dict: user_id -> share_private mapping + """ + rows = yield self._simple_select_list( + table="users_who_share_rooms", + keyvalues={ + "user_id": user_id, + }, + retcols=("other_user_id", "share_private",), + desc="get_users_who_share_room_with_user", + ) + + defer.returnValue({ + row["other_user_id"]: row["share_private"] + for row in rows + }) + + def get_users_in_share_dir_with_room_id(self, user_id, room_id): + """Get all user tuples that are in the users_who_share_rooms due to the + given room_id. + + Returns: + [(user_id, other_user_id)]: where one of the two will match the given + user_id. + """ + sql = """ + SELECT user_id, other_user_id FROM users_who_share_rooms + WHERE room_id = ? AND (user_id = ? OR other_user_id = ?) + """ + return self._execute( + "get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id + ) + + @defer.inlineCallbacks + def get_rooms_in_common_for_users(self, user_id, other_user_id): + """Given two user_ids find out the list of rooms they share. + """ + sql = """ + SELECT room_id FROM ( + SELECT c.room_id FROM current_state_events AS c + INNER JOIN room_memberships USING (event_id) + WHERE type = 'm.room.member' + AND membership = 'join' + AND state_key = ? + ) AS f1 INNER JOIN ( + SELECT c.room_id FROM current_state_events AS c + INNER JOIN room_memberships USING (event_id) + WHERE type = 'm.room.member' + AND membership = 'join' + AND state_key = ? + ) f2 USING (room_id) + """ + + rows = yield self._execute( + "get_rooms_in_common_for_users", None, sql, user_id, other_user_id + ) + + defer.returnValue([room_id for room_id, in rows]) + def delete_all_from_user_dir(self): """Delete the entire user directory """ From a9d6fa8b2b31096b8f9fdb01b8fb5a2c6386e61f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 10:00:28 +0100 Subject: [PATCH 08/11] Include users who share room with requester in user directory --- synapse/handlers/user_directory.py | 4 +- .../rest/client/v2_alpha/user_directory.py | 8 ++- synapse/storage/user_directory.py | 61 ++++++++++++------- 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index aa8af95177..8928786fd6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -71,7 +71,7 @@ class UserDirectoyHandler(object): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) - def search_users(self, search_term, limit): + def search_users(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -88,7 +88,7 @@ class UserDirectoyHandler(object): ] } """ - return self.store.search_user_dir(search_term, limit) + return self.store.search_user_dir(user_id, search_term, limit) @defer.inlineCallbacks def notify_new_event(self): diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index 17d3dffc8f..6e012da4aa 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -55,7 +55,9 @@ class UserDirectorySearchRestServlet(RestServlet): ] } """ - yield self.auth.get_user_by_req(request, allow_guest=False) + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) limit = body.get("limit", 10) @@ -66,7 +68,9 @@ class UserDirectorySearchRestServlet(RestServlet): except: raise SynapseError(400, "`search_term` is required field") - results = yield self.user_directory_handler.search_users(search_term, limit) + results = yield self.user_directory_handler.search_users( + user_id, search_term, limit, + ) defer.returnValue((200, results)) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 2a17cbc9e9..52b184fe78 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -611,7 +611,7 @@ class UserDirectoryStore(SQLBaseStore): ) @defer.inlineCallbacks - def search_user_dir(self, search_term, limit): + def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -637,46 +637,63 @@ class UserDirectoryStore(SQLBaseStore): # The array of numbers are the weights for the various part of the # search: (domain, _, display name, localpart) sql = """ - SELECT user_id, display_name, avatar_url + SELECT d.user_id, display_name, avatar_url FROM user_directory_search - INNER JOIN user_directory USING (user_id) - INNER JOIN users_in_pubic_room USING (user_id) - WHERE vector @@ to_tsquery('english', ?) + INNER JOIN user_directory AS d USING (user_id) + LEFT JOIN users_in_pubic_room AS p USING (user_id) + LEFT JOIN ( + SELECT other_user_id AS user_id FROM users_who_share_rooms + WHERE user_id = ? AND share_private + ) AS s USING (user_id) + WHERE + (s.user_id IS NOT NULL OR p.user_id IS NOT NULL) + AND vector @@ to_tsquery('english', ?) ORDER BY - 2 * ts_rank_cd( - '{0.1, 0.1, 0.9, 1.0}', - vector, - to_tsquery('english', ?), - 8 - ) - + ts_rank_cd( - '{0.1, 0.1, 0.9, 1.0}', - vector, - to_tsquery('english', ?), - 8 + (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) + * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END) + * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END) + * ( + 3 * ts_rank_cd( + '{0.1, 0.1, 0.9, 1.0}', + vector, + to_tsquery('english', ?), + 8 + ) + + ts_rank_cd( + '{0.1, 0.1, 0.9, 1.0}', + vector, + to_tsquery('english', ?), + 8 + ) ) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? """ - args = (full_query, exact_query, prefix_query, limit + 1,) + args = (user_id, full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) sql = """ - SELECT user_id, display_name, avatar_url + SELECT d.user_id, display_name, avatar_url FROM user_directory_search - INNER JOIN user_directory USING (user_id) - INNER JOIN users_in_pubic_room USING (user_id) - WHERE value MATCH ? + INNER JOIN user_directory AS d USING (user_id) + LEFT JOIN users_in_pubic_room AS p USING (user_id) + LEFT JOIN ( + SELECT other_user_id AS user_id FROM users_who_share_rooms + WHERE user_id = ? AND share_private + ) AS s USING (user_id) + WHERE + (s.user_id IS NOT NULL OR p.user_id IS NOT NULL) + AND value MATCH ? ORDER BY rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? """ - args = (search_query, limit + 1) + args = (user_id, search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") From 5ddd199870e4ecaa60541779f8aa64bce4ca95cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 10:49:10 +0100 Subject: [PATCH 09/11] Typo --- synapse/app/homeserver.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6af8259be0..081e7cce59 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -415,7 +415,9 @@ def run(hs): stats["timestamp"] = now stats["uptime_seconds"] = uptime stats["total_users"] = yield hs.get_datastore().count_all_users() - stats["total_users"] = yield hs.get_datastore().count_nonbridged_users() + + total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() + stats["total_nonbridged_users"] = total_nonbridged_users room_count = yield hs.get_datastore().get_room_count() stats["total_room_count"] = room_count From 1ff419d34317c3141f378750d051def05ce9341c Mon Sep 17 00:00:00 2001 From: Krombel Date: Fri, 16 Jun 2017 11:17:10 +0200 Subject: [PATCH 10/11] allow Authorization header which handling got implemented in #1098 Signed-off-by: Matthias Kesler --- synapse/http/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 14715878c5..7ef3d526b1 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -412,7 +412,7 @@ def set_cors_headers(request): ) request.setHeader( "Access-Control-Allow-Headers", - "Origin, X-Requested-With, Content-Type, Accept" + "Origin, X-Requested-With, Content-Type, Accept, Authorization" ) From 6aa5bc86351a617546f0adacfebab3388716be3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 12:47:05 +0100 Subject: [PATCH 11/11] Initial worker impl --- synapse/app/federation_sender.py | 2 +- synapse/app/user_dir.py | 270 +++++++++++++++++++++++++++++ synapse/config/server.py | 4 + synapse/handlers/user_directory.py | 19 +- synapse/replication/tcp/streams.py | 22 +++ synapse/storage/events.py | 18 ++ 6 files changed, 328 insertions(+), 7 deletions(-) create mode 100644 synapse/app/user_dir.py diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index e51a69074d..03327dc47a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,7 @@ import sys import logging import gc -logger = logging.getLogger("synapse.app.appservice") +logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py new file mode 100644 index 0000000000..9d8edaa8e3 --- /dev/null +++ b/synapse/app/user_dir.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha import user_directory +from synapse.storage.engines import create_engine +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from synapse import events + +from twisted.internet import reactor +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.user_dir") + + +class UserDirectorySlaveStore( + SlavedEventStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + UserDirectoryStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStore because the constructor is different +): + def __init__(self, db_conn, hs): + super(UserDirectorySlaveStore, self).__init__(db_conn, hs) + + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + + self._current_state_delta_pos = events_max + + def stream_positions(self): + result = super(UserDirectorySlaveStore, self).stream_positions() + result["current_state_deltas"] = self._current_state_delta_pos + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "current_state_deltas": + self._current_state_delta_pos = token + for row in rows: + self._curr_state_delta_stream_cache.entity_has_changed( + row.room_id, token + ) + return super(UserDirectorySlaveStore, self).process_replication_rows( + stream_name, token, rows + ) + + +class UserDirectoryServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + user_directory.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse user_dir now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return UserDirectoryReplicationHandler(self) + + +class UserDirectoryReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) + self.user_directory = hs.get_user_directory_handler() + + def on_rdata(self, stream_name, token, rows): + super(UserDirectoryReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + if stream_name == "current_state_deltas": + preserve_fn(self.user_directory.notify_new_event)() + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse user directory", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.user_dir" + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + if config.update_user_directory: + sys.stderr.write( + "\nThe update_user_directory must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``update_user_directory: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.update_user_directory = True + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ps = UserDirectoryServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-user-dir", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/server.py b/synapse/config/server.py index 3910b9dc31..28b4e5f50c 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -35,6 +35,10 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + # Whether to update the user directory or not. This should be set to + # false only if we are updating the user directory in a worker + self.update_user_directory = config.get("update_user_directory", True) + self.filter_timeline_limit = config.get("filter_timeline_limit", -1) if self.public_baseurl is not None: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8928786fd6..d33a20a1f2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,8 +50,7 @@ class UserDirectoyHandler(object): self.clock = hs.get_clock() self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - - self.notifier.add_replication_callback(self.notify_new_event) + self.update_user_directory = hs.config.update_user_directory # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already @@ -67,9 +66,12 @@ class UserDirectoyHandler(object): # Guard to ensure we only process deltas one at a time self._is_processing = False - # We kick this off so that we don't have to wait for a change before - # we start populating the user directory - self.clock.call_later(0, self.notify_new_event) + if self.update_user_directory: + self.notifier.add_replication_callback(self.notify_new_event) + + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory + self.clock.call_later(0, self.notify_new_event) def search_users(self, user_id, search_term, limit): """Searches for users in directory @@ -94,6 +96,9 @@ class UserDirectoyHandler(object): def notify_new_event(self): """Called when there may be more deltas to process """ + if not self.update_user_directory: + return + if self._is_processing: return @@ -324,7 +329,7 @@ class UserDirectoyHandler(object): event_id (str|None): The new event after the state change typ (str): Type of the event """ - logger.debug("Handling change for %s", typ) + logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( @@ -394,6 +399,8 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) if not row: yield self.store.add_users_to_public_room(room_id, [user_id]) + else: + logger.debug("Not adding user to public dir, %r", user_id) # Now we update users who share rooms with users. We do this by getting # all the current users in the room and seeing which aren't already diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 369d5f2428..fbafe12cc2 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) +CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( + "room_id", # str + "type", # str + "state_key", # str + "event_id", # str, optional +)) class Stream(object): @@ -443,6 +449,21 @@ class AccountDataStream(Stream): defer.returnValue(results) +class CurrentStateDeltaStream(Stream): + """Current state for a room was changed + """ + NAME = "current_state_deltas" + ROW_TYPE = CurrentStateDeltaStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_current_state_delta_stream_id + self.update_function = store.get_all_updated_current_state_deltas + + super(CurrentStateDeltaStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -460,5 +481,6 @@ STREAMS_MAP = { FederationStream, TagAccountDataStream, AccountDataStream, + CurrentStateDeltaStream, ) } diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72ce84b0b8..90041b0da4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2284,6 +2284,24 @@ class EventsStore(SQLBaseStore): defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) + def get_max_current_state_delta_stream_id(self): + return self._stream_id_gen.get_current_token() + + def get_all_updated_current_state_deltas(self, from_token, to_token, limit): + def get_all_updated_current_state_deltas_txn(txn): + sql = """ + SELECT stream_id, room_id, type, state_key, event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_current_state_deltas", + get_all_updated_current_state_deltas_txn, + ) + AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events",