Merge branch 'develop' of github.com:matrix-org/synapse into neilj/drop_tables_in_1830

Neil Johnson
2019-04-03 16:13:28 +01:00
15 changed files with 131 additions and 121 deletions

changelog.d/4985.misc Normal file

@@ -0,0 +1 @@
+Rewrite KeyringTestCase as a HomeserverTestCase.

changelog.d/4998.misc Normal file

@@ -0,0 +1 @@
+Fix grammar in get_current_users_in_room and give it a docstring.

synapse/handlers/directory.py

@@ -68,7 +68,7 @@ class DirectoryHandler(BaseHandler):
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.state.get_current_users_in_room(room_id)
             servers = set(get_domain_from_id(u) for u in users)
 
         if not servers:
@@ -268,7 +268,7 @@ class DirectoryHandler(BaseHandler):
                 Codes.NOT_FOUND
             )
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         extra_servers = set(get_domain_from_id(u) for u in users)
         servers = set(extra_servers) | set(servers)
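
Both hunks above feed the room's current membership into get_domain_from_id to derive the set of servers to contact. A minimal, self-contained sketch of that step (the stand-in below assumes Matrix IDs of the form @localpart:domain; the real helper lives in synapse.types):

# Stand-in for synapse.types.get_domain_from_id (assumption: the domain
# of a Matrix ID "@localpart:domain" is everything after the first colon).
def get_domain_from_id(user_id):
    return user_id.split(":", 1)[1]

users = ["@alice:example.com", "@bob:example.com", "@carol:matrix.org"]
servers = set(get_domain_from_id(u) for u in users)
print(servers)  # {'example.com', 'matrix.org'}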

synapse/handlers/events.py

@@ -102,7 +102,7 @@ class EventStreamHandler(BaseHandler):
             # Send down presence.
             if event.state_key == auth_user_id:
                 # Send down presence for everyone in the room.
-                users = yield self.state.get_current_user_in_room(event.room_id)
+                users = yield self.state.get_current_users_in_room(event.room_id)
                 states = yield presence_handler.get_states(
                     users,
                     as_event=True,

synapse/handlers/message.py

@@ -192,7 +192,7 @@ class MessageHandler(object):
                 "Getting joined members after leaving is not implemented"
             )
 
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+        users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there

synapse/handlers/presence.py

@@ -883,7 +883,7 @@ class PresenceHandler(object):
                 # TODO: Check that this is actually a new server joining the
                 # room.
 
-                user_ids = yield self.state.get_current_user_in_room(room_id)
+                user_ids = yield self.state.get_current_users_in_room(room_id)
                 user_ids = list(filter(self.is_mine_id, user_ids))
 
                 states = yield self.current_state_for_users(user_ids)

synapse/handlers/room_list.py

@@ -167,7 +167,7 @@ class RoomListHandler(BaseHandler):
            if not latest_event_ids:
                return
 
-            joined_users = yield self.state_handler.get_current_user_in_room(
+            joined_users = yield self.state_handler.get_current_users_in_room(
                room_id, latest_event_ids,
            )

synapse/handlers/sync.py

@@ -1049,11 +1049,11 @@ class SyncHandler(object):
             # TODO: Be more clever than this, i.e. remove users who we already
             # share a room with?
             for room_id in newly_joined_rooms:
-                joined_users = yield self.state.get_current_user_in_room(room_id)
+                joined_users = yield self.state.get_current_users_in_room(room_id)
                 newly_joined_users.update(joined_users)
 
             for room_id in newly_left_rooms:
-                left_users = yield self.state.get_current_user_in_room(room_id)
+                left_users = yield self.state.get_current_users_in_room(room_id)
                 newly_left_users.update(left_users)
 
             # TODO: Check that these users are actually new, i.e. either they
@@ -1213,7 +1213,7 @@ class SyncHandler(object):
         extra_users_ids = set(newly_joined_users)
         for room_id in newly_joined_rooms:
-            users = yield self.state.get_current_user_in_room(room_id)
+            users = yield self.state.get_current_users_in_room(room_id)
             extra_users_ids.update(users)
 
         extra_users_ids.discard(user.to_string())
@@ -1855,7 +1855,7 @@ class SyncHandler(object):
         extrems = yield self.store.get_forward_extremeties_for_room(
             room_id, stream_ordering,
         )
-        users_in_room = yield self.state.get_current_user_in_room(
+        users_in_room = yield self.state.get_current_users_in_room(
             room_id, extrems,
         )
         if user_id in users_in_room:

synapse/handlers/typing.py

@@ -218,7 +218,7 @@ class TypingHandler(object):
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
         try:
-            users = yield self.state.get_current_user_in_room(member.room_id)
+            users = yield self.state.get_current_users_in_room(member.room_id)
             self._member_last_federation_poke[member] = self.clock.time_msec()
 
             now = self.clock.time_msec()
@@ -261,7 +261,7 @@ class TypingHandler(object):
             )
             return
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         domains = set(get_domain_from_id(u) for u in users)
 
         if self.server_name in domains:

synapse/handlers/user_directory.py

@@ -276,7 +276,7 @@ class UserDirectoryHandler(StateDeltasHandler):
                 # ignore the change
                 return
 
-            users_with_profile = yield self.state.get_current_user_in_room(room_id)
+            users_with_profile = yield self.state.get_current_users_in_room(room_id)
 
             # Remove every user from the sharing tables for that room.
             for user_id in iterkeys(users_with_profile):
@@ -325,7 +325,7 @@ class UserDirectoryHandler(StateDeltasHandler):
                 room_id
             )
 
             # Now we update users who share rooms with users.
-            users_with_profile = yield self.state.get_current_user_in_room(room_id)
+            users_with_profile = yield self.state.get_current_users_in_room(room_id)
             if is_public:
                 yield self.store.add_users_in_public_rooms(room_id, (user_id,))

synapse/rest/client/v1/admin.py

@@ -499,7 +499,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         # desirable in case the first attempt at blocking the room failed below.
         yield self.store.block_room(room_id, requester_user_id)
 
-        users = yield self.state.get_current_user_in_room(room_id)
+        users = yield self.state.get_current_users_in_room(room_id)
         kicked_users = []
         failed_to_kick_users = []
         for user_id in users:

synapse/state/__init__.py

@@ -161,10 +161,21 @@ class StateHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
-    def get_current_user_in_room(self, room_id, latest_event_ids=None):
+    def get_current_users_in_room(self, room_id, latest_event_ids=None):
+        """
+        Get the users who are currently in a room.
+
+        Args:
+            room_id (str): The ID of the room.
+            latest_event_ids (List[str]|None): Precomputed list of latest
+                event IDs. Will be computed if None.
+        Returns:
+            Deferred[Dict[str,ProfileInfo]]: Dictionary of user IDs to their
+                profileinfo.
+        """
         if not latest_event_ids:
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
-        logger.debug("calling resolve_state_groups from get_current_user_in_room")
+        logger.debug("calling resolve_state_groups from get_current_users_in_room")
         entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
         joined_users = yield self.store.get_joined_users_from_state(room_id, entry)
         defer.returnValue(joined_users)
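
The renamed method's contract is unchanged: resolve the room's current state (from the given, or freshly fetched, forward extremities) and return its joined members. A hedged usage sketch in the same inlineCallbacks style as the callers above (the caller method and do_something_with are illustrative, not from this commit):

from twisted.internet import defer

@defer.inlineCallbacks
def _example_caller(self, room_id):
    # Yields a Deferred resolving to a dict of user_id -> ProfileInfo
    # for everyone currently joined to the room.
    users_with_profile = yield self.state.get_current_users_in_room(room_id)
    for user_id, profile in users_with_profile.items():
        do_something_with(user_id, profile)  # hypothetical consumer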

synapse/storage/user_directory.py

@@ -194,7 +194,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
                 room_id
             )
 
-            users_with_profile = yield state.get_current_user_in_room(room_id)
+            users_with_profile = yield state.get_current_users_in_room(room_id)
             user_ids = set(users_with_profile)
 
             # Update each user in the user directory.

tests/crypto/test_keyring.py

@@ -19,14 +19,14 @@ from mock import Mock
 import signedjson.key
 import signedjson.sign
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 from synapse.crypto import keyring
-from synapse.util import Clock, logcontext
+from synapse.util import logcontext
 from synapse.util.logcontext import LoggingContext
 
-from tests import unittest, utils
+from tests import unittest
 
 
 class MockPerspectiveServer(object):
@@ -52,75 +52,50 @@ class MockPerspectiveServer(object):
         return res
 
 
-class KeyringTestCase(unittest.TestCase):
-    @defer.inlineCallbacks
-    def setUp(self):
+class KeyringTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
         self.mock_perspective_server = MockPerspectiveServer()
         self.http_client = Mock()
-        self.hs = yield utils.setup_test_homeserver(
-            self.addCleanup, handlers=None, http_client=self.http_client
-        )
+        hs = self.setup_test_homeserver(handlers=None, http_client=self.http_client)
         keys = self.mock_perspective_server.get_verify_keys()
-        self.hs.config.perspectives = {self.mock_perspective_server.server_name: keys}
-
-    def assert_sentinel_context(self):
-        if LoggingContext.current_context() != LoggingContext.sentinel:
-            self.fail(
-                "Expected sentinel context but got %s" % (
-                    LoggingContext.current_context(),
-                )
-            )
+        hs.config.perspectives = {self.mock_perspective_server.server_name: keys}
+        return hs
 
     def check_context(self, _, expected):
         self.assertEquals(
             getattr(LoggingContext.current_context(), "request", None), expected
         )
 
-    @defer.inlineCallbacks
     def test_wait_for_previous_lookups(self):
         kr = keyring.Keyring(self.hs)
 
         lookup_1_deferred = defer.Deferred()
         lookup_2_deferred = defer.Deferred()
 
-        with LoggingContext("one") as context_one:
-            context_one.request = "one"
-
-            wait_1_deferred = kr.wait_for_previous_lookups(
-                ["server1"], {"server1": lookup_1_deferred}
-            )
-
-            # there were no previous lookups, so the deferred should be ready
-            self.assertTrue(wait_1_deferred.called)
-            # ... so we should have preserved the LoggingContext.
-            self.assertIs(LoggingContext.current_context(), context_one)
-            wait_1_deferred.addBoth(self.check_context, "one")
-
-        with LoggingContext("two") as context_two:
-            context_two.request = "two"
-
-            # set off another wait. It should block because the first lookup
-            # hasn't yet completed.
-            wait_2_deferred = kr.wait_for_previous_lookups(
-                ["server1"], {"server1": lookup_2_deferred}
-            )
-            self.assertFalse(wait_2_deferred.called)
-            # ... so we should have reset the LoggingContext.
-            self.assert_sentinel_context()
-            wait_2_deferred.addBoth(self.check_context, "two")
-
-            # let the first lookup complete (in the sentinel context)
-            lookup_1_deferred.callback(None)
-
-            # now the second wait should complete and restore our
-            # loggingcontext.
-            yield wait_2_deferred
+        # we run the lookup in a logcontext so that the patched inlineCallbacks can check
+        # it is doing the right thing with logcontexts.
+        wait_1_deferred = run_in_context(
+            kr.wait_for_previous_lookups, ["server1"], {"server1": lookup_1_deferred}
+        )
+
+        # there were no previous lookups, so the deferred should be ready
+        self.successResultOf(wait_1_deferred)
+
+        # set off another wait. It should block because the first lookup
+        # hasn't yet completed.
+        wait_2_deferred = run_in_context(
+            kr.wait_for_previous_lookups, ["server1"], {"server1": lookup_2_deferred}
+        )
+        self.assertFalse(wait_2_deferred.called)
+
+        # let the first lookup complete (in the sentinel context)
+        lookup_1_deferred.callback(None)
+
+        # now the second wait should complete.
+        self.successResultOf(wait_2_deferred)
 
-    @defer.inlineCallbacks
     def test_verify_json_objects_for_server_awaits_previous_requests(self):
-        clock = Clock(reactor)
         key1 = signedjson.key.generate_signing_key(1)
         kr = keyring.Keyring(self.hs)
@@ -145,81 +120,103 @@ class KeyringTestCase(unittest.TestCase):
         self.http_client.post_json.side_effect = get_perspectives
 
-        with LoggingContext("11") as context_11:
-            context_11.request = "11"
-
-            # start off a first set of lookups
-            res_deferreds = kr.verify_json_objects_for_server(
-                [("server10", json1), ("server11", {})]
-            )
-
-            # the unsigned json should be rejected pretty quickly
-            self.assertTrue(res_deferreds[1].called)
-            try:
-                yield res_deferreds[1]
-                self.assertFalse("unsigned json didn't cause a failure")
-            except SynapseError:
-                pass
-
-            self.assertFalse(res_deferreds[0].called)
-            res_deferreds[0].addBoth(self.check_context, None)
-
-            # wait a tick for it to send the request to the perspectives server
-            # (it first tries the datastore)
-            yield clock.sleep(1)  # XXX find out why this takes so long!
-            self.http_client.post_json.assert_called_once()
-
-            self.assertIs(LoggingContext.current_context(), context_11)
-
-            context_12 = LoggingContext("12")
-            context_12.request = "12"
-            with logcontext.PreserveLoggingContext(context_12):
-                # a second request for a server with outstanding requests
-                # should block rather than start a second call
-                self.http_client.post_json.reset_mock()
-                self.http_client.post_json.return_value = defer.Deferred()
-
-                res_deferreds_2 = kr.verify_json_objects_for_server(
-                    [("server10", json1)]
-                )
-                yield clock.sleep(1)
-                self.http_client.post_json.assert_not_called()
-                res_deferreds_2[0].addBoth(self.check_context, None)
-
-            # complete the first request
-            with logcontext.PreserveLoggingContext():
-                persp_deferred.callback(persp_resp)
-            self.assertIs(LoggingContext.current_context(), context_11)
-
-            with logcontext.PreserveLoggingContext():
-                yield res_deferreds[0]
-                yield res_deferreds_2[0]
+        # start off a first set of lookups
+        @defer.inlineCallbacks
+        def first_lookup():
+            with LoggingContext("11") as context_11:
+                context_11.request = "11"
+
+                res_deferreds = kr.verify_json_objects_for_server(
+                    [("server10", json1), ("server11", {})]
+                )
+
+                # the unsigned json should be rejected pretty quickly
+                self.assertTrue(res_deferreds[1].called)
+                try:
+                    yield res_deferreds[1]
+                    self.assertFalse("unsigned json didn't cause a failure")
+                except SynapseError:
+                    pass
+
+                self.assertFalse(res_deferreds[0].called)
+                res_deferreds[0].addBoth(self.check_context, None)
+
+                yield logcontext.make_deferred_yieldable(res_deferreds[0])
+
+                # let verify_json_objects_for_server finish its work before we kill the
+                # logcontext
+                yield self.clock.sleep(0)
+
+        d0 = first_lookup()
+
+        # wait a tick for it to send the request to the perspectives server
+        # (it first tries the datastore)
+        self.pump()
+        self.http_client.post_json.assert_called_once()
+
+        # a second request for a server with outstanding requests
+        # should block rather than start a second call
+        @defer.inlineCallbacks
+        def second_lookup():
+            with LoggingContext("12") as context_12:
+                context_12.request = "12"
+
+                self.http_client.post_json.reset_mock()
+                self.http_client.post_json.return_value = defer.Deferred()
+
+                res_deferreds_2 = kr.verify_json_objects_for_server(
+                    [("server10", json1, )]
+                )
+                res_deferreds_2[0].addBoth(self.check_context, None)
+                yield logcontext.make_deferred_yieldable(res_deferreds_2[0])
+
+                # let verify_json_objects_for_server finish its work before we kill the
+                # logcontext
+                yield self.clock.sleep(0)
+
+        d2 = second_lookup()
+
+        self.pump()
+        self.http_client.post_json.assert_not_called()
+
+        # complete the first request
+        persp_deferred.callback(persp_resp)
+
+        self.get_success(d0)
+        self.get_success(d2)
 
-    @defer.inlineCallbacks
     def test_verify_json_for_server(self):
         kr = keyring.Keyring(self.hs)
 
         key1 = signedjson.key.generate_signing_key(1)
-        yield self.hs.datastore.store_server_verify_key(
+        r = self.hs.datastore.store_server_verify_key(
             "server9", "", time.time() * 1000, signedjson.key.get_verify_key(key1)
         )
+        self.get_success(r)
         json1 = {}
         signedjson.sign.sign_json(json1, "server9", key1)
 
-        with LoggingContext("one") as context_one:
-            context_one.request = "one"
-
-            defer = kr.verify_json_for_server("server9", {})
-            try:
-                yield defer
-                self.fail("should fail on unsigned json")
-            except SynapseError:
-                pass
-            self.assertIs(LoggingContext.current_context(), context_one)
-
-            defer = kr.verify_json_for_server("server9", json1)
-            self.assertFalse(defer.called)
-            self.assert_sentinel_context()
-            yield defer
-
-            self.assertIs(LoggingContext.current_context(), context_one)
+        # should fail immediately on an unsigned object
+        d = _verify_json_for_server(kr, "server9", {})
+        self.failureResultOf(d, SynapseError)
+
+        d = _verify_json_for_server(kr, "server9", json1)
+        self.assertFalse(d.called)
+        self.get_success(d)
+
+
+@defer.inlineCallbacks
+def run_in_context(f, *args, **kwargs):
+    with LoggingContext("testctx"):
+        rv = yield f(*args, **kwargs)
+    defer.returnValue(rv)
+
+
+def _verify_json_for_server(keyring, server_name, json_object):
+    """thin wrapper around verify_json_for_server which makes sure it is wrapped
+    with the patched defer.inlineCallbacks.
+    """
+    @defer.inlineCallbacks
+    def v():
+        rv1 = yield keyring.verify_json_for_server(server_name, json_object)
+        defer.returnValue(rv1)
+
+    return run_in_context(v)
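
The rewrite follows the standard HomeserverTestCase shape: build the server in make_homeserver, drive the fake reactor with pump(), and unwrap Deferreds with get_success()/failureResultOf(). A minimal sketch of that pattern (the test body is illustrative; only the hook names come from the diff above):

from mock import Mock

from tests import unittest


class ExampleTestCase(unittest.HomeserverTestCase):
    def make_homeserver(self, reactor, clock):
        # Build the homeserver under test; collaborators can be mocked out.
        self.http_client = Mock()
        return self.setup_test_homeserver(http_client=self.http_client)

    def test_example(self):
        # start_deferred_work is a hypothetical call returning a Deferred.
        d = start_deferred_work(self.hs)
        self.pump()                   # advance the fake reactor
        result = self.get_success(d)  # fails the test if d errbacks
        self.assertIsNotNone(result)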

tests/handlers/test_typing.py

@@ -121,9 +121,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
         self.datastore.get_joined_hosts_for_room = get_joined_hosts_for_room
 
-        def get_current_user_in_room(room_id):
+        def get_current_users_in_room(room_id):
             return set(str(u) for u in self.room_members)
 
-        hs.get_state_handler().get_current_user_in_room = get_current_user_in_room
+        hs.get_state_handler().get_current_users_in_room = get_current_users_in_room
 
         self.datastore.get_user_directory_stream_pos.return_value = (
             # we deliberately return a non-None stream pos to avoid doing an initial_spam
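
As a usage note, the hunk above shows the stubbing pattern these tests rely on: the state handler's lookup is replaced with a deterministic local function so that room membership is controlled entirely by the test. The same pattern in isolation (a sketch; hs and room_members stand for whatever the test case already holds):

room_members = ["@alice:example.com", "@bob:example.com"]

def get_current_users_in_room(room_id):
    # Ignore room_id and return the fixed membership under test.
    return set(str(u) for u in room_members)

hs.get_state_handler().get_current_users_in_room = get_current_users_in_room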