1
0

Compare commits

...

101 Commits

Author SHA1 Message Date
Richard van der Hoff
86157b9743 Merge branch 'rav/fix_event_sig_checks' into matrix-org/fix_event_sig_checks 2018-09-05 15:21:36 +01:00
Erik Johnston
2f141f4c41 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-22 11:47:08 +01:00
Erik Johnston
764030cf63 Merge pull request #3659 from matrix-org/erikj/split_profiles
Allow profile updates to happen on workers
2018-08-22 11:35:55 +01:00
Erik Johnston
8432e2ebd7 Rename WorkerProfileHandler to BaseProfileHandler 2018-08-22 10:13:40 +01:00
Erik Johnston
a81f140880 Add assert to ensure handler is only run on master 2018-08-22 10:11:21 +01:00
Erik Johnston
47b25ba5f3 Remove redundant vars 2018-08-22 10:09:05 +01:00
Erik Johnston
3bf8bab8f9 Merge pull request #3673 from matrix-org/erikj/refactor_state_handler
Refactor state module to support multiple room versions
2018-08-22 10:04:55 +01:00
Richard van der Hoff
a4cf660a32 Merge pull request #3735 from matrix-org/travis/federation-spelling
limt -> limit
2018-08-22 09:34:21 +01:00
Richard van der Hoff
0d568ff403 Merge remote-tracking branch 'origin/release-v0.33.3' into develop 2018-08-22 09:15:44 +01:00
Travis Ralston
eb7be75a10 Create 3735.misc 2018-08-21 23:38:06 -06:00
Matthew Hodgson
bb81e78ec6 Split the state_group_cache in two (#3726)
Splits the state_group_cache in two.

One half contains normal state events; the other contains member events.

The idea is that the lazyloading common case of: "I want a subset of member events plus all of the other state" can be accomplished efficiently by splitting the cache into two, and asking for "all events" from the non-members cache, and "just these keys" from the members cache.  This means we can avoid having to make DictionaryCache aware of these sort of complicated queries, whilst letting LL requests benefit from the caching.

Previously we were unable to sensibly use the caching and had to pull all state from the DB irrespective of the filtering, which made things slow.  Hopefully fixes https://github.com/matrix-org/synapse/issues/3720.
2018-08-22 00:56:37 +02:00
Richard van der Hoff
638c0bf49b Merge branch 'rav/fix_gdpr_consent' into matrix-org-hotfixes 2018-08-21 22:54:35 +01:00
Richard van der Hoff
a52f276990 Merge tag 'v0.33.3rc2' into develop
Bugfixes
--------

- Fix bug in v0.33.3rc1 which caused infinite loops and OOMs
([\#3723](https://github.com/matrix-org/synapse/issues/3723))
2018-08-21 20:30:09 +01:00
hera
d1065e6f51 Merge tag 'v0.33.3rc2' into matrix-org-hotfixes
Bugfixes
--------

- Fix bug in v0.33.3rc1 which caused infinite loops and OOMs
([\#3723](https://github.com/matrix-org/synapse/issues/3723))
2018-08-21 19:12:14 +00:00
Erik Johnston
46c832eaac Merge pull request #3727 from matrix-org/erikj/dont_error_on_missing_keys
Don't log exceptions when failing to fetch server keys
2018-08-21 17:07:20 +01:00
Erik Johnston
2b1a4b2596 Merge pull request #3722 from matrix-org/erikj/bg_process_iteration
LaterGauge needs to call thread safe functions
2018-08-21 16:47:34 +01:00
Erik Johnston
cd6937fb26 Fix typo 2018-08-21 16:28:10 +01:00
Erik Johnston
c2c153dd3b Log more detail when we fail to authenticate request 2018-08-21 11:42:49 +01:00
Erik Johnston
79d3b4689e Newsfile 2018-08-21 11:21:48 +01:00
Erik Johnston
808d8e06aa Don't log exceptions when failing to fetch server keys
Not being able to resolve or connect to remote servers is an expected
error, so we shouldn't log at ERROR with stacktraces.
2018-08-21 11:19:26 +01:00
Erik Johnston
3f6762f0bb isort 2018-08-21 09:38:38 +01:00
Erik Johnston
e2c0aa2c26 Newsfile 2018-08-20 17:40:59 +01:00
Erik Johnston
b01a755498 Make the in flight requests metrics thread safe 2018-08-20 17:27:52 +01:00
Erik Johnston
1058d14127 Make the in flight background process metrics thread safe 2018-08-20 17:27:24 +01:00
Erik Johnston
4d664278af Merge branch 'develop' of github.com:matrix-org/synapse into erikj/refactor_state_handler 2018-08-20 14:49:43 +01:00
Erik Johnston
8dee601054 Remove redundant room_version checks 2018-08-20 14:48:53 +01:00
Erik Johnston
e21c368b8b Revert spurious change 2018-08-20 13:54:51 +01:00
Erik Johnston
567863127a Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-20 13:34:47 +01:00
Erik Johnston
f5abc10724 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-20 11:12:18 +01:00
Erik Johnston
ab822a2d1f Add some fixmes 2018-08-17 15:31:50 +01:00
Erik Johnston
91cdb6de08 Call UserDirectoryHandler methods directly
Turns out that the user directory handling is fairly racey as a bunch
of stuff assumes that the processing happens on master, which it doesn't
when there is a synapse.app.user_dir worker. So lets just call the
function directly until we actually get round to fixing it, since it
doesn't make the situation any worse.
2018-08-17 15:26:13 +01:00
Erik Johnston
782689bd40 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_profiles 2018-08-17 14:15:48 +01:00
Erik Johnston
ca87ad1def Split ProfileHandler into master and worker 2018-08-17 14:15:14 +01:00
Erik Johnston
38f708a2bb Remote profile cache should remain in master worker 2018-08-17 11:37:42 +01:00
Erik Johnston
bb795b56da Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-16 15:51:16 +01:00
Erik Johnston
4dd0604f61 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-15 15:37:05 +01:00
Erik Johnston
5075e444f4 Newsfile 2018-08-09 14:58:49 +01:00
Erik Johnston
3e19beb941 Fix tests 2018-08-09 14:58:49 +01:00
Erik Johnston
bb99b1f550 Add fast path in state res for zero prev events 2018-08-09 14:58:49 +01:00
Erik Johnston
ce6db0e547 Choose state algorithm based on room version 2018-08-09 14:58:47 +01:00
Erik Johnston
152c0aa58e Add constants for room versions 2018-08-09 14:55:47 +01:00
Erik Johnston
119451dcd1 Refactor state module
We split out the actual state resolution algorithm to prepare for having
multiple versions.
2018-08-09 14:55:47 +01:00
Erik Johnston
a6c813761a Docstrings 2018-08-09 10:41:08 +01:00
Erik Johnston
54a9bea88c Newsfile 2018-08-09 10:39:29 +01:00
Erik Johnston
484a0ebdfc Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_profiles 2018-08-09 10:16:29 +01:00
Richard van der Hoff
c05d278ba0 Merge branch 'rav/federation_metrics' into matrix-org-hotfixes 2018-08-07 19:11:29 +01:00
Erik Johnston
f81f421086 Update workers.rst with new paths 2018-08-07 10:51:35 +01:00
Erik Johnston
cd9765805e Allow ratelimiting on workers 2018-08-07 10:50:28 +01:00
Erik Johnston
495cb100d1 Allow profile changes to happen on workers 2018-08-07 10:50:26 +01:00
Erik Johnston
49a3163958 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-06 13:33:54 +01:00
Erik Johnston
1a568041fa Merge branch 'release-v0.33.1' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-08-02 15:28:32 +01:00
Travis Ralston
37be52ac34 limt -> limit 2018-07-31 16:29:09 -06:00
Erik Johnston
c9db8b0c32 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-24 17:22:23 +01:00
Erik Johnston
aa1bf10b91 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-24 15:49:38 +01:00
Erik Johnston
5222907bea Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-23 17:54:41 +01:00
Erik Johnston
e1eb147f2a Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-23 16:45:22 +01:00
hera
e43eb47c5f Fixup limiter 2018-07-23 15:22:47 +00:00
hera
27eb4c45cd Lower hacky timeout for member limiter 2018-07-23 15:16:36 +00:00
Erik Johnston
b136d7ff8f Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-23 16:09:40 +01:00
Richard van der Hoff
9e56e1ab30 Merge branch 'develop' into matrix-org-hotfixes 2018-07-19 16:40:28 +01:00
Erik Johnston
742f757337 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-19 10:26:13 +01:00
Richard van der Hoff
2f5dfe299c Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes 2018-07-17 15:26:47 +01:00
Erik Johnston
e4eec87c6a Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-17 11:18:39 +01:00
Erik Johnston
f793ff4571 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-17 10:04:33 +01:00
Richard van der Hoff
195aae2f16 Merge branch 'develop' into matrix-org-hotfixes 2018-07-12 12:09:25 +01:00
Erik Johnston
7c79f2cb72 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-12 09:59:58 +01:00
Richard van der Hoff
f04e35c170 Merge branch 'develop' into matrix-org-hotfixes 2018-07-10 18:04:03 +01:00
Matthew Hodgson
36bbac05bd Merge branch 'develop' of git+ssh://github.com/matrix-org/synapse into matrix-org-hotfixes 2018-07-06 19:21:09 +01:00
Erik Johnston
e2a4b7681e Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-07-05 10:29:32 +01:00
Erik Johnston
957944eee4 Merge pull request #3476 from matrix-org/erikj/timeout_memberships
Timeout membership requests after 90s
2018-07-03 10:18:39 +01:00
Erik Johnston
bf425e533e Fix PEP8 2018-07-03 10:11:09 +01:00
Erik Johnston
ca21957b8a Timeout membership requests after 90s
This is a hacky fix to try and stop in flight requests from building up
2018-07-02 13:56:08 +01:00
Erik Johnston
6a95270671 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-06-29 14:10:29 +01:00
hera
82781f5838 Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes 2018-06-28 21:09:28 +00:00
Matthew Hodgson
aae6d3ff69 Merge remote-tracking branch 'origin/revert-3451-hawkowl/sorteddict-api' into matrix-org-hotfixes 2018-06-26 18:36:29 +01:00
Matthew Hodgson
9175225adf Merge remote-tracking branch 'origin/hawkowl/sorteddict-api' into matrix-org-hotfixes 2018-06-26 17:52:37 +01:00
David Baker
7a32fa0101 Fix error on deleting users pending deactivation
Use simple_delete instead of simple_delete_one as commented
2018-06-26 11:57:44 +01:00
Erik Johnston
d46450195b Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-06-25 20:14:34 +01:00
Erik Johnston
c0128c1021 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-06-25 20:12:13 +01:00
Erik Johnston
3320b7c9a4 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-06-25 15:23:18 +01:00
Erik Johnston
4c22c9b0b6 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-06-25 14:37:13 +01:00
Richard van der Hoff
6d6ea1bb40 Merge branch 'develop' into matrix-org-hotfixes 2018-06-22 16:35:37 +01:00
aphrodite
9e38981ae4 Send HTTP pushes direct to http-priv rather than via clouldflare
(This is a heinous hack that ought to be made more generic and pushed back to develop)
2018-06-22 15:58:15 +01:00
hera
463e7c2709 Lower member limiter 2018-06-22 15:58:15 +01:00
Richard van der Hoff
ce9d0b1d0c Fix earlier logging patch
`@cached` doesn't work on decorated functions, because it uses inspection on
the target to calculate the number of arguments.
2018-06-22 15:58:15 +01:00
Richard van der Hoff
80786d5caf Logging for get_users_in_room 2018-06-22 15:58:15 +01:00
Richard van der Hoff
e18378c3e2 Increase member limiter to 20
Let's see if this makes the bridges go faster, or if it kills the synapse
master.
2018-06-22 15:58:15 +01:00
hera
0ca2857baa increase sync cache to 2 minutes
to give synchrotrons being hammered by repeating initial /syncs to get more
chance to actually complete and avoid a DoS
2018-06-22 15:58:15 +01:00
Erik Johnston
e21c312e16 Actuall set cache factors in workers 2018-06-22 15:58:15 +01:00
Richard van der Hoff
1031bd25f8 Avoid doing presence updates on replication reconnect
Presence is supposed to be disabled on matrix.org, so we shouldn't send a load
of USER_SYNC commands every time the synchrotron reconnects to the master.
2018-06-22 15:58:15 +01:00
hera
fae708c0e8 Disable auth on room_members for now
because the moznet bridge is broken (https://github.com/matrix-org/matrix-appservice-irc/issues/506)
2018-06-22 15:58:15 +01:00
Erik Johnston
8f8ea91eef Bump LAST_SEEN_GRANULARITY in client_ips 2018-06-22 15:58:15 +01:00
Erik Johnston
7a1406d144 Prefill client_ip_last_seen in replication 2018-06-22 15:58:15 +01:00
Erik Johnston
6373874833 Move event sending to end in shutdown room admin api 2018-06-22 15:58:15 +01:00
Erik Johnston
a79823e64b Add dummy presence REST handler to frontend proxy
The handler no-ops all requests as presence is disabled.
2018-06-22 15:58:15 +01:00
Erik Johnston
1766a5fdc0 Increase MAX_EVENTS_BEHIND for replication clients 2018-06-22 15:58:14 +01:00
Erik Johnston
e6b1ea3eb2 Disable presence in txn queue 2018-06-22 15:58:14 +01:00
Erik Johnston
e5537cf983 Limit concurrent AS joins 2018-06-22 15:58:14 +01:00
Erik Johnston
43bb12e640 Disable presence
This reverts commit 0ebd376a53 and
disables presence a bit more
2018-06-22 15:58:14 +01:00
Erik Johnston
66dcbf47a3 Disable auto search for prefixes in event search 2018-06-22 15:58:14 +01:00
Erik Johnston
a285fe05fd Add timeout to ResponseCache of /public_rooms 2018-06-22 15:58:14 +01:00
45 changed files with 892 additions and 407 deletions

1
changelog.d/3659.feature Normal file
View File

@@ -0,0 +1 @@
Support profile API endpoints on workers

1
changelog.d/3673.misc Normal file
View File

@@ -0,0 +1 @@
Refactor state module to support multiple room versions

1
changelog.d/3722.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues

1
changelog.d/3726.misc Normal file
View File

@@ -0,0 +1 @@
Split the state_group_cache into member and non-member state events (and so speed up LL /sync)

1
changelog.d/3727.misc Normal file
View File

@@ -0,0 +1 @@
Log failure to authenticate remote servers as warnings (without stack traces)

1
changelog.d/3735.misc Normal file
View File

@@ -0,0 +1 @@
Fix minor spelling error in federation client documentation.

View File

@@ -265,6 +265,7 @@ Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.

View File

@@ -97,9 +97,14 @@ class ThirdPartyEntityKind(object):
LOCATION = "location"
class RoomVersions(object):
V1 = "1"
VDH_TEST = "vdh-test-version"
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = "1"
DEFAULT_ROOM_VERSION = RoomVersions.V1
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"}
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}

View File

@@ -45,6 +45,11 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
RoomMembershipRestServlet,
@@ -53,6 +58,7 @@ from synapse.rest.client.v1.room import (
)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
@@ -62,6 +68,9 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
DirectoryStore,
SlavedTransactionStore,
SlavedProfileStore,
@@ -101,6 +110,9 @@ class EventCreatorServer(HomeServer):
RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource)
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,

View File

@@ -111,7 +111,7 @@ def stop(pidfile, app):
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor"
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
])
@@ -218,6 +218,10 @@ def main():
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
worker_cache_factors = (
worker_config.get("synctl_cache_factors")
or cache_factors
)
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -233,8 +237,10 @@ def main():
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
worker_cache_factors,
))
action = options.action
@@ -269,15 +275,19 @@ def main():
start(configfile)
for worker in workers:
env = os.environ.copy()
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
for cache_name, factor in worker.cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
start_worker(worker.app, configfile, worker.configfile)
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
else:
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
# Reset env back to the original
os.environ.clear()
os.environ.update(env)
if __name__ == "__main__":

View File

@@ -18,7 +18,9 @@ import logging
from canonicaljson import json
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
from twisted.internet.protocol import Factory
from twisted.names.error import DomainError
from twisted.web.http import HTTPClient
from synapse.http.endpoint import matrix_federation_endpoint
@@ -47,12 +49,14 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
server_response, server_certificate = yield protocol.remote_key
defer.returnValue((server_response, server_certificate))
except SynapseKeyClientError as e:
logger.exception("Error getting key for %r" % (server_name,))
logger.warn("Error getting key for %r: %s", server_name, e)
if e.status.startswith("4"):
# Don't retry for 4xx responses.
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
except Exception as e:
logger.exception(e)
logger.exception("Error getting key for %r", server_name)
raise IOError("Cannot get key for %r" % server_name)

View File

@@ -106,7 +106,7 @@ class TransportLayerClient(object):
dest (str)
room_id (str)
event_tuples (list)
limt (int)
limit (int)
Returns:
Deferred: Results in a dict received from the remote homeserver.

View File

@@ -261,10 +261,10 @@ class BaseFederationServlet(object):
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.exception("authenticate_request failed")
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception:
logger.exception("authenticate_request failed")
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
if origin:

View File

@@ -291,8 +291,9 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
room_version = yield self.store.get_room_version(pdu.room_id)
state_map = yield resolve_events_with_factory(
state_groups, {pdu.event_id: pdu}, fetch
room_version, state_groups, {pdu.event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
@@ -1828,7 +1829,10 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
room_version = yield self.store.get_room_version(event.room_id)
new_state = self.state_handler.resolve_events(
room_version,
[list(local_view.values()), list(remote_view.values())],
event
)

View File

@@ -203,7 +203,7 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break

View File

@@ -32,12 +32,16 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
class ProfileHandler(BaseHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
class BaseProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.
BaseProfileHandler can be instantiated directly on workers and will
delegate to master when necessary. The master process should use the
subclass MasterProfileHandler
"""
def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)
super(BaseProfileHandler, self).__init__(hs)
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -46,11 +50,6 @@ class ProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
if hs.config.worker_app is None:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
@@ -282,6 +281,20 @@ class ProfileHandler(BaseHandler):
room_id, str(e.message)
)
class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(MasterProfileHandler, self).__init__(hs)
assert hs.config.worker_app is None
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache,

View File

@@ -44,9 +44,12 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs, "room_list")
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
self.response_cache = ResponseCache(
hs, "room_list", timeout_ms=10 * 60 * 1000,
)
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000,
)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,

View File

@@ -66,6 +66,7 @@ class RoomMemberHandler(object):
self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -241,18 +242,37 @@ class RoomMemberHandler(object):
):
key = (room_id,)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
as_id = object()
if requester.app_service:
as_id = requester.app_service.id
then = self.clock.time_msec()
with (yield self.member_limiter.queue(as_id)):
diff = self.clock.time_msec() - then
if diff > 80 * 1000:
# haproxy would have timed the request out anyway...
raise SynapseError(504, "took to long to process")
with (yield self.member_linearizer.queue(key)):
diff = self.clock.time_msec() - then
if diff > 80 * 1000:
# haproxy would have timed the request out anyway...
raise SynapseError(504, "took to long to process")
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
defer.returnValue(result)
@@ -344,6 +364,7 @@ class RoomMemberHandler(object):
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)

View File

@@ -35,6 +35,8 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -191,7 +193,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.response_cache = ResponseCache(
hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
)
self.state = hs.get_state_handler()
self.auth = hs.get_auth()

View File

@@ -119,6 +119,8 @@ class UserDirectoryHandler(object):
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None,
)
@@ -127,6 +129,8 @@ class UserDirectoryHandler(object):
def handle_user_deactivated(self, user_id):
"""Called when a user ID is deactivated
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id)

View File

@@ -15,6 +15,7 @@
# limitations under the License.
import logging
import threading
from prometheus_client.core import Counter, Histogram
@@ -111,6 +112,9 @@ in_flight_requests_db_sched_duration = Counter(
# The set of all in flight requests, set[RequestMetrics]
_in_flight_requests = set()
# Protects the _in_flight_requests set from concurrent accesss
_in_flight_requests_lock = threading.Lock()
def _get_in_flight_counts():
"""Returns a count of all in flight requests by (method, server_name)
@@ -120,7 +124,8 @@ def _get_in_flight_counts():
"""
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
reqs = list(_in_flight_requests)
with _in_flight_requests_lock:
reqs = list(_in_flight_requests)
for rm in reqs:
rm.update_metrics()
@@ -154,10 +159,12 @@ class RequestMetrics(object):
# to the "in flight" metrics.
self._request_stats = self.start_context.get_resource_usage()
_in_flight_requests.add(self)
with _in_flight_requests_lock:
_in_flight_requests.add(self)
def stop(self, time_sec, request):
_in_flight_requests.discard(self)
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
context = LoggingContext.current_context()

View File

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import six
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
@@ -78,6 +80,9 @@ _background_process_counts = dict() # type: dict[str, int]
# of process descriptions that no longer have any active processes.
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
# A lock that covers the above dicts
_bg_metrics_lock = threading.Lock()
class _Collector(object):
"""A custom metrics collector for the background process metrics.
@@ -92,7 +97,11 @@ class _Collector(object):
labels=["name"],
)
for desc, processes in six.iteritems(_background_processes):
# We copy the dict so that it doesn't change from underneath us
with _bg_metrics_lock:
_background_processes_copy = dict(_background_processes)
for desc, processes in six.iteritems(_background_processes_copy):
background_process_in_flight_count.add_metric(
(desc,), len(processes),
)
@@ -167,19 +176,26 @@ def run_as_background_process(desc, func, *args, **kwargs):
"""
@defer.inlineCallbacks
def run():
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
_background_process_start_count.labels(desc).inc()
with LoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
proc = _BackgroundProcess(desc, context)
_background_processes.setdefault(desc, set()).add(proc)
with _bg_metrics_lock:
_background_processes.setdefault(desc, set()).add(proc)
try:
yield func(*args, **kwargs)
finally:
proc.update_metrics()
_background_processes[desc].remove(proc)
with _bg_metrics_lock:
_background_processes[desc].remove(proc)
with PreserveLoggingContext():
return run()

View File

@@ -331,7 +331,12 @@ class HttpPusher(object):
if not notification_dict:
defer.returnValue([])
try:
resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
url = self.url.replace(
"https://matrix.org/_matrix/push/v1/notify",
"http://http-priv.matrix.org/_matrix/push/v1/notify",
)
resp = yield self.http_client.post_json_get_json(url, notification_dict)
except Exception:
logger.warn(
"Failed to push event %s to %s",

View File

@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)

View File

@@ -32,7 +32,7 @@ from twisted.internet import defer
logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", (

View File

@@ -454,17 +454,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -502,6 +491,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(

View File

@@ -56,7 +56,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
@@ -308,7 +308,10 @@ class HomeServer(object):
return InitialSyncHandler(self)
def build_profile_handler(self):
return ProfileHandler(self)
if self.config.worker_app:
return BaseProfileHandler(self)
else:
return MasterProfileHandler(self)
def build_event_creation_handler(self):
return EventCreationHandler(self)

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,21 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import logging
from collections import namedtuple
from six import iteritems, iterkeys, itervalues
from six import iteritems, itervalues
from frozendict import frozendict
from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.constants import EventTypes, RoomVersions
from synapse.events.snapshot import EventContext
from synapse.state import v1
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
@@ -264,6 +262,7 @@ class StateHandler(object):
defer.returnValue(context)
logger.debug("calling resolve_state_groups from compute_event_context")
entry = yield self.resolve_state_groups_for_events(
event.room_id, [e for e, _ in event.prev_events],
)
@@ -338,8 +337,11 @@ class StateHandler(object):
event, resolves conflicts between them and returns them.
Args:
room_id (str):
event_ids (list[str]):
room_id (str)
event_ids (list[str])
explicit_room_version (str|None): If set uses the the given room
version to choose the resolution algorithm. If None, then
checks the database for room version.
Returns:
Deferred[_StateCacheEntry]: resolved state
@@ -353,7 +355,12 @@ class StateHandler(object):
room_id, event_ids
)
if len(state_groups_ids) == 1:
if len(state_groups_ids) == 0:
defer.returnValue(_StateCacheEntry(
state={},
state_group=None,
))
elif len(state_groups_ids) == 1:
name, state_list = list(state_groups_ids.items()).pop()
prev_group, delta_ids = yield self.store.get_state_group_delta(name)
@@ -365,8 +372,11 @@ class StateHandler(object):
delta_ids=delta_ids,
))
room_version = yield self.store.get_room_version(room_id)
result = yield self._state_resolution_handler.resolve_state_groups(
room_id, state_groups_ids, None, self._state_map_factory,
room_id, room_version, state_groups_ids, None,
self._state_map_factory,
)
defer.returnValue(result)
@@ -375,7 +385,7 @@ class StateHandler(object):
ev_ids, get_prev_content=False, check_redacted=False,
)
def resolve_events(self, state_sets, event):
def resolve_events(self, room_version, state_sets, event):
logger.info(
"Resolving state for %s with %d groups", event.room_id, len(state_sets)
)
@@ -391,7 +401,9 @@ class StateHandler(object):
}
with Measure(self.clock, "state._resolve_events"):
new_state = resolve_events_with_state_map(state_set_ids, state_map)
new_state = resolve_events_with_state_map(
room_version, state_set_ids, state_map,
)
new_state = {
key: state_map[ev_id] for key, ev_id in iteritems(new_state)
@@ -430,7 +442,7 @@ class StateResolutionHandler(object):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
self, room_id, state_groups_ids, event_map, state_map_factory,
self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
):
"""Resolves conflicts between a set of state groups
@@ -439,6 +451,7 @@ class StateResolutionHandler(object):
Args:
room_id (str): room we are resolving for (used for logging)
room_version (str): version of the room
state_groups_ids (dict[int, dict[(str, str), str]]):
map from state group id to the state in that state group
(where 'state' is a map from state key to event id)
@@ -492,6 +505,7 @@ class StateResolutionHandler(object):
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
state_map_factory=state_map_factory,
@@ -575,16 +589,10 @@ def _make_state_cache_entry(
)
def _ordered_events(events):
def key_func(e):
return -int(e.depth), hashlib.sha1(e.event_id.encode('ascii')).hexdigest()
return sorted(events, key=key_func)
def resolve_events_with_state_map(state_sets, state_map):
def resolve_events_with_state_map(room_version, state_sets, state_map):
"""
Args:
room_version(str): Version of the room
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
state_map(dict): a dict from event_id to event, for all events in
@@ -594,75 +602,23 @@ def resolve_events_with_state_map(state_sets, state_map):
dict[(str, str), str]:
a map from (type, state_key) to event_id.
"""
if len(state_sets) == 1:
return state_sets[0]
unconflicted_state, conflicted_state = _seperate(
state_sets,
)
auth_events = _create_auth_events_from_maps(
unconflicted_state, conflicted_state, state_map
)
return _resolve_with_state(
unconflicted_state, conflicted_state, auth_events, state_map
)
if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
return v1.resolve_events_with_state_map(
state_sets, state_map,
)
else:
# This should only happen if we added a version but forgot to add it to
# the list above.
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)
def _seperate(state_sets):
"""Takes the state_sets and figures out which keys are conflicted and
which aren't. i.e., which have multiple different event_ids associated
with them in different state sets.
Args:
state_sets(iterable[dict[(str, str), str]]):
List of dicts of (type, state_key) -> event_id, which are the
different state groups to resolve.
Returns:
(dict[(str, str), str], dict[(str, str), set[str]]):
A tuple of (unconflicted_state, conflicted_state), where:
unconflicted_state is a dict mapping (type, state_key)->event_id
for unconflicted state keys.
conflicted_state is a dict mapping (type, state_key) to a set of
event ids for conflicted state keys.
"""
state_set_iterator = iter(state_sets)
unconflicted_state = dict(next(state_set_iterator))
conflicted_state = {}
for state_set in state_set_iterator:
for key, value in iteritems(state_set):
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
if unconflicted_value is None:
# There isn't an unconflicted entry so check if there is a
# conflicted entry.
ls = conflicted_state.get(key)
if ls is None:
# There wasn't a conflicted entry so haven't seen this key before.
# Therefore it isn't conflicted yet.
unconflicted_state[key] = value
else:
# This key is already conflicted, add our value to the conflict set.
ls.add(value)
elif unconflicted_value != value:
# If the unconflicted value is not the same as our value then we
# have a new conflict. So move the key from the unconflicted_state
# to the conflicted state.
conflicted_state[key] = {value, unconflicted_value}
unconflicted_state.pop(key, None)
return unconflicted_state, conflicted_state
@defer.inlineCallbacks
def resolve_events_with_factory(state_sets, event_map, state_map_factory):
def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
"""
Args:
room_version(str): Version of the room
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
@@ -682,185 +638,13 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
if len(state_sets) == 1:
defer.returnValue(state_sets[0])
unconflicted_state, conflicted_state = _seperate(
state_sets,
)
needed_events = set(
event_id
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
state_map = yield state_map_factory(needed_events)
if event_map is not None:
state_map.update(event_map)
# get the ids of the auth events which allow us to authenticate the
# conflicted state, picking only from the unconflicting state.
#
# dict[(str, str), str]: a map from state key to event id
auth_events = _create_auth_events_from_maps(
unconflicted_state, conflicted_state, state_map
)
new_needed_events = set(itervalues(auth_events))
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)
defer.returnValue(_resolve_with_state(
unconflicted_state, conflicted_state, auth_events, state_map
))
def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map):
auth_events = {}
for event_ids in itervalues(conflicted_state):
for event_id in event_ids:
if event_id in state_map:
keys = event_auth.auth_types_for_event(state_map[event_id])
for key in keys:
if key not in auth_events:
event_id = unconflicted_state.get(key, None)
if event_id:
auth_events[key] = event_id
return auth_events
def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids,
state_map):
conflicted_state = {}
for key, event_ids in iteritems(conflicted_state_ids):
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
if len(events) > 1:
conflicted_state[key] = events
elif len(events) == 1:
unconflicted_state_ids[key] = events[0].event_id
auth_events = {
key: state_map[ev_id]
for key, ev_id in iteritems(auth_event_ids)
if ev_id in state_map
}
try:
resolved_state = _resolve_state_events(
conflicted_state, auth_events
if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
return v1.resolve_events_with_factory(
state_sets, event_map, state_map_factory,
)
else:
# This should only happen if we added a version but forgot to add it to
# the list above.
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)
except Exception:
logger.exception("Failed to resolve state")
raise
new_state = unconflicted_state_ids
for key, event in iteritems(resolved_state):
new_state[key] = event.event_id
return new_state
def _resolve_state_events(conflicted_state, auth_events):
""" This is where we actually decide which of the conflicted state to
use.
We resolve conflicts in the following order:
1. power levels
2. join rules
3. memberships
4. other events.
"""
resolved_state = {}
if POWER_KEY in conflicted_state:
events = conflicted_state[POWER_KEY]
logger.debug("Resolving conflicted power levels %r", events)
resolved_state[POWER_KEY] = _resolve_auth_events(
events, auth_events)
auth_events.update(resolved_state)
for key, events in iteritems(conflicted_state):
if key[0] == EventTypes.JoinRules:
logger.debug("Resolving conflicted join rules %r", events)
resolved_state[key] = _resolve_auth_events(
events,
auth_events
)
auth_events.update(resolved_state)
for key, events in iteritems(conflicted_state):
if key[0] == EventTypes.Member:
logger.debug("Resolving conflicted member lists %r", events)
resolved_state[key] = _resolve_auth_events(
events,
auth_events
)
auth_events.update(resolved_state)
for key, events in iteritems(conflicted_state):
if key not in resolved_state:
logger.debug("Resolving conflicted state %r:%r", key, events)
resolved_state[key] = _resolve_normal_events(
events, auth_events
)
return resolved_state
def _resolve_auth_events(events, auth_events):
reverse = [i for i in reversed(_ordered_events(events))]
auth_keys = set(
key
for event in events
for key in event_auth.auth_types_for_event(event)
)
new_auth_events = {}
for key in auth_keys:
auth_event = auth_events.get(key, None)
if auth_event:
new_auth_events[key] = auth_event
auth_events = new_auth_events
prev_event = reverse[0]
for event in reverse[1:]:
auth_events[(prev_event.type, prev_event.state_key)] = prev_event
try:
# The signatures have already been checked at this point
event_auth.check(event, auth_events, do_sig_check=False, do_size_check=False)
prev_event = event
except AuthError:
return prev_event
return event
def _resolve_normal_events(events, auth_events):
for event in _ordered_events(events):
try:
# The signatures have already been checked at this point
event_auth.check(event, auth_events, do_sig_check=False, do_size_check=False)
return event
except AuthError:
pass
# Use the last event (the one with the least depth) if they all fail
# the auth check.
return event

