From fcf01dd88e738e5745c17c7740e0d72b8bbbcb66 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Jun 2017 11:33:40 +0100 Subject: [PATCH 01/11] Reject local events that don't round trip the DB --- synapse/handlers/message.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a04f634c5c..5585ee85cf 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json import logging import random +import ujson logger = logging.getLogger(__name__) @@ -498,6 +499,14 @@ class MessageHandler(BaseHandler): logger.warn("Denying new event %r because %s", event, err) raise err + # Ensure that we can round trip before trying to persist in db + try: + dump = ujson.dumps(event.content) + load = ujson.loads(dump) + except: + logger.exception("Failed to encode content: %r", event.content) + raise + yield self.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: From 1591eddaea779d2805afb1acc102630a8e580f0d Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Fri, 23 Jun 2017 13:01:04 +0200 Subject: [PATCH 02/11] Add configuration parameter to allow redaction of content from push messages for google/apple devices --- synapse/config/homeserver.py | 3 ++- synapse/config/pushconfig.py | 40 ++++++++++++++++++++++++++++++++++++ synapse/push/httppusher.py | 2 +- 3 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 synapse/config/pushconfig.py diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 0f890fc04a..327c7e4fc6 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -33,6 +33,7 @@ from .jwt import JWTConfig from .password_auth_providers import PasswordAuthProviderConfig from .emailconfig import EmailConfig from .workers import WorkerConfig +from .pushconfig import PushConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, @@ -40,7 +41,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig,): + WorkerConfig, PasswordAuthProviderConfig, PushConfig,): pass diff --git a/synapse/config/pushconfig.py b/synapse/config/pushconfig.py new file mode 100644 index 0000000000..afe8aba0a8 --- /dev/null +++ b/synapse/config/pushconfig.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file can't be called email.py because if it is, we cannot: +import email.utils + +from ._base import Config + + +class PushConfig(Config): + def read_config(self, config): + self.push_redact_content = False + + push_config = config.get("email", {}) + self.push_redact_content = push_config.get("redact_content", False) + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Control how push messages are sent to google/apple to notifications. + # Normally every message is posted to a push server hosted by matrix.org + # which is registered with google and apple in order to allow push + # notifications to be sent to mobile devices. + # Setting redact_content to true will make the push messages contain no + # message content which will provide increased privacy. + # + #push: + # redact_content: false + """ diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index c0f8176e3d..f3f872895a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -275,7 +275,7 @@ class HttpPusher(object): if event.type == 'm.room.member': d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if 'content' in event: + if not hs.config.push_redact_content and 'content' in event: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human From 75eba3b07d58743f24068efef67d5755f58207ba Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Fri, 23 Jun 2017 15:15:18 +0200 Subject: [PATCH 03/11] Fix TravisCI tests for PR #2301 --- synapse/config/pushconfig.py => push.py | 3 --- synapse/config/homeserver.py | 2 +- synapse/push/httppusher.py | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) rename synapse/config/pushconfig.py => push.py (94%) diff --git a/synapse/config/pushconfig.py b/push.py similarity index 94% rename from synapse/config/pushconfig.py rename to push.py index afe8aba0a8..4e5f428820 100644 --- a/synapse/config/pushconfig.py +++ b/push.py @@ -13,9 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# This file can't be called email.py because if it is, we cannot: -import email.utils - from ._base import Config diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 327c7e4fc6..b22cacf8dc 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -33,7 +33,7 @@ from .jwt import JWTConfig from .password_auth_providers import PasswordAuthProviderConfig from .emailconfig import EmailConfig from .workers import WorkerConfig -from .pushconfig import PushConfig +from .push import PushConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index f3f872895a..8a5d473108 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -275,7 +275,7 @@ class HttpPusher(object): if event.type == 'm.room.member': d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if not hs.config.push_redact_content and 'content' in event: + if not self.hs.config.push_redact_content and 'content' in event: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human From bce144595ceb7cf5b02efd79ae67e23c0df9fa27 Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Fri, 23 Jun 2017 15:26:09 +0200 Subject: [PATCH 04/11] Fix TravisCI tests for PR #2301 - Fat finger mistake --- push.py => synapse/config/push.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename push.py => synapse/config/push.py (100%) diff --git a/push.py b/synapse/config/push.py similarity index 100% rename from push.py rename to synapse/config/push.py From 27bd0b9a91607beb3965c48edcab10e5f6924879 Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Sat, 24 Jun 2017 10:32:12 +0200 Subject: [PATCH 05/11] Change the config file generator to more descriptive explanation of push.redact_content --- synapse/config/push.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/synapse/config/push.py b/synapse/config/push.py index 4e5f428820..9c68318b40 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -26,11 +26,19 @@ class PushConfig(Config): def default_config(self, config_dir_path, server_name, **kwargs): return """ # Control how push messages are sent to google/apple to notifications. - # Normally every message is posted to a push server hosted by matrix.org + # Normally every message said in a room with one or more people using + # mobile devices will be posted to a push server hosted by matrix.org # which is registered with google and apple in order to allow push - # notifications to be sent to mobile devices. + # notifications to be sent to these mobile devices. + # # Setting redact_content to true will make the push messages contain no - # message content which will provide increased privacy. + # message content which will provide increased privacy. This is a + # temporary solution pending improvements to Android and iPhone apps + # to get content from the app rather than the notification. + # + # For modern android devices the notification content will still appear + # because it is loaded by the app. iPhone, however will send a + # notification saying only that a message arrived and who it came from. # #push: # redact_content: false From 8abdd7b553868613f246bbd02b9fc5de03050a81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jun 2017 14:01:30 +0100 Subject: [PATCH 06/11] Fix up indices for users_who_share_rooms --- synapse/storage/schema/delta/43/user_share.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql index f552b6eb7b..4501d90cbb 100644 --- a/synapse/storage/schema/delta/43/user_share.sql +++ b/synapse/storage/schema/delta/43/user_share.sql @@ -25,7 +25,8 @@ CREATE TABLE users_who_share_rooms ( CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id); -CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id); +CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id); +CREATE INDEX users_who_share_rooms_o_idx ON users_who_share_rooms(other_user_id); -- Make sure that we popualte the table initially From 1bce3e6b35eb3acba335c69c12ec06da9c7051df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jun 2017 14:03:27 +0100 Subject: [PATCH 07/11] Remove unused variables --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5585ee85cf..24c9ffdb20 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -502,7 +502,7 @@ class MessageHandler(BaseHandler): # Ensure that we can round trip before trying to persist in db try: dump = ujson.dumps(event.content) - load = ujson.loads(dump) + ujson.loads(dump) except: logger.exception("Failed to encode content: %r", event.content) raise From 976128f36831e62fe23ec30203292bb80c6a1bf2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jun 2017 15:45:24 +0100 Subject: [PATCH 08/11] Update version and changelog --- CHANGES.rst | 34 ++++++++++++++++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 5ef7eec4c2..ce356a11bd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,37 @@ +Changes in synapse v0.22.0-rc1 (2017-06-26) +=========================================== + +Features: + +* Add a user directory API (PR #2252, and many more) +* Add shutdown room API to remove room from local server (PR #2291) +* Add API to quarantine media (PR #2292) +* Add new config option to not send event contents to push servers (PR #2301) + Thanks to @cjdelisle! + +Changes: + +* Various performance fixes (PR #2177, #2233, #2230, #2238, #2248, #2256, + #2274) +* Deduplicate sync filters (PR #2219) Thanks to @krombel! +* Correct a typo in UPGRADE.rst (PR #2231) Thanks to @aaronraimist! +* Add count of one time keys to sync stream (PR #2237) +* Only store event_auth for state events (PR #2247) +* Store URL cache preview downloads separately (PR #2299) + +Bug fixes: + +* Fix users not getting notifications when AS listened to that user_id (PR + #2216) Thanks to @slipeer! +* Fix users without push set up not getting notifications after joining rooms + (PR #2236) +* Fix preview url API to trim long descriptions (PR #2243) +* Fix bug where we used cached but unpersisted state group as prev group, + resulting in broken state of restart (PR #2263) +* Fix removing of pushers when using workers (PR #2267) +* Fix CORS headers to allow Authorization header (PR #2285) Thanks to @krombel! + + Changes in synapse v0.21.1 (2017-06-15) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index 9df7d18993..6b0a766391 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.21.1" +__version__ = "0.22.0-rc1" From ed3d0170d9317644004b1203ecedaba58866ab9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Jun 2017 13:37:04 +0100 Subject: [PATCH 09/11] Batch upsert user ips --- synapse/api/auth.py | 3 +- synapse/storage/client_ips.py | 57 +++++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 0c297cb022..10f4972369 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -23,7 +23,6 @@ from synapse import event_auth from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes from synapse.types import UserID -from synapse.util import logcontext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -200,7 +199,7 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - logcontext.preserve_fn(self.store.insert_client_ip)( + self.store.insert_client_ip( user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 014ab635b7..68955e740b 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from ._base import Cache from . import background_updates @@ -50,7 +50,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks + self._batch_row_update = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) key = (user.to_string(), access_token, ip) @@ -62,28 +68,41 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) + return self.client_ip_last_seen.prefill(key, now) - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "device_id": device_id, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, + self._batch_row_update[key] = (user_agent, device_id, now) + + def _update_client_ips_batch(self): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update ) + def _update_client_ips_batch_txn(self, txn, to_update): + self.database_engine.lock_table(txn, "user_ips") + + for entry in to_update.iteritems(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + + self._simple_upsert_txn( + txn, + table="user_ips", + keyvalues={ + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": device_id, + }, + values={ + "last_seen": last_seen, + }, + lock=False, + ) + @defer.inlineCallbacks def get_last_client_ip_by_device(self, devices): """For each device_id listed, give the user_ip it was last seen on From a0a561ae852c6a031f8611859d95e2ad563770ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Jun 2017 14:46:12 +0100 Subject: [PATCH 10/11] Fix up client ips to read from pending data --- synapse/handlers/device.py | 4 +- synapse/storage/__init__.py | 10 ----- synapse/storage/client_ips.py | 70 +++++++++++++++++++++++++++----- tests/storage/test_client_ips.py | 5 +-- 4 files changed, 62 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 982cda3edf..ed60d494ff 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler): device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id) for device_id in device_map.keys()) + user_id, device_id=None ) devices = device_map.values() @@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler): except errors.StoreError: raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id),) + user_id, device_id, ) _update_device_from_client_ips(device, ips) defer.returnValue(device) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f119c5a758..b92472df33 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -304,16 +304,6 @@ class DataStore(RoomMemberStore, RoomStore, ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) - def get_user_ip_and_agents(self, user): - return self._simple_select_list( - table="user_ips", - keyvalues={"user_id": user.to_string()}, - retcols=[ - "access_token", "ip", "user_agent", "last_seen" - ], - desc="get_user_ip_and_agents", - ) - def get_users(self): """Function to reterive a list of users in users table. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 68955e740b..88a5eb232f 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -50,6 +50,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) + # (user_id, access_token, ip) -> (user_agent, device_id, last_seen) self._batch_row_update = {} self._client_ip_looper = self._clock.looping_call( @@ -104,11 +105,12 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) @defer.inlineCallbacks - def get_last_client_ip_by_device(self, devices): + def get_last_client_ip_by_device(self, user_id, device_id): """For each device_id listed, give the user_ip it was last seen on Args: - devices (iterable[(str, str)]): list of (user_id, device_id) pairs + user_id (str) + device_id (str): If None fetches all devices for the user Returns: defer.Deferred: resolves to a dict, where the keys @@ -119,6 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): res = yield self.runInteraction( "get_last_client_ip_by_device", self._get_last_client_ip_by_device_txn, + user_id, device_id, retcols=( "user_id", "access_token", @@ -127,23 +130,34 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "device_id", "last_seen", ), - devices=devices ) ret = {(d["user_id"], d["device_id"]): d for d in res} + for key in self._batch_row_update: + uid, access_token, ip = key + if uid == user_id: + user_agent, did, last_seen = self._batch_row_update[key] + if not device_id or did == device_id: + ret[(user_id, device_id)] = { + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": did, + "last_seen": last_seen, + } defer.returnValue(ret) @classmethod - def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols): + def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols): where_clauses = [] bindings = [] - for (user_id, device_id) in devices: - if device_id is None: - where_clauses.append("(user_id = ? AND device_id IS NULL)") - bindings.extend((user_id, )) - else: - where_clauses.append("(user_id = ? AND device_id = ?)") - bindings.extend((user_id, device_id)) + if device_id is None: + where_clauses.append("user_id = ?") + bindings.extend((user_id, )) + else: + where_clauses.append("(user_id = ? AND device_id = ?)") + bindings.extend((user_id, device_id)) if not where_clauses: return [] @@ -171,3 +185,37 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): txn.execute(sql, bindings) return cls.cursor_to_dict(txn) + + @defer.inlineCallbacks + def get_user_ip_and_agents(self, user): + user_id = user.to_string() + results = {} + + for key in self._batch_row_update: + uid, access_token, ip = key + if uid == user_id: + user_agent, _, last_seen = self._batch_row_update[key] + results[(access_token, ip)] = (user_agent, last_seen) + + rows = yield self._simple_select_list( + table="user_ips", + keyvalues={"user_id": user_id}, + retcols=[ + "access_token", "ip", "user_agent", "last_seen" + ], + desc="get_user_ip_and_agents", + ) + + results.update( + ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"])) + for row in rows + ) + defer.returnValue(list( + { + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "last_seen": last_seen, + } + for (access_token, ip), (user_agent, last_seen) in results.iteritems() + )) diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index 1f0c0e7c37..03df697575 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -43,10 +43,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase): "access_token", "ip", "user_agent", "device_id", ) - # deliberately use an iterable here to make sure that the lookup - # method doesn't iterate it twice - device_list = iter(((user_id, "device_id"),)) - result = yield self.store.get_last_client_ip_by_device(device_list) + result = yield self.store.get_last_client_ip_by_device(user_id, "device_id") r = result[(user_id, "device_id")] self.assertDictContainsSubset( From 78cefd78d6a743f179cd678c4766492819a80203 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Jun 2017 14:58:10 +0100 Subject: [PATCH 11/11] Make workers report to master for user ip updates --- synapse/app/client_reader.py | 4 +- synapse/app/media_repository.py | 4 +- synapse/app/synchrotron.py | 4 +- synapse/app/user_dir.py | 4 +- .../replication/slave/storage/client_ips.py | 48 +++++++++++++++++++ synapse/replication/tcp/client.py | 7 +++ synapse/replication/tcp/commands.py | 32 +++++++++++++ synapse/replication/tcp/protocol.py | 6 +++ synapse/replication/tcp/resource.py | 10 ++++ 9 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 synapse/replication/slave/storage/client_ips.py diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 9b72c649ac..09bc1935f1 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -24,6 +24,7 @@ from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore @@ -33,7 +34,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer -from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext @@ -65,8 +65,8 @@ class ClientReaderSlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, TransactionStore, + SlavedClientIpStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStore because the constructor is different ): pass diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 26c4416956..f57ec784fe 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -23,13 +23,13 @@ from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer -from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree @@ -60,10 +60,10 @@ logger = logging.getLogger("synapse.app.media_repository") class MediaRepositorySlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, + SlavedClientIpStore, TransactionStore, BaseSlavedStore, MediaRepositoryStore, - ClientIpStore, ): pass diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 13c00ef2ba..4bdd99a966 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -29,6 +29,7 @@ from synapse.rest.client.v1 import events from synapse.rest.client.v1.room import RoomInitialSyncRestServlet from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -42,7 +43,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer -from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore @@ -77,9 +77,9 @@ class SynchrotronSlavedStore( SlavedPresenceStore, SlavedDeviceInboxStore, SlavedDeviceStore, + SlavedClientIpStore, RoomStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStore because the constructor is different ): who_forgot_in_room = ( RoomMemberStore.__dict__["who_forgot_in_room"] diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 6d2aebe8de..8c6300db9d 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -26,12 +26,12 @@ from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v2_alpha import user_directory from synapse.storage.engines import create_engine -from synapse.storage.client_ips import ClientIpStore from synapse.storage.user_directory import UserDirectoryStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -58,9 +58,9 @@ class UserDirectorySlaveStore( SlavedEventStore, SlavedApplicationServiceStore, SlavedRegistrationStore, + SlavedClientIpStore, UserDirectoryStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStore because the constructor is different ): def __init__(self, db_conn, hs): super(UserDirectorySlaveStore, self).__init__(db_conn, hs) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py new file mode 100644 index 0000000000..65250285e8 --- /dev/null +++ b/synapse/replication/slave/storage/client_ips.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.client_ips import LAST_SEEN_GRANULARITY +from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.caches.descriptors import Cache + + +class SlavedClientIpStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedClientIpStore, self).__init__(db_conn, hs) + + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, + max_entries=50000 * CACHE_SIZE_FACTOR, + ) + + def insert_client_ip(self, user, access_token, ip, user_agent, device_id): + now = int(self._clock.time_msec()) + user_id = user.to_string() + key = (user_id, access_token, ip) + + try: + last_seen = self.client_ip_last_seen.get(key) + except KeyError: + last_seen = None + + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + return + + self.hs.get_tcp_replication().send_user_ip( + user_id, access_token, ip, user_agent, device_id, now + ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 90fb6c1336..6d2513c4e2 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from .commands import ( FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand, + UserIpCommand, ) from .protocol import ClientReplicationStreamProtocol @@ -178,6 +179,12 @@ class ReplicationClientHandler(object): cmd = InvalidateCacheCommand(cache_func.__name__, keys) self.send_command(cmd) + def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + """Tell the master that the user made a request. + """ + cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) + self.send_command(cmd) + def await_sync(self, data): """Returns a deferred that is resolved when we receive a SYNC command with given data. diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 84d2a2272a..a009214e43 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command): return " ".join((self.cache_func, json.dumps(self.keys))) +class UserIpCommand(Command): + """Sent periodically when a worker sees activity from a client. + + Format:: + + USER_IP , , , , , + """ + NAME = "USER_IP" + + def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen): + self.user_id = user_id + self.access_token = access_token + self.ip = ip + self.user_agent = user_agent + self.device_id = device_id + self.last_seen = last_seen + + @classmethod + def from_line(cls, line): + user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5) + + return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen)) + + def to_line(self): + return " ".join(( + self.user_id, self.access_token, self.ip, self.device_id, + str(self.last_seen), self.user_agent, + )) + + # Map of command name to command type. COMMAND_MAP = { cmd.NAME: cmd @@ -320,6 +350,7 @@ COMMAND_MAP = { SyncCommand, RemovePusherCommand, InvalidateCacheCommand, + UserIpCommand, ) } @@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = ( FederationAckCommand.NAME, RemovePusherCommand.NAME, InvalidateCacheCommand.NAME, + UserIpCommand.NAME, ErrorCommand.NAME, ) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 9fee2a484b..062272f8dd 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_INVALIDATE_CACHE(self, cmd): self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + def on_USER_IP(self, cmd): + self.streamer.on_user_ip( + cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id, + cmd.last_seen, + ) + @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): """Subscribe the remote to a streams. diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 69c46911ec..3ea3ca5a6f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync") federation_ack_counter = metrics.register_counter("federation_ack") remove_pusher_counter = metrics.register_counter("remove_pusher") invalidate_cache_counter = metrics.register_counter("invalidate_cache") +user_ip_cache_counter = metrics.register_counter("user_ip_cache") logger = logging.getLogger(__name__) @@ -238,6 +239,15 @@ class ReplicationStreamer(object): invalidate_cache_counter.inc() getattr(self.store, cache_func).invalidate(tuple(keys)) + @measure_func("repl.on_user_ip") + def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + """The client saw a user request + """ + user_ip_cache_counter.inc() + self.store.insert_client_ip( + user_id, access_token, ip, user_agent, device_id, last_seen, + ) + def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients.