Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
+34
@@ -1,3 +1,37 @@
|
||||
Changes in synapse v0.22.0-rc1 (2017-06-26)
|
||||
===========================================
|
||||
|
||||
Features:
|
||||
|
||||
* Add a user directory API (PR #2252, and many more)
|
||||
* Add shutdown room API to remove room from local server (PR #2291)
|
||||
* Add API to quarantine media (PR #2292)
|
||||
* Add new config option to not send event contents to push servers (PR #2301)
|
||||
Thanks to @cjdelisle!
|
||||
|
||||
Changes:
|
||||
|
||||
* Various performance fixes (PR #2177, #2233, #2230, #2238, #2248, #2256,
|
||||
#2274)
|
||||
* Deduplicate sync filters (PR #2219) Thanks to @krombel!
|
||||
* Correct a typo in UPGRADE.rst (PR #2231) Thanks to @aaronraimist!
|
||||
* Add count of one time keys to sync stream (PR #2237)
|
||||
* Only store event_auth for state events (PR #2247)
|
||||
* Store URL cache preview downloads separately (PR #2299)
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Fix users not getting notifications when AS listened to that user_id (PR
|
||||
#2216) Thanks to @slipeer!
|
||||
* Fix users without push set up not getting notifications after joining rooms
|
||||
(PR #2236)
|
||||
* Fix preview url API to trim long descriptions (PR #2243)
|
||||
* Fix bug where we used cached but unpersisted state group as prev group,
|
||||
resulting in broken state of restart (PR #2263)
|
||||
* Fix removing of pushers when using workers (PR #2267)
|
||||
* Fix CORS headers to allow Authorization header (PR #2285) Thanks to @krombel!
|
||||
|
||||
|
||||
Changes in synapse v0.21.1 (2017-06-15)
|
||||
=======================================
|
||||
|
||||
|
||||
+1
-1
@@ -16,4 +16,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.21.1"
|
||||
__version__ = "0.22.0-rc1"
|
||||
|
||||
+1
-2
@@ -23,7 +23,6 @@ from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, Membership, JoinRules
|
||||
from synapse.api.errors import AuthError, Codes
|
||||
from synapse.types import UserID
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -200,7 +199,7 @@ class Auth(object):
|
||||
default=[""]
|
||||
)[0]
|
||||
if user and access_token and ip_addr:
|
||||
logcontext.preserve_fn(self.store.insert_client_ip)(
|
||||
self.store.insert_client_ip(
|
||||
user=user,
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
|
||||
@@ -24,6 +24,7 @@ from synapse.http.server import JsonResource
|
||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
@@ -33,7 +34,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.room import PublicRoomListRestServlet
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.client_ips import ClientIpStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
@@ -65,8 +65,8 @@ class ClientReaderSlavedStore(
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
TransactionStore,
|
||||
SlavedClientIpStore,
|
||||
BaseSlavedStore,
|
||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@@ -23,13 +23,13 @@ from synapse.http.site import SynapseSite
|
||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.client_ips import ClientIpStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.media_repository import MediaRepositoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
@@ -60,10 +60,10 @@ logger = logging.getLogger("synapse.app.media_repository")
|
||||
class MediaRepositorySlavedStore(
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
TransactionStore,
|
||||
BaseSlavedStore,
|
||||
MediaRepositoryStore,
|
||||
ClientIpStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ from synapse.rest.client.v1 import events
|
||||
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
|
||||
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
@@ -42,7 +43,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.client_ips import ClientIpStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.storage.roommember import RoomMemberStore
|
||||
@@ -77,9 +77,9 @@ class SynchrotronSlavedStore(
|
||||
SlavedPresenceStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedClientIpStore,
|
||||
RoomStore,
|
||||
BaseSlavedStore,
|
||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
||||
):
|
||||
who_forgot_in_room = (
|
||||
RoomMemberStore.__dict__["who_forgot_in_room"]
|
||||
|
||||
@@ -26,12 +26,12 @@ from synapse.http.server import JsonResource
|
||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v2_alpha import user_directory
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.client_ips import ClientIpStore
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
|
||||
@@ -58,9 +58,9 @@ class UserDirectorySlaveStore(
|
||||
SlavedEventStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
UserDirectoryStore,
|
||||
BaseSlavedStore,
|
||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
||||
):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
|
||||
|
||||
@@ -33,6 +33,7 @@ from .jwt import JWTConfig
|
||||
from .password_auth_providers import PasswordAuthProviderConfig
|
||||
from .emailconfig import EmailConfig
|
||||
from .workers import WorkerConfig
|
||||
from .push import PushConfig
|
||||
|
||||
|
||||
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
|
||||
@@ -40,7 +41,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
|
||||
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
|
||||
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
|
||||
JWTConfig, PasswordConfig, EmailConfig,
|
||||
WorkerConfig, PasswordAuthProviderConfig,):
|
||||
WorkerConfig, PasswordAuthProviderConfig, PushConfig,):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import Config
|
||||
|
||||
|
||||
class PushConfig(Config):
|
||||
def read_config(self, config):
|
||||
self.push_redact_content = False
|
||||
|
||||
push_config = config.get("email", {})
|
||||
self.push_redact_content = push_config.get("redact_content", False)
|
||||
|
||||
def default_config(self, config_dir_path, server_name, **kwargs):
|
||||
return """
|
||||
# Control how push messages are sent to google/apple to notifications.
|
||||
# Normally every message said in a room with one or more people using
|
||||
# mobile devices will be posted to a push server hosted by matrix.org
|
||||
# which is registered with google and apple in order to allow push
|
||||
# notifications to be sent to these mobile devices.
|
||||
#
|
||||
# Setting redact_content to true will make the push messages contain no
|
||||
# message content which will provide increased privacy. This is a
|
||||
# temporary solution pending improvements to Android and iPhone apps
|
||||
# to get content from the app rather than the notification.
|
||||
#
|
||||
# For modern android devices the notification content will still appear
|
||||
# because it is loaded by the app. iPhone, however will send a
|
||||
# notification saying only that a message arrived and who it came from.
|
||||
#
|
||||
#push:
|
||||
# redact_content: false
|
||||
"""
|
||||
@@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler):
|
||||
device_map = yield self.store.get_devices_by_user(user_id)
|
||||
|
||||
ips = yield self.store.get_last_client_ip_by_device(
|
||||
devices=((user_id, device_id) for device_id in device_map.keys())
|
||||
user_id, device_id=None
|
||||
)
|
||||
|
||||
devices = device_map.values()
|
||||
@@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler):
|
||||
except errors.StoreError:
|
||||
raise errors.NotFoundError
|
||||
ips = yield self.store.get_last_client_ip_by_device(
|
||||
devices=((user_id, device_id),)
|
||||
user_id, device_id,
|
||||
)
|
||||
_update_device_from_client_ips(device, ips)
|
||||
defer.returnValue(device)
|
||||
|
||||
@@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json
|
||||
|
||||
import logging
|
||||
import random
|
||||
import ujson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -498,6 +499,14 @@ class MessageHandler(BaseHandler):
|
||||
logger.warn("Denying new event %r because %s", event, err)
|
||||
raise err
|
||||
|
||||
# Ensure that we can round trip before trying to persist in db
|
||||
try:
|
||||
dump = ujson.dumps(event.content)
|
||||
ujson.loads(dump)
|
||||
except:
|
||||
logger.exception("Failed to encode content: %r", event.content)
|
||||
raise
|
||||
|
||||
yield self.maybe_kick_guest_users(event, context)
|
||||
|
||||
if event.type == EventTypes.CanonicalAlias:
|
||||
|
||||
@@ -275,7 +275,7 @@ class HttpPusher(object):
|
||||
if event.type == 'm.room.member':
|
||||
d['notification']['membership'] = event.content['membership']
|
||||
d['notification']['user_is_target'] = event.state_key == self.user_id
|
||||
if 'content' in event:
|
||||
if not self.hs.config.push_redact_content and 'content' in event:
|
||||
d['notification']['content'] = event.content
|
||||
|
||||
# We no longer send aliases separately, instead, we send the human
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches.descriptors import Cache
|
||||
|
||||
|
||||
class SlavedClientIpStore(BaseSlavedStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(SlavedClientIpStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.client_ip_last_seen = Cache(
|
||||
name="client_ip_last_seen",
|
||||
keylen=4,
|
||||
max_entries=50000 * CACHE_SIZE_FACTOR,
|
||||
)
|
||||
|
||||
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
|
||||
now = int(self._clock.time_msec())
|
||||
user_id = user.to_string()
|
||||
key = (user_id, access_token, ip)
|
||||
|
||||
try:
|
||||
last_seen = self.client_ip_last_seen.get(key)
|
||||
except KeyError:
|
||||
last_seen = None
|
||||
|
||||
# Rate-limited inserts
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
return
|
||||
|
||||
self.hs.get_tcp_replication().send_user_ip(
|
||||
user_id, access_token, ip, user_agent, device_id, now
|
||||
)
|
||||
@@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
|
||||
|
||||
from .commands import (
|
||||
FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
|
||||
UserIpCommand,
|
||||
)
|
||||
from .protocol import ClientReplicationStreamProtocol
|
||||
|
||||
@@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
|
||||
cmd = InvalidateCacheCommand(cache_func.__name__, keys)
|
||||
self.send_command(cmd)
|
||||
|
||||
def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||
"""Tell the master that the user made a request.
|
||||
"""
|
||||
cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
|
||||
self.send_command(cmd)
|
||||
|
||||
def await_sync(self, data):
|
||||
"""Returns a deferred that is resolved when we receive a SYNC command
|
||||
with given data.
|
||||
|
||||
@@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command):
|
||||
return " ".join((self.cache_func, json.dumps(self.keys)))
|
||||
|
||||
|
||||
class UserIpCommand(Command):
|
||||
"""Sent periodically when a worker sees activity from a client.
|
||||
|
||||
Format::
|
||||
|
||||
USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
|
||||
"""
|
||||
NAME = "USER_IP"
|
||||
|
||||
def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||
self.user_id = user_id
|
||||
self.access_token = access_token
|
||||
self.ip = ip
|
||||
self.user_agent = user_agent
|
||||
self.device_id = device_id
|
||||
self.last_seen = last_seen
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
|
||||
|
||||
return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((
|
||||
self.user_id, self.access_token, self.ip, self.device_id,
|
||||
str(self.last_seen), self.user_agent,
|
||||
))
|
||||
|
||||
|
||||
# Map of command name to command type.
|
||||
COMMAND_MAP = {
|
||||
cmd.NAME: cmd
|
||||
@@ -320,6 +350,7 @@ COMMAND_MAP = {
|
||||
SyncCommand,
|
||||
RemovePusherCommand,
|
||||
InvalidateCacheCommand,
|
||||
UserIpCommand,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = (
|
||||
FederationAckCommand.NAME,
|
||||
RemovePusherCommand.NAME,
|
||||
InvalidateCacheCommand.NAME,
|
||||
UserIpCommand.NAME,
|
||||
ErrorCommand.NAME,
|
||||
)
|
||||
|
||||
@@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||
def on_INVALIDATE_CACHE(self, cmd):
|
||||
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
|
||||
|
||||
def on_USER_IP(self, cmd):
|
||||
self.streamer.on_user_ip(
|
||||
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
|
||||
cmd.last_seen,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def subscribe_to_stream(self, stream_name, token):
|
||||
"""Subscribe the remote to a streams.
|
||||
|
||||
@@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
|
||||
federation_ack_counter = metrics.register_counter("federation_ack")
|
||||
remove_pusher_counter = metrics.register_counter("remove_pusher")
|
||||
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
|
||||
user_ip_cache_counter = metrics.register_counter("user_ip_cache")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -238,6 +239,15 @@ class ReplicationStreamer(object):
|
||||
invalidate_cache_counter.inc()
|
||||
getattr(self.store, cache_func).invalidate(tuple(keys))
|
||||
|
||||
@measure_func("repl.on_user_ip")
|
||||
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||
"""The client saw a user request
|
||||
"""
|
||||
user_ip_cache_counter.inc()
|
||||
self.store.insert_client_ip(
|
||||
user_id, access_token, ip, user_agent, device_id, last_seen,
|
||||
)
|
||||
|
||||
def send_sync_to_all_connections(self, data):
|
||||
"""Sends a SYNC command to all clients.
|
||||
|
||||
|
||||
@@ -304,16 +304,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
ret = yield self.runInteraction("count_users", _count_users)
|
||||
defer.returnValue(ret)
|
||||
|
||||
def get_user_ip_and_agents(self, user):
|
||||
return self._simple_select_list(
|
||||
table="user_ips",
|
||||
keyvalues={"user_id": user.to_string()},
|
||||
retcols=[
|
||||
"access_token", "ip", "user_agent", "last_seen"
|
||||
],
|
||||
desc="get_user_ip_and_agents",
|
||||
)
|
||||
|
||||
def get_users(self):
|
||||
"""Function to reterive a list of users in users table.
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
from ._base import Cache
|
||||
from . import background_updates
|
||||
@@ -50,7 +50,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
columns=["user_id", "device_id", "last_seen"],
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
|
||||
self._batch_row_update = {}
|
||||
|
||||
self._client_ip_looper = self._clock.looping_call(
|
||||
self._update_client_ips_batch, 5 * 1000
|
||||
)
|
||||
reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
|
||||
|
||||
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
|
||||
now = int(self._clock.time_msec())
|
||||
key = (user.to_string(), access_token, ip)
|
||||
@@ -62,34 +69,48 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
|
||||
# Rate-limited inserts
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
defer.returnValue(None)
|
||||
return
|
||||
|
||||
self.client_ip_last_seen.prefill(key, now)
|
||||
|
||||
# It's safe not to lock here: a) no unique constraint,
|
||||
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
|
||||
yield self._simple_upsert(
|
||||
"user_ips",
|
||||
keyvalues={
|
||||
"user_id": user.to_string(),
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"device_id": device_id,
|
||||
},
|
||||
values={
|
||||
"last_seen": now,
|
||||
},
|
||||
desc="insert_client_ip",
|
||||
lock=False,
|
||||
self._batch_row_update[key] = (user_agent, device_id, now)
|
||||
|
||||
def _update_client_ips_batch(self):
|
||||
to_update = self._batch_row_update
|
||||
self._batch_row_update = {}
|
||||
return self.runInteraction(
|
||||
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
|
||||
)
|
||||
|
||||
def _update_client_ips_batch_txn(self, txn, to_update):
|
||||
self.database_engine.lock_table(txn, "user_ips")
|
||||
|
||||
for entry in to_update.iteritems():
|
||||
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
|
||||
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
table="user_ips",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"device_id": device_id,
|
||||
},
|
||||
values={
|
||||
"last_seen": last_seen,
|
||||
},
|
||||
lock=False,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_last_client_ip_by_device(self, devices):
|
||||
def get_last_client_ip_by_device(self, user_id, device_id):
|
||||
"""For each device_id listed, give the user_ip it was last seen on
|
||||
|
||||
Args:
|
||||
devices (iterable[(str, str)]): list of (user_id, device_id) pairs
|
||||
user_id (str)
|
||||
device_id (str): If None fetches all devices for the user
|
||||
|
||||
Returns:
|
||||
defer.Deferred: resolves to a dict, where the keys
|
||||
@@ -100,6 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
res = yield self.runInteraction(
|
||||
"get_last_client_ip_by_device",
|
||||
self._get_last_client_ip_by_device_txn,
|
||||
user_id, device_id,
|
||||
retcols=(
|
||||
"user_id",
|
||||
"access_token",
|
||||
@@ -108,23 +130,34 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
"device_id",
|
||||
"last_seen",
|
||||
),
|
||||
devices=devices
|
||||
)
|
||||
|
||||
ret = {(d["user_id"], d["device_id"]): d for d in res}
|
||||
for key in self._batch_row_update:
|
||||
uid, access_token, ip = key
|
||||
if uid == user_id:
|
||||
user_agent, did, last_seen = self._batch_row_update[key]
|
||||
if not device_id or did == device_id:
|
||||
ret[(user_id, device_id)] = {
|
||||
"user_id": user_id,
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"device_id": did,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
defer.returnValue(ret)
|
||||
|
||||
@classmethod
|
||||
def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):
|
||||
def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
|
||||
where_clauses = []
|
||||
bindings = []
|
||||
for (user_id, device_id) in devices:
|
||||
if device_id is None:
|
||||
where_clauses.append("(user_id = ? AND device_id IS NULL)")
|
||||
bindings.extend((user_id, ))
|
||||
else:
|
||||
where_clauses.append("(user_id = ? AND device_id = ?)")
|
||||
bindings.extend((user_id, device_id))
|
||||
if device_id is None:
|
||||
where_clauses.append("user_id = ?")
|
||||
bindings.extend((user_id, ))
|
||||
else:
|
||||
where_clauses.append("(user_id = ? AND device_id = ?)")
|
||||
bindings.extend((user_id, device_id))
|
||||
|
||||
if not where_clauses:
|
||||
return []
|
||||
@@ -152,3 +185,37 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
|
||||
txn.execute(sql, bindings)
|
||||
return cls.cursor_to_dict(txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_ip_and_agents(self, user):
|
||||
user_id = user.to_string()
|
||||
results = {}
|
||||
|
||||
for key in self._batch_row_update:
|
||||
uid, access_token, ip = key
|
||||
if uid == user_id:
|
||||
user_agent, _, last_seen = self._batch_row_update[key]
|
||||
results[(access_token, ip)] = (user_agent, last_seen)
|
||||
|
||||
rows = yield self._simple_select_list(
|
||||
table="user_ips",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcols=[
|
||||
"access_token", "ip", "user_agent", "last_seen"
|
||||
],
|
||||
desc="get_user_ip_and_agents",
|
||||
)
|
||||
|
||||
results.update(
|
||||
((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
|
||||
for row in rows
|
||||
)
|
||||
defer.returnValue(list(
|
||||
{
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
for (access_token, ip), (user_agent, last_seen) in results.iteritems()
|
||||
))
|
||||
|
||||
@@ -25,7 +25,8 @@ CREATE TABLE users_who_share_rooms (
|
||||
|
||||
|
||||
CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id);
|
||||
CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id);
|
||||
CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id);
|
||||
CREATE INDEX users_who_share_rooms_o_idx ON users_who_share_rooms(other_user_id);
|
||||
|
||||
|
||||
-- Make sure that we popualte the table initially
|
||||
|
||||
@@ -43,10 +43,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
|
||||
"access_token", "ip", "user_agent", "device_id",
|
||||
)
|
||||
|
||||
# deliberately use an iterable here to make sure that the lookup
|
||||
# method doesn't iterate it twice
|
||||
device_list = iter(((user_id, "device_id"),))
|
||||
result = yield self.store.get_last_client_ip_by_device(device_list)
|
||||
result = yield self.store.get_last_client_ip_by_device(user_id, "device_id")
|
||||
|
||||
r = result[(user_id, "device_id")]
|
||||
self.assertDictContainsSubset(
|
||||
|
||||
Reference in New Issue
Block a user