321
synapse/state/v1.py Normal file
View File

@@ -0,0 +1,321 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import logging
from six import iteritems, iterkeys, itervalues
from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
logger = logging.getLogger(__name__)
POWER_KEY = (EventTypes.PowerLevels, "")
def resolve_events_with_state_map(state_sets, state_map):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
state_map(dict): a dict from event_id to event, for all events in
state_sets.
Returns
dict[(str, str), str]:
a map from (type, state_key) to event_id.
"""
if len(state_sets) == 1:
return state_sets[0]
unconflicted_state, conflicted_state = _seperate(
state_sets,
)
auth_events = _create_auth_events_from_maps(
unconflicted_state, conflicted_state, state_map
)
return _resolve_with_state(
unconflicted_state, conflicted_state, auth_events, state_map
)
@defer.inlineCallbacks
def resolve_events_with_factory(state_sets, event_map, state_map_factory):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map(dict[str,FrozenEvent]|None):
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.
If None, all events will be fetched via state_map_factory.
state_map_factory(func): will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.
Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
if len(state_sets) == 1:
defer.returnValue(state_sets[0])
unconflicted_state, conflicted_state = _seperate(
state_sets,
)
needed_events = set(
event_id
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
state_map = yield state_map_factory(needed_events)
if event_map is not None:
state_map.update(event_map)
# get the ids of the auth events which allow us to authenticate the
# conflicted state, picking only from the unconflicting state.
#
# dict[(str, str), str]: a map from state key to event id
auth_events = _create_auth_events_from_maps(
unconflicted_state, conflicted_state, state_map
)
new_needed_events = set(itervalues(auth_events))
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)
defer.returnValue(_resolve_with_state(
unconflicted_state, conflicted_state, auth_events, state_map
))
def _seperate(state_sets):
"""Takes the state_sets and figures out which keys are conflicted and
which aren't. i.e., which have multiple different event_ids associated
with them in different state sets.
Args:
state_sets(iterable[dict[(str, str), str]]):
List of dicts of (type, state_key) -> event_id, which are the
different state groups to resolve.
Returns:
(dict[(str, str), str], dict[(str, str), set[str]]):
A tuple of (unconflicted_state, conflicted_state), where:
unconflicted_state is a dict mapping (type, state_key)->event_id
for unconflicted state keys.
conflicted_state is a dict mapping (type, state_key) to a set of
event ids for conflicted state keys.
"""
state_set_iterator = iter(state_sets)
unconflicted_state = dict(next(state_set_iterator))
conflicted_state = {}
for state_set in state_set_iterator:
for key, value in iteritems(state_set):
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
if unconflicted_value is None:
# There isn't an unconflicted entry so check if there is a
# conflicted entry.
ls = conflicted_state.get(key)
if ls is None:
# There wasn't a conflicted entry so haven't seen this key before.
# Therefore it isn't conflicted yet.
unconflicted_state[key] = value
else:
# This key is already conflicted, add our value to the conflict set.
ls.add(value)
elif unconflicted_value != value:
# If the unconflicted value is not the same as our value then we
# have a new conflict. So move the key from the unconflicted_state
# to the conflicted state.
conflicted_state[key] = {value, unconflicted_value}
unconflicted_state.pop(key, None)
return unconflicted_state, conflicted_state
def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map):
auth_events = {}
for event_ids in itervalues(conflicted_state):
for event_id in event_ids:
if event_id in state_map:
keys = event_auth.auth_types_for_event(state_map[event_id])
for key in keys:
if key not in auth_events:
event_id = unconflicted_state.get(key, None)
if event_id:
auth_events[key] = event_id
return auth_events
def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids,
state_map):
conflicted_state = {}
for key, event_ids in iteritems(conflicted_state_ids):
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
if len(events) > 1:
conflicted_state[key] = events
elif len(events) == 1:
unconflicted_state_ids[key] = events[0].event_id
auth_events = {
key: state_map[ev_id]
for key, ev_id in iteritems(auth_event_ids)
if ev_id in state_map
}
try:
resolved_state = _resolve_state_events(
conflicted_state, auth_events
)
except Exception:
logger.exception("Failed to resolve state")
raise
new_state = unconflicted_state_ids
for key, event in iteritems(resolved_state):
new_state[key] = event.event_id
return new_state
def _resolve_state_events(conflicted_state, auth_events):
""" This is where we actually decide which of the conflicted state to
use.
We resolve conflicts in the following order:
1. power levels
2. join rules
3. memberships
4. other events.
"""
resolved_state = {}
if POWER_KEY in conflicted_state:
events = conflicted_state[POWER_KEY]
logger.debug("Resolving conflicted power levels %r", events)
resolved_state[POWER_KEY] = _resolve_auth_events(
events, auth_events)
auth_events.update(resolved_state)
for key, events in iteritems(conflicted_state):
if key[0] == EventTypes.JoinRules:
logger.debug("Resolving conflicted join rules %r", events)
resolved_state[key] = _resolve_auth_events(
events,
auth_events
)
auth_events.update(resolved_state)
for key, events in iteritems(conflicted_state):
if key[0] == EventTypes.Member:
logger.debug("Resolving conflicted member lists %r", events)
resolved_state[key] = _resolve_auth_events(
events,
auth_events
)
auth_events.update(resolved_state)
for key, events in iteritems(conflicted_state):
if key not in resolved_state:
logger.debug("Resolving conflicted state %r:%r", key, events)
resolved_state[key] = _resolve_normal_events(
events, auth_events
)
return resolved_state
def _resolve_auth_events(events, auth_events):
reverse = [i for i in reversed(_ordered_events(events))]
auth_keys = set(
key
for event in events
for key in event_auth.auth_types_for_event(event)
)
new_auth_events = {}
for key in auth_keys:
auth_event = auth_events.get(key, None)
if auth_event:
new_auth_events[key] = auth_event
auth_events = new_auth_events
prev_event = reverse[0]
for event in reverse[1:]:
auth_events[(prev_event.type, prev_event.state_key)] = prev_event
try:
# The signatures have already been checked at this point
event_auth.check(event, auth_events, do_sig_check=False, do_size_check=False)
prev_event = event
except AuthError:
return prev_event
return event
def _resolve_normal_events(events, auth_events):
for event in _ordered_events(events):
try:
# The signatures have already been checked at this point
event_auth.check(event, auth_events, do_sig_check=False, do_size_check=False)
return event
except AuthError:
pass
# Use the last event (the one with the least depth) if they all fail
# the auth check.
return event
def _ordered_events(events):
def key_func(e):
return -int(e.depth), hashlib.sha1(e.event_id.encode('ascii')).hexdigest()
return sorted(events, key=key_func)

View File

@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):

View File

@@ -705,9 +705,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
}
events_map = {ev.event_id: ev for ev, _ in events_context}
room_version = yield self.get_room_version(room_id)
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
room_id, state_groups, events_map, get_events
room_id, room_version, state_groups, events_map, get_events
)
defer.returnValue((res.state, None))

View File

@@ -71,8 +71,6 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_from_remote_profile_cache",
)
class ProfileStore(ProfileWorkerStore):
def create_profile(self, user_localpart):
return self._simple_insert(
table="profiles",
@@ -96,6 +94,8 @@ class ProfileStore(ProfileWorkerStore):
desc="set_profile_avatar_url",
)
class ProfileStore(ProfileWorkerStore):
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles.

View File

@@ -186,6 +186,35 @@ class RoomWorkerStore(SQLBaseStore):
desc="is_room_blocked",
)
@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given
user
Args:
user_id (str)
Returns:
RatelimitOverride if there is an override, else None. If the contents
of RatelimitOverride are None or 0 then ratelimitng has been
disabled for that user entirely.
"""
row = yield self._simple_select_one(
table="ratelimit_override",
keyvalues={"user_id": user_id},
retcols=("messages_per_second", "burst_count"),
allow_none=True,
desc="get_ratelimit_for_user",
)
if row:
defer.returnValue(RatelimitOverride(
messages_per_second=row["messages_per_second"],
burst_count=row["burst_count"],
))
else:
defer.returnValue(None)
class RoomStore(RoomWorkerStore, SearchStore):
@@ -469,35 +498,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
"get_all_new_public_rooms", get_all_new_public_rooms
)
@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given
user
Args:
user_id (str)
Returns:
RatelimitOverride if there is an override, else None. If the contents
of RatelimitOverride are None or 0 then ratelimitng has been
disabled for that user entirely.
"""
row = yield self._simple_select_one(
table="ratelimit_override",
keyvalues={"user_id": user_id},
retcols=("messages_per_second", "burst_count"),
allow_none=True,
desc="get_ratelimit_for_user",
)
if row:
defer.returnValue(RatelimitOverride(
messages_per_second=row["messages_per_second"],
burst_count=row["burst_count"],
))
else:
defer.returnValue(None)
@defer.inlineCallbacks
def block_room(self, room_id, user_id):
yield self._simple_insert(

View File

@@ -66,7 +66,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
@cached(max_entries=100000, iterable=True)
@cachedInlineCallbacks(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -80,7 +80,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN,))
return [to_ascii(r[0]) for r in txn]
return self.runInteraction("get_users_in_room", f)
start_time = self._clock.time_msec()
result = yield self.runInteraction("get_users_in_room", f)
end_time = self._clock.time_msec()
logger.info(
"Fetched room membership for %s (%i users) in %i ms",
room_id, len(result), end_time - start_time,
)
defer.returnValue(result)
@cached()
def get_invited_rooms_for_user(self, user_id):

View File

@@ -724,7 +724,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
return " & ".join(result + ":*" for result in results)
return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:

View File

@@ -60,8 +60,43 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def __init__(self, db_conn, hs):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
# Originally the state store used a single DictionaryCache to cache the
# event IDs for the state types in a given state group to avoid hammering
# on the state_group* tables.
#
# The point of using a DictionaryCache is that it can cache a subset
# of the state events for a given state group (i.e. a subset of the keys for a
# given dict which is an entry in the cache for a given state group ID).
#
# However, this poses problems when performing complicated queries
# on the store - for instance: "give me all the state for this group, but
# limit members to this subset of users", as DictionaryCache's API isn't
# rich enough to say "please cache any of these fields, apart from this subset".
# This is problematic when lazy loading members, which requires this behaviour,
# as without it the cache has no choice but to speculatively load all
# state events for the group, which negates the efficiency being sought.
#
# Rather than overcomplicating DictionaryCache's API, we instead split the
# state_group_cache into two halves - one for tracking non-member events,
# and the other for tracking member_events. This means that lazy loading
# queries can be made in a cache-friendly manner by querying both caches
# separately and then merging the result. So for the example above, you
# would query the members cache for a specific subset of state keys
# (which DictionaryCache will handle efficiently and fine) and the non-members
# cache for all state (which DictionaryCache will similarly handle fine)
# and then just merge the results together.
#
# We size the non-members cache to be smaller than the members cache as the
# vast majority of state in Matrix (today) is member events.
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
"*stateGroupCache*",
# TODO: this hasn't been tuned yet
50000 * get_cache_factor_for("stateGroupCache")
)
self._state_group_members_cache = DictionaryCache(
"*stateGroupMembersCache*",
500000 * get_cache_factor_for("stateGroupMembersCache")
)
@defer.inlineCallbacks
@@ -275,7 +310,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
})
@defer.inlineCallbacks
def _get_state_groups_from_groups(self, groups, types):
def _get_state_groups_from_groups(self, groups, types, members=None):
"""Returns the state groups for a given set of groups, filtering on
types of state events.
@@ -284,6 +319,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
types (Iterable[str, str|None]|None): list of 2-tuples of the form
(`type`, `state_key`), where a `state_key` of `None` matches all
state_keys for the `type`. If None, all types are returned.
members (bool|None): If not None, then, in addition to any filtering
implied by types, the results are also filtered to only include
member events (if True), or to exclude member events (if False)
Returns:
dictionary state_group -> (dict of (type, state_key) -> event id)
@@ -294,14 +332,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
for chunk in chunks:
res = yield self.runInteraction(
"_get_state_groups_from_groups",
self._get_state_groups_from_groups_txn, chunk, types,
self._get_state_groups_from_groups_txn, chunk, types, members,
)
results.update(res)
defer.returnValue(results)
def _get_state_groups_from_groups_txn(
self, txn, groups, types=None,
self, txn, groups, types=None, members=None,
):
results = {group: {} for group in groups}
@@ -339,6 +377,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
%s
""")
if members is True:
sql += " AND type = '%s'" % (EventTypes.Member,)
elif members is False:
sql += " AND type <> '%s'" % (EventTypes.Member,)
# Turns out that postgres doesn't like doing a list of OR's and
# is about 1000x slower, so we just issue a query for each specific
# type seperately.
@@ -386,6 +429,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
else:
where_clause = ""
if members is True:
where_clause += " AND type = '%s'" % EventTypes.Member
elif members is False:
where_clause += " AND type <> '%s'" % EventTypes.Member
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
for group in groups:
@@ -580,10 +628,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
def _get_some_state_from_cache(self, group, types, filtered_types=None):
def _get_some_state_from_cache(self, cache, group, types, filtered_types=None):
"""Checks if group is in cache. See `_get_state_for_groups`
Args:
cache(DictionaryCache): the state group cache to use
group(int): The state group to lookup
types(list[str, str|None]): List of 2-tuples of the form
(`type`, `state_key`), where a `state_key` of `None` matches all
@@ -597,11 +646,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
requests state from the cache, if False we need to query the DB for the
missing state.
"""
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
is_all, known_absent, state_dict_ids = cache.get(group)
type_to_key = {}
# tracks whether any of ourrequested types are missing from the cache
# tracks whether any of our requested types are missing from the cache
missing_types = False
for typ, state_key in types:
@@ -648,7 +697,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
if include(k[0], k[1])
}, got_all
def _get_all_state_from_cache(self, group):
def _get_all_state_from_cache(self, cache, group):
"""Checks if group is in cache. See `_get_state_for_groups`
Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool
@@ -656,9 +705,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
cache, if False we need to query the DB for the missing state.
Args:
cache(DictionaryCache): the state group cache to use
group: The state group to lookup
"""
is_all, _, state_dict_ids = self._state_group_cache.get(group)
is_all, _, state_dict_ids = cache.get(group)
return state_dict_ids, is_all
@@ -681,6 +731,62 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
if filtered_types is not None and EventTypes.Member not in filtered_types:
# we want all of the membership events
member_types = None
else:
member_types = [t for t in types if t[0] == EventTypes.Member]
else:
non_member_types = None
member_types = None
non_member_state = yield self._get_state_for_groups_using_cache(
groups, self._state_group_cache, non_member_types, filtered_types,
)
# XXX: we could skip this entirely if member_types is []
member_state = yield self._get_state_for_groups_using_cache(
# we set filtered_types=None as member_state only ever contain members.
groups, self._state_group_members_cache, member_types, None,
)
state = non_member_state
for group in groups:
state[group].update(member_state[group])
defer.returnValue(state)
@defer.inlineCallbacks
def _get_state_for_groups_using_cache(
self, groups, cache, types=None, filtered_types=None
):
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key, querying from a specific cache.
Args:
groups (iterable[int]): list of state groups for which we want
to get the state.
cache (DictionaryCache): the cache of group ids to state dicts which
we will pass through - either the normal state cache or the specific
members state cache.
types (None|iterable[(str, None|str)]):
indicates the state type/keys required. If None, the whole
state is fetched and returned.
Otherwise, each entry should be a `(type, state_key)` tuple to
include in the response. A `state_key` of None is a wildcard
meaning that we require all state with that type.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
@@ -692,7 +798,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
if types is not None:
for group in set(groups):
state_dict_ids, got_all = self._get_some_state_from_cache(
group, types, filtered_types
cache, group, types, filtered_types
)
results[group] = state_dict_ids
@@ -701,7 +807,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
else:
for group in set(groups):
state_dict_ids, got_all = self._get_all_state_from_cache(
group
cache, group
)
results[group] = state_dict_ids
@@ -710,8 +816,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
missing_groups.append(group)
if missing_groups:
# Okay, so we have some missing_types, lets fetch them.
cache_seq_num = self._state_group_cache.sequence
# Okay, so we have some missing_types, let's fetch them.
cache_seq_num = cache.sequence
# the DictionaryCache knows if it has *all* the state, but
# does not know if it has all of the keys of a particular type,
@@ -725,7 +831,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
types_to_fetch = types
group_to_state_dict = yield self._get_state_groups_from_groups(
missing_groups, types_to_fetch
missing_groups, types_to_fetch, cache == self._state_group_members_cache,
)
for group, group_state_dict in iteritems(group_to_state_dict):
@@ -745,7 +851,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# update the cache with all the things we fetched from the
# database.
self._state_group_cache.update(
cache.update(
cache_seq_num,
key=group,
value=group_state_dict,
@@ -847,15 +953,33 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
],
)
# Prefill the state group cache with this group.
# Prefill the state group caches with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
current_member_state_ids = {
s: ev
for (s, ev) in iteritems(current_state_ids)
if s[0] == EventTypes.Member
}
txn.call_after(
self._state_group_members_cache.update,
self._state_group_members_cache.sequence,
key=state_group,
value=dict(current_member_state_ids),
)
current_non_member_state_ids = {
s: ev
for (s, ev) in iteritems(current_state_ids)
if s[0] != EventTypes.Member
}
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=state_group,
value=dict(current_state_ids),
value=dict(current_non_member_state_ids),
)
return state_group

View File

@@ -20,7 +20,7 @@ from twisted.internet import defer
import synapse.types
from synapse.api.errors import AuthError
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.profile import MasterProfileHandler
from synapse.types import UserID
from tests import unittest
@@ -29,7 +29,7 @@ from tests.utils import setup_test_homeserver
class ProfileHandlers(object):
def __init__(self, hs):
self.profile_handler = ProfileHandler(hs)
self.profile_handler = MasterProfileHandler(hs)
class ProfileTestCase(unittest.TestCase):

View File

@@ -112,6 +112,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
@defer.inlineCallbacks
def test_invites(self):
yield self.persist(type="m.room.create", key="", creator=USER_ID)
yield self.check("get_invited_rooms_for_user", [USER_ID_2], [])
event = yield self.persist(
type="m.room.member", key=USER_ID_2, membership="invite"
@@ -133,7 +134,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
@defer.inlineCallbacks
def test_push_actions_for_user(self):
yield self.persist(type="m.room.create", creator=USER_ID)
yield self.persist(type="m.room.create", key="", creator=USER_ID)
yield self.persist(type="m.room.join", key=USER_ID, membership="join")
yield self.persist(
type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"

View File

@@ -807,11 +807,13 @@ class RoomInitialSyncTestCase(RoomBase):
self.assertTrue("presence" in channel.json_body)
presence_by_user = {
e["content"]["user_id"]: e for e in channel.json_body["presence"]
}
self.assertTrue(self.user_id in presence_by_user)
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
# presence is turned off on hotfixes
# presence_by_user = {
# e["content"]["user_id"]: e for e in channel.json_body["presence"]
# }
# self.assertTrue(self.user_id in presence_by_user)
# self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
class RoomMessageListTestCase(RoomBase):

View File

@@ -22,7 +22,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.types import RoomID, UserID
from tests import unittest
from tests.utils import setup_test_homeserver
from tests.utils import create_room, setup_test_homeserver
class RedactionTestCase(unittest.TestCase):
@@ -41,6 +41,8 @@ class RedactionTestCase(unittest.TestCase):
self.room1 = RoomID.from_string("!abc123:test")
yield create_room(hs, self.room1.to_string(), self.u_alice.to_string())
self.depth = 1
@defer.inlineCallbacks

View File

@@ -22,7 +22,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.types import RoomID, UserID
from tests import unittest
from tests.utils import setup_test_homeserver
from tests.utils import create_room, setup_test_homeserver
class RoomMemberStoreTestCase(unittest.TestCase):
@@ -45,6 +45,8 @@ class RoomMemberStoreTestCase(unittest.TestCase):
self.room = RoomID.from_string("!abc123:test")
yield create_room(hs, self.room.to_string(), self.u_alice.to_string())
@defer.inlineCallbacks
def inject_room_member(self, room, user, membership, replaces_state=None):
builder = self.event_builder_factory.new(

View File

@@ -185,6 +185,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
# test _get_some_state_from_cache correctly filters out members with types=[]
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group, [], filtered_types=[EventTypes.Member]
)
@@ -197,8 +198,20 @@ class StateStoreTestCase(tests.unittest.TestCase):
state_dict,
)
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group, [], filtered_types=[EventTypes.Member]
)
self.assertEqual(is_all, True)
self.assertDictEqual(
{},
state_dict,
)
# test _get_some_state_from_cache correctly filters in members with wildcard types
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member]
)
@@ -207,6 +220,18 @@ class StateStoreTestCase(tests.unittest.TestCase):
{
(e1.type, e1.state_key): e1.event_id,
(e2.type, e2.state_key): e2.event_id,
},
state_dict,
)
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member]
)
self.assertEqual(is_all, True)
self.assertDictEqual(
{
(e3.type, e3.state_key): e3.event_id,
# e4 is overwritten by e5
(e5.type, e5.state_key): e5.event_id,
@@ -216,6 +241,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
# test _get_some_state_from_cache correctly filters in members with specific types
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group,
[(EventTypes.Member, e5.state_key)],
filtered_types=[EventTypes.Member],
@@ -226,6 +252,20 @@ class StateStoreTestCase(tests.unittest.TestCase):
{
(e1.type, e1.state_key): e1.event_id,
(e2.type, e2.state_key): e2.event_id,
},
state_dict,
)
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group,
[(EventTypes.Member, e5.state_key)],
filtered_types=[EventTypes.Member],
)
self.assertEqual(is_all, True)
self.assertDictEqual(
{
(e5.type, e5.state_key): e5.event_id,
},
state_dict,
@@ -234,6 +274,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
# test _get_some_state_from_cache correctly filters in members with specific types
# and no filtered_types
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group, [(EventTypes.Member, e5.state_key)], filtered_types=None
)
@@ -254,9 +295,6 @@ class StateStoreTestCase(tests.unittest.TestCase):
{
(e1.type, e1.state_key): e1.event_id,
(e2.type, e2.state_key): e2.event_id,
(e3.type, e3.state_key): e3.event_id,
# e4 is overwritten by e5
(e5.type, e5.state_key): e5.event_id,
},
)
@@ -269,8 +307,6 @@ class StateStoreTestCase(tests.unittest.TestCase):
# list fetched keys so it knows it's partial
fetched_keys=(
(e1.type, e1.state_key),
(e3.type, e3.state_key),
(e5.type, e5.state_key),
),
)
@@ -284,8 +320,6 @@ class StateStoreTestCase(tests.unittest.TestCase):
set(
[
(e1.type, e1.state_key),
(e3.type, e3.state_key),
(e5.type, e5.state_key),
]
),
)
@@ -293,8 +327,6 @@ class StateStoreTestCase(tests.unittest.TestCase):
state_dict_ids,
{
(e1.type, e1.state_key): e1.event_id,
(e3.type, e3.state_key): e3.event_id,
(e5.type, e5.state_key): e5.event_id,
},
)
@@ -304,14 +336,25 @@ class StateStoreTestCase(tests.unittest.TestCase):
# test _get_some_state_from_cache correctly filters out members with types=[]
room_id = self.room.to_string()
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group, [], filtered_types=[EventTypes.Member]
)
self.assertEqual(is_all, False)
self.assertDictEqual({(e1.type, e1.state_key): e1.event_id}, state_dict)
room_id = self.room.to_string()
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group, [], filtered_types=[EventTypes.Member]
)
self.assertEqual(is_all, True)
self.assertDictEqual({}, state_dict)
# test _get_some_state_from_cache correctly filters in members wildcard types
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member]
)
@@ -319,8 +362,19 @@ class StateStoreTestCase(tests.unittest.TestCase):
self.assertDictEqual(
{
(e1.type, e1.state_key): e1.event_id,
},
state_dict,
)
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member]
)
self.assertEqual(is_all, True)
self.assertDictEqual(
{
(e3.type, e3.state_key): e3.event_id,
# e4 is overwritten by e5
(e5.type, e5.state_key): e5.event_id,
},
state_dict,
@@ -328,6 +382,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
# test _get_some_state_from_cache correctly filters in members with specific types
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group,
[(EventTypes.Member, e5.state_key)],
filtered_types=[EventTypes.Member],
@@ -337,6 +392,20 @@ class StateStoreTestCase(tests.unittest.TestCase):
self.assertDictEqual(
{
(e1.type, e1.state_key): e1.event_id,
},
state_dict,
)
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group,
[(EventTypes.Member, e5.state_key)],
filtered_types=[EventTypes.Member],
)
self.assertEqual(is_all, True)
self.assertDictEqual(
{
(e5.type, e5.state_key): e5.event_id,
},
state_dict,
@@ -345,8 +414,22 @@ class StateStoreTestCase(tests.unittest.TestCase):
# test _get_some_state_from_cache correctly filters in members with specific types
# and no filtered_types
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_cache,
group, [(EventTypes.Member, e5.state_key)], filtered_types=None
)
self.assertEqual(is_all, False)
self.assertDictEqual({}, state_dict)
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
self.store._state_group_members_cache,
group, [(EventTypes.Member, e5.state_key)], filtered_types=None
)
self.assertEqual(is_all, True)
self.assertDictEqual({(e5.type, e5.state_key): e5.event_id}, state_dict)
self.assertDictEqual(
{
(e5.type, e5.state_key): e5.event_id,
},
state_dict,
)

View File

@@ -18,7 +18,7 @@ from mock import Mock
from twisted.internet import defer
from synapse.api.auth import Auth
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, RoomVersions
from synapse.events import FrozenEvent
from synapse.state import StateHandler, StateResolutionHandler
@@ -117,6 +117,9 @@ class StateGroupStore(object):
def register_event_id_state_group(self, event_id, state_group):
self._event_to_state_group[event_id] = state_group
def get_room_version(self, room_id):
return RoomVersions.V1
class DictObj(dict):
def __init__(self, **kwargs):
@@ -176,7 +179,9 @@ class StateTestCase(unittest.TestCase):
def test_branch_no_conflict(self):
graph = Graph(
nodes={
"START": DictObj(type=EventTypes.Create, state_key="", depth=1),
"START": DictObj(
type=EventTypes.Create, state_key="", content={}, depth=1,
),
"A": DictObj(type=EventTypes.Message, depth=2),
"B": DictObj(type=EventTypes.Message, depth=3),
"C": DictObj(type=EventTypes.Name, state_key="", depth=3),

View File

@@ -21,7 +21,7 @@ from synapse.events import FrozenEvent
from synapse.visibility import filter_events_for_server
import tests.unittest
from tests.utils import setup_test_homeserver
from tests.utils import create_room, setup_test_homeserver
logger = logging.getLogger(__name__)
@@ -36,6 +36,8 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
self.event_builder_factory = self.hs.get_event_builder_factory()
self.store = self.hs.get_datastore()
yield create_room(self.hs, TEST_ROOM_ID, "@someone:ROOM")
@defer.inlineCallbacks
def test_filtering(self):
#

View File

@@ -24,6 +24,7 @@ from six.moves.urllib import parse as urlparse
from twisted.internet import defer, reactor
from synapse.api.constants import EventTypes
from synapse.api.errors import CodeMessageException, cs_error
from synapse.federation.transport import server
from synapse.http.server import HttpServer
@@ -545,3 +546,32 @@ class DeferredMockCallable(object):
"Expected not to received any calls, got:\n"
+ "\n".join(["call(%s)" % _format_call(c[0], c[1]) for c in calls])
)
@defer.inlineCallbacks
def create_room(hs, room_id, creator_id):
"""Creates and persist a creation event for the given room
Args:
hs
room_id (str)
creator_id (str)
"""
store = hs.get_datastore()
event_builder_factory = hs.get_event_builder_factory()
event_creation_handler = hs.get_event_creation_handler()
builder = event_builder_factory.new({
"type": EventTypes.Create,
"state_key": "",
"sender": creator_id,
"room_id": room_id,
"content": {},
})
event, context = yield event_creation_handler.create_new_client_event(
builder
)
yield store.persist_event(event, context)