Compare commits
111 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 86090eadb0 | |||
| edbeed06ca | |||
| 277d2c506d | |||
| 7d0f712348 | |||
| e4570c53dd | |||
| 9cd3f06ab7 | |||
| f92963f5db | |||
| 725a72ec5a | |||
| a89f9f830c | |||
| 39ce38b024 | |||
| 8da39ad98f | |||
| 3ee4ad09eb | |||
| 0ca5c4d2af | |||
| 11597ddea5 | |||
| 05630758f2 | |||
| fcfe7f6ad3 | |||
| 88cc9cc69e | |||
| 9a0db062af | |||
| 33f6195d9a | |||
| e9e4cb25fc | |||
| 4ceaa7433a | |||
| 545001b9e4 | |||
| 01ccc9e6f2 | |||
| a9cb1a35c8 | |||
| f879127aaa | |||
| e6d87c93f3 | |||
| 004cc8a328 | |||
| ef520d8d0e | |||
| a134c572a6 | |||
| c2a5cf2fe3 | |||
| 800cfd5774 | |||
| 152c2ac19e | |||
| e70287cff3 | |||
| 03a26e28d9 | |||
| 3e0c0660b3 | |||
| 3f49e131d9 | |||
| 9b8c0fb162 | |||
| 691f8492fb | |||
| a9d7d98d3f | |||
| bdbb1eec65 | |||
| 01f72e2fc7 | |||
| 9187862002 | |||
| aa3587fdd1 | |||
| 51406dab96 | |||
| fecb45e0c3 | |||
| 44cd6e1358 | |||
| 8d6dc106d1 | |||
| a052aa42e7 | |||
| 8efe773ef1 | |||
| b7e7b52452 | |||
| 8cbbfaefc1 | |||
| 84b5cc69f5 | |||
| fde8e8f09f | |||
| eb9fc021e3 | |||
| 1c41b05c8c | |||
| 5bdb57cb66 | |||
| f5aa027c2f | |||
| e66fbcbb02 | |||
| 9aa5a0af51 | |||
| 610accbb7f | |||
| c384705ee8 | |||
| 1a3aa957ca | |||
| 3f961e638a | |||
| fa72803490 | |||
| 9a0d783c11 | |||
| 38f952b9bc | |||
| a8ce159be4 | |||
| f609acc109 | |||
| 0092cf38ae | |||
| 5b631ff41a | |||
| ba48755d56 | |||
| 926ba76e23 | |||
| 9cf519769b | |||
| 7c7706f42b | |||
| 2cc9f76bc3 | |||
| ddb00efc1d | |||
| 2a376579f3 | |||
| 873aea7168 | |||
| bf7ee93cb6 | |||
| 5ea624b0f5 | |||
| 0ad5125814 | |||
| 068c21ab10 | |||
| b29d1abab6 | |||
| 7367a4a823 | |||
| 7d26591048 | |||
| 2059b8573f | |||
| 10fdcf561d | |||
| 5ccb57d3ff | |||
| c33c1ceddd | |||
| fb647164f2 | |||
| a492b17fe2 | |||
| cb2c7c0669 | |||
| 3959754de3 | |||
| 4f28018c83 | |||
| 57db62e554 | |||
| 0011ede3b0 | |||
| 62ad701326 | |||
| 3f0f06cb31 | |||
| 3e839e0548 | |||
| ebd0127999 | |||
| cfe75a9fb6 | |||
| f51565e023 | |||
| d144ed6ffb | |||
| a08726fc42 | |||
| 350331d466 | |||
| df8ff682a7 | |||
| 3518d0ea8f | |||
| 3d5a25407c | |||
| 4102468da9 | |||
| 5b527d7ee1 | |||
| 053ecae4db |
+82
-4
@@ -1,11 +1,89 @@
|
||||
Unreleased
|
||||
==========
|
||||
Changes in synapse v0.27.2 (2018-03-26)
|
||||
=======================================
|
||||
|
||||
synctl no longer starts the main synapse when using ``-a`` option with workers.
|
||||
A new worker file should be added with ``worker_app: synapse.app.homeserver``.
|
||||
Bug fixes:
|
||||
|
||||
* Fix bug which broke TCP replication between workers (PR #3015)
|
||||
|
||||
|
||||
Changes in synapse v0.27.1 (2018-03-26)
|
||||
=======================================
|
||||
|
||||
Meta release as v0.27.0 temporarily pointed to the wrong commit
|
||||
|
||||
|
||||
Changes in synapse v0.27.0 (2018-03-26)
|
||||
=======================================
|
||||
|
||||
No changes since v0.27.0-rc2
|
||||
|
||||
|
||||
Changes in synapse v0.27.0-rc2 (2018-03-19)
|
||||
===========================================
|
||||
|
||||
Pulls in v0.26.1
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Fix bug introduced in v0.27.0-rc1 that causes much increased memory usage in state cache (PR #3005)
|
||||
|
||||
|
||||
Changes in synapse v0.26.1 (2018-03-15)
|
||||
=======================================
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Fix bug where an invalid event caused server to stop functioning correctly,
|
||||
due to parsing and serializing bugs in ujson library (PR #3008)
|
||||
|
||||
|
||||
Changes in synapse v0.27.0-rc1 (2018-03-14)
|
||||
===========================================
|
||||
|
||||
The common case for running Synapse is not to run separate workers, but for those that do, be aware that synctl no longer starts the main synapse when using ``-a`` option with workers. A new worker file should be added with ``worker_app: synapse.app.homeserver``.
|
||||
|
||||
This release also begins the process of renaming a number of the metrics
|
||||
reported to prometheus. See `docs/metrics-howto.rst <docs/metrics-howto.rst#block-and-response-metrics-renamed-for-0-27-0>`_.
|
||||
Note that the v0.28.0 release will remove the deprecated metric names.
|
||||
|
||||
Features:
|
||||
|
||||
* Add ability for ASes to override message send time (PR #2754)
|
||||
* Add support for custom storage providers for media repository (PR #2867, #2777, #2783, #2789, #2791, #2804, #2812, #2814, #2857, #2868, #2767)
|
||||
* Add purge API features, see `docs/admin_api/purge_history_api.rst <docs/admin_api/purge_history_api.rst>`_ for full details (PR #2858, #2867, #2882, #2946, #2962, #2943)
|
||||
* Add support for whitelisting 3PIDs that users can register. (PR #2813)
|
||||
* Add ``/room/{id}/event/{id}`` API (PR #2766)
|
||||
* Add an admin API to get all the media in a room (PR #2818) Thanks to @turt2live!
|
||||
* Add ``federation_domain_whitelist`` option (PR #2820, #2821)
|
||||
|
||||
|
||||
Changes:
|
||||
|
||||
* Continue to factor out processing from main process and into worker processes. See updated `docs/workers.rst <docs/workers.rst>`_ (PR #2892 - #2904, #2913, #2920 - #2926, #2947, #2847, #2854, #2872, #2873, #2874, #2928, #2929, #2934, #2856, #2976 - #2984, #2987 - #2989, #2991 - #2993, #2995, #2784)
|
||||
* Ensure state cache is used when persisting events (PR #2864, #2871, #2802, #2835, #2836, #2841, #2842, #2849)
|
||||
* Change the default config to bind on both IPv4 and IPv6 on all platforms (PR #2435) Thanks to @silkeh!
|
||||
* No longer require a specific version of saml2 (PR #2695) Thanks to @okurz!
|
||||
* Remove ``verbosity``/``log_file`` from generated config (PR #2755)
|
||||
* Add and improve metrics and logging (PR #2770, #2778, #2785, #2786, #2787, #2793, #2794, #2795, #2809, #2810, #2833, #2834, #2844, #2965, #2927, #2975, #2790, #2796, #2838)
|
||||
* When using synctl with workers, don't start the main synapse automatically (PR #2774)
|
||||
* Minor performance improvements (PR #2773, #2792)
|
||||
* Use a connection pool for non-federation outbound connections (PR #2817)
|
||||
* Make it possible to run unit tests against postgres (PR #2829)
|
||||
* Update pynacl dependency to 1.2.1 or higher (PR #2888) Thanks to @bachp!
|
||||
* Remove ability for AS users to call /events and /sync (PR #2948)
|
||||
* Use bcrypt.checkpw (PR #2949) Thanks to @krombel!
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Fix broken ``ldap_config`` config option (PR #2683) Thanks to @seckrv!
|
||||
* Fix error message when user is not allowed to unban (PR #2761) Thanks to @turt2live!
|
||||
* Fix publicised groups GET API (singular) over federation (PR #2772)
|
||||
* Fix user directory when using ``user_directory_search_all_users`` config option (PR #2803, #2831)
|
||||
* Fix error on ``/publicRooms`` when no rooms exist (PR #2827)
|
||||
* Fix bug in quarantine_media (PR #2837)
|
||||
* Fix url_previews when no Content-Type is returned from URL (PR #2845)
|
||||
* Fix rare race in sync API when joining room (PR #2944)
|
||||
* Fix slow event search, switch back from GIST to GIN indexes (PR #2769, #2848)
|
||||
|
||||
|
||||
Changes in synapse v0.26.0 (2018-01-05)
|
||||
|
||||
+7
-3
@@ -30,8 +30,12 @@ use github's pull request workflow to review the contribution, and either ask
|
||||
you to make any refinements needed or merge it and make them ourselves. The
|
||||
changes will then land on master when we next do a release.
|
||||
|
||||
We use Jenkins for continuous integration (http://matrix.org/jenkins), and
|
||||
typically all pull requests get automatically tested Jenkins: if your change breaks the build, Jenkins will yell about it in #matrix-dev:matrix.org so please lurk there and keep an eye open.
|
||||
We use `Jenkins <http://matrix.org/jenkins>`_ and
|
||||
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
|
||||
integration. All pull requests to synapse get automatically tested by Travis;
|
||||
the Jenkins builds require an adminstrator to start them. If your change
|
||||
breaks the build, this will be shown in github, so please keep an eye on the
|
||||
pull request for feedback.
|
||||
|
||||
Code style
|
||||
~~~~~~~~~~
|
||||
@@ -115,4 +119,4 @@ can't be accepted. Git makes this trivial - just use the -s flag when you do
|
||||
Conclusion
|
||||
~~~~~~~~~~
|
||||
|
||||
That's it! Matrix is a very open and collaborative project as you might expect given our obsession with open communication. If we're going to successfully matrix together all the fragmented communication technologies out there we are reliant on contributions and collaboration from the community to do so. So please get involved - and we hope you have as much fun hacking on Matrix as we do!
|
||||
That's it! Matrix is a very open and collaborative project as you might expect given our obsession with open communication. If we're going to successfully matrix together all the fragmented communication technologies out there we are reliant on contributions and collaboration from the community to do so. So please get involved - and we hope you have as much fun hacking on Matrix as we do!
|
||||
|
||||
+15
@@ -354,6 +354,10 @@ https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one
|
||||
Fedora
|
||||
------
|
||||
|
||||
Synapse is in the Fedora repositories as ``matrix-synapse``::
|
||||
|
||||
sudo dnf install matrix-synapse
|
||||
|
||||
Oleg Girko provides Fedora RPMs at
|
||||
https://obs.infoserver.lv/project/monitor/matrix-synapse
|
||||
|
||||
@@ -890,6 +894,17 @@ This should end with a 'PASSED' result::
|
||||
|
||||
PASSED (successes=143)
|
||||
|
||||
Running the Integration Tests
|
||||
=============================
|
||||
|
||||
Synapse is accompanied by `SyTest <https://github.com/matrix-org/sytest>`_,
|
||||
a Matrix homeserver integration testing suite, which uses HTTP requests to
|
||||
access the API as a Matrix client would. It is able to run Synapse directly from
|
||||
the source tree, so installation of the server is not required.
|
||||
|
||||
Testing with SyTest is recommended for verifying that changes related to the
|
||||
Client-Server API are functioning correctly. See the `installation instructions
|
||||
<https://github.com/matrix-org/sytest#installing>`_ for details.
|
||||
|
||||
Building Internal API Documentation
|
||||
===================================
|
||||
|
||||
+12
@@ -48,6 +48,18 @@ returned by the Client-Server API:
|
||||
# configured on port 443.
|
||||
curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
|
||||
|
||||
Upgrading to $NEXT_VERSION
|
||||
====================
|
||||
|
||||
This release expands the anonymous usage stats sent if the opt-in
|
||||
``report_stats`` configuration is set to ``true``. We now capture RSS memory
|
||||
and cpu use at a very coarse level. This requires administrators to install
|
||||
the optional ``psutil`` python module.
|
||||
|
||||
We would appreciate it if you could assist by ensuring this module is available
|
||||
and ``report_stats`` is enabled. This will let us see if performance changes to
|
||||
synapse are having an impact to the general community.
|
||||
|
||||
Upgrading to v0.15.0
|
||||
====================
|
||||
|
||||
|
||||
@@ -16,9 +16,11 @@ including an ``access_token`` of a server admin.
|
||||
|
||||
By default, events sent by local users are not deleted, as they may represent
|
||||
the only copies of this content in existence. (Events sent by remote users are
|
||||
deleted, and room state data before the cutoff is always removed).
|
||||
deleted.)
|
||||
|
||||
To delete local events as well, set ``delete_local_events`` in the body:
|
||||
Room state data (such as joins, leaves, topic) is always preserved.
|
||||
|
||||
To delete local message events as well, set ``delete_local_events`` in the body:
|
||||
|
||||
.. code:: json
|
||||
|
||||
|
||||
+1
-1
@@ -230,7 +230,7 @@ file. For example::
|
||||
``synapse.app.event_creator``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles non-state event creation. It can handle REST endpoints matching:
|
||||
Handles non-state event creation. It can handle REST endpoints matching::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
|
||||
|
||||
|
||||
+1
-1
@@ -16,4 +16,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.26.0"
|
||||
__version__ = "0.27.2"
|
||||
|
||||
@@ -15,9 +15,10 @@
|
||||
|
||||
"""Contains exceptions and error codes."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ from synapse.storage.presence import UserPresenceState
|
||||
from synapse.types import UserID, RoomID
|
||||
from twisted.internet import defer
|
||||
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import jsonschema
|
||||
from jsonschema import FormatChecker
|
||||
|
||||
|
||||
@@ -36,13 +36,13 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.appservice")
|
||||
|
||||
|
||||
class AppserviceSlaveStore(
|
||||
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
|
||||
DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
):
|
||||
pass
|
||||
@@ -64,7 +64,7 @@ class AppserviceServer(HomeServer):
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -44,17 +44,17 @@ from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.client_reader")
|
||||
|
||||
|
||||
class ClientReaderSlavedStore(
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
TransactionStore,
|
||||
SlavedClientIpStore,
|
||||
@@ -88,7 +88,7 @@ class ClientReaderServer(HomeServer):
|
||||
"/_matrix/client/api/v1": resource,
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -31,14 +31,20 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.room import RoomSendEventRestServlet
|
||||
from synapse.rest.client.v1.room import (
|
||||
RoomSendEventRestServlet, RoomMembershipRestServlet, RoomStateEventRestServlet,
|
||||
JoinRoomAliasServlet,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
@@ -46,12 +52,15 @@ from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.event_creator")
|
||||
|
||||
|
||||
class EventCreatorSlavedStore(
|
||||
DirectoryStore,
|
||||
TransactionStore,
|
||||
SlavedProfileStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPusherStore,
|
||||
SlavedReceiptsStore,
|
||||
@@ -85,6 +94,9 @@ class EventCreatorServer(HomeServer):
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
RoomSendEventRestServlet(self).register(resource)
|
||||
RoomMembershipRestServlet(self).register(resource)
|
||||
RoomStateEventRestServlet(self).register(resource)
|
||||
JoinRoomAliasServlet(self).register(resource)
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
@@ -92,7 +104,7 @@ class EventCreatorServer(HomeServer):
|
||||
"/_matrix/client/api/v1": resource,
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -41,7 +41,7 @@ from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.federation_reader")
|
||||
|
||||
@@ -77,7 +77,7 @@ class FederationReaderServer(HomeServer):
|
||||
FEDERATION_PREFIX: TransportLayerServer(self),
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -42,7 +42,7 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.federation_sender")
|
||||
|
||||
@@ -91,7 +91,7 @@ class FederationSenderServer(HomeServer):
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.frontend_proxy")
|
||||
|
||||
@@ -142,7 +142,7 @@ class FrontendProxyServer(HomeServer):
|
||||
"/_matrix/client/api/v1": resource,
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -48,6 +48,7 @@ from synapse.server import HomeServer
|
||||
from synapse.storage import are_all_users_on_domain
|
||||
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
|
||||
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -56,7 +57,7 @@ from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import EncodingResourceWrapper, Resource
|
||||
from twisted.web.resource import EncodingResourceWrapper, NoResource
|
||||
from twisted.web.server import GzipEncoderFactory
|
||||
from twisted.web.static import File
|
||||
|
||||
@@ -126,7 +127,7 @@ class SynapseHomeServer(HomeServer):
|
||||
if WEB_CLIENT_PREFIX in resources:
|
||||
root_resource = RootRedirect(WEB_CLIENT_PREFIX)
|
||||
else:
|
||||
root_resource = Resource()
|
||||
root_resource = NoResource()
|
||||
|
||||
root_resource = create_resource_tree(resources, root_resource)
|
||||
|
||||
@@ -402,6 +403,10 @@ def run(hs):
|
||||
|
||||
stats = {}
|
||||
|
||||
# Contains the list of processes we will be monitoring
|
||||
# currently either 0 or 1
|
||||
stats_process = []
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def phone_stats_home():
|
||||
logger.info("Gathering stats for reporting")
|
||||
@@ -427,6 +432,15 @@ def run(hs):
|
||||
|
||||
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
|
||||
stats["daily_sent_messages"] = daily_sent_messages
|
||||
stats["cache_factor"] = CACHE_SIZE_FACTOR
|
||||
stats["event_cache_size"] = hs.config.event_cache_size
|
||||
|
||||
if len(stats_process) > 0:
|
||||
stats["memory_rss"] = 0
|
||||
stats["cpu_average"] = 0
|
||||
for process in stats_process:
|
||||
stats["memory_rss"] += process.memory_info().rss
|
||||
stats["cpu_average"] += int(process.cpu_percent(interval=None))
|
||||
|
||||
logger.info("Reporting stats to matrix.org: %s" % (stats,))
|
||||
try:
|
||||
@@ -437,10 +451,32 @@ def run(hs):
|
||||
except Exception as e:
|
||||
logger.warn("Error reporting stats: %s", e)
|
||||
|
||||
def performance_stats_init():
|
||||
try:
|
||||
import psutil
|
||||
process = psutil.Process()
|
||||
# Ensure we can fetch both, and make the initial request for cpu_percent
|
||||
# so the next request will use this as the initial point.
|
||||
process.memory_info().rss
|
||||
process.cpu_percent(interval=None)
|
||||
logger.info("report_stats can use psutil")
|
||||
stats_process.append(process)
|
||||
except (ImportError, AttributeError):
|
||||
logger.warn(
|
||||
"report_stats enabled but psutil is not installed or incorrect version."
|
||||
" Disabling reporting of memory/cpu stats."
|
||||
" Ensuring psutil is available will help matrix.org track performance"
|
||||
" changes across releases."
|
||||
)
|
||||
|
||||
if hs.config.report_stats:
|
||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
|
||||
|
||||
# We need to defer this init for the cases that we daemonize
|
||||
# otherwise the process ID we get is that of the non-daemon process
|
||||
clock.call_later(0, performance_stats_init)
|
||||
|
||||
# We wait 5 minutes to send the first set of stats as the server can
|
||||
# be quite busy the first few minutes
|
||||
clock.call_later(5 * 60, phone_stats_home)
|
||||
|
||||
@@ -43,7 +43,7 @@ from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.media_repository")
|
||||
|
||||
@@ -84,7 +84,7 @@ class MediaRepositoryServer(HomeServer):
|
||||
),
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -37,7 +37,7 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.pusher")
|
||||
|
||||
@@ -94,7 +94,7 @@ class PusherServer(HomeServer):
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -56,7 +56,7 @@ from synapse.util.manhole import manhole
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.synchrotron")
|
||||
|
||||
@@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
|
||||
class SynchrotronSlavedStore(
|
||||
SlavedReceiptsStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedFilteringStore,
|
||||
@@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
|
||||
SlavedGroupServerStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedEventStore,
|
||||
SlavedClientIpStore,
|
||||
RoomStore,
|
||||
@@ -269,7 +269,7 @@ class SynchrotronServer(HomeServer):
|
||||
"/_matrix/client/api/v1": resource,
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -38,7 +38,7 @@ def pid_running(pid):
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
return True
|
||||
except OSError, err:
|
||||
except OSError as err:
|
||||
if err.errno == errno.EPERM:
|
||||
return True
|
||||
return False
|
||||
@@ -98,7 +98,7 @@ def stop(pidfile, app):
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
write("stopped %s" % (app,), colour=GREEN)
|
||||
except OSError, err:
|
||||
except OSError as err:
|
||||
if err.errno == errno.ESRCH:
|
||||
write("%s not running" % (app,), colour=YELLOW)
|
||||
elif err.errno == errno.EPERM:
|
||||
|
||||
@@ -43,14 +43,14 @@ from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.user_dir")
|
||||
|
||||
|
||||
class UserDirectorySlaveStore(
|
||||
SlavedEventStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
UserDirectoryStore,
|
||||
@@ -116,7 +116,7 @@ class UserDirectoryServer(HomeServer):
|
||||
"/_matrix/client/api/v1": resource,
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
_base.listen_tcp(
|
||||
bind_addresses,
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.types import GroupID, get_domain_from_id
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -173,6 +172,7 @@ class ApplicationService(object):
|
||||
|
||||
if self.is_interested_in_user(event.sender):
|
||||
defer.returnValue(True)
|
||||
|
||||
# also check m.room.member state key
|
||||
if (event.type == EventTypes.Member and
|
||||
self.is_interested_in_user(event.state_key)):
|
||||
@@ -181,20 +181,18 @@ class ApplicationService(object):
|
||||
if not store:
|
||||
defer.returnValue(False)
|
||||
|
||||
does_match = yield self._matches_user_in_member_list(event.room_id, store)
|
||||
does_match = yield self._matches_user_in_member_list(
|
||||
event, store,
|
||||
)
|
||||
defer.returnValue(does_match)
|
||||
|
||||
@cachedInlineCallbacks(num_args=1, cache_context=True)
|
||||
def _matches_user_in_member_list(self, room_id, store, cache_context):
|
||||
member_list = yield store.get_users_in_room(
|
||||
room_id, on_invalidate=cache_context.invalidate
|
||||
@defer.inlineCallbacks
|
||||
def _matches_user_in_member_list(self, event, store):
|
||||
ases = yield store.get_appservices_with_user_in_room(
|
||||
event,
|
||||
)
|
||||
|
||||
# check joined member events
|
||||
for user_id in member_list:
|
||||
if self.is_interested_in_user(user_id):
|
||||
defer.returnValue(True)
|
||||
defer.returnValue(False)
|
||||
defer.returnValue(self.id in ases)
|
||||
|
||||
def _matches_room_id(self, event):
|
||||
if hasattr(event, "room_id"):
|
||||
|
||||
@@ -77,7 +77,9 @@ class RegistrationConfig(Config):
|
||||
|
||||
# Set the number of bcrypt rounds used to generate password hash.
|
||||
# Larger numbers increase the work factor needed to generate the hash.
|
||||
# The default number of rounds is 12.
|
||||
# The default number is 12 (which equates to 2^12 rounds).
|
||||
# N.B. that increasing this will exponentially increase the time required
|
||||
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
|
||||
bcrypt_rounds: 12
|
||||
|
||||
# Allows users to register as guests without a password/email/etc, and
|
||||
|
||||
@@ -155,7 +155,7 @@ class DeviceHandler(BaseHandler):
|
||||
|
||||
try:
|
||||
yield self.store.delete_device(user_id, device_id)
|
||||
except errors.StoreError, e:
|
||||
except errors.StoreError as e:
|
||||
if e.code == 404:
|
||||
# no match
|
||||
pass
|
||||
@@ -204,7 +204,7 @@ class DeviceHandler(BaseHandler):
|
||||
|
||||
try:
|
||||
yield self.store.delete_devices(user_id, device_ids)
|
||||
except errors.StoreError, e:
|
||||
except errors.StoreError as e:
|
||||
if e.code == 404:
|
||||
# no match
|
||||
pass
|
||||
@@ -243,7 +243,7 @@ class DeviceHandler(BaseHandler):
|
||||
new_display_name=content.get("display_name")
|
||||
)
|
||||
yield self.notify_device_update(user_id, [device_id])
|
||||
except errors.StoreError, e:
|
||||
except errors.StoreError as e:
|
||||
if e.code == 404:
|
||||
raise errors.NotFoundError()
|
||||
else:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -13,7 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import logging
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
@@ -134,23 +135,8 @@ class E2eKeysHandler(object):
|
||||
if user_id in destination_query:
|
||||
results[user_id] = keys
|
||||
|
||||
except CodeMessageException as e:
|
||||
failures[destination] = {
|
||||
"status": e.code, "message": e.message
|
||||
}
|
||||
except NotRetryingDestination as e:
|
||||
failures[destination] = {
|
||||
"status": 503, "message": "Not ready for retry",
|
||||
}
|
||||
except FederationDeniedError as e:
|
||||
failures[destination] = {
|
||||
"status": 403, "message": "Federation Denied",
|
||||
}
|
||||
except Exception as e:
|
||||
# include ConnectionRefused and other errors
|
||||
failures[destination] = {
|
||||
"status": 503, "message": e.message
|
||||
}
|
||||
failures[destination] = _exception_to_failure(e)
|
||||
|
||||
yield make_deferred_yieldable(defer.gatherResults([
|
||||
preserve_fn(do_remote_query)(destination)
|
||||
@@ -252,19 +238,8 @@ class E2eKeysHandler(object):
|
||||
for user_id, keys in remote_result["one_time_keys"].items():
|
||||
if user_id in device_keys:
|
||||
json_result[user_id] = keys
|
||||
except CodeMessageException as e:
|
||||
failures[destination] = {
|
||||
"status": e.code, "message": e.message
|
||||
}
|
||||
except NotRetryingDestination as e:
|
||||
failures[destination] = {
|
||||
"status": 503, "message": "Not ready for retry",
|
||||
}
|
||||
except Exception as e:
|
||||
# include ConnectionRefused and other errors
|
||||
failures[destination] = {
|
||||
"status": 503, "message": e.message
|
||||
}
|
||||
failures[destination] = _exception_to_failure(e)
|
||||
|
||||
yield make_deferred_yieldable(defer.gatherResults([
|
||||
preserve_fn(claim_client_keys)(destination)
|
||||
@@ -362,6 +337,31 @@ class E2eKeysHandler(object):
|
||||
)
|
||||
|
||||
|
||||
def _exception_to_failure(e):
|
||||
if isinstance(e, CodeMessageException):
|
||||
return {
|
||||
"status": e.code, "message": e.message,
|
||||
}
|
||||
|
||||
if isinstance(e, NotRetryingDestination):
|
||||
return {
|
||||
"status": 503, "message": "Not ready for retry",
|
||||
}
|
||||
|
||||
if isinstance(e, FederationDeniedError):
|
||||
return {
|
||||
"status": 403, "message": "Federation Denied",
|
||||
}
|
||||
|
||||
# include ConnectionRefused and other errors
|
||||
#
|
||||
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
|
||||
# give a string for e.message, which simplejson then fails to serialize.
|
||||
return {
|
||||
"status": 503, "message": str(e.message),
|
||||
}
|
||||
|
||||
|
||||
def _one_time_keys_match(old_key_json, new_key):
|
||||
old_key = json.loads(old_key_json)
|
||||
|
||||
|
||||
@@ -15,6 +15,11 @@
|
||||
# limitations under the License.
|
||||
|
||||
"""Utilities for interacting with Identity Servers"""
|
||||
|
||||
import logging
|
||||
|
||||
import simplejson as json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import (
|
||||
@@ -24,9 +29,6 @@ from ._base import BaseHandler
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.api.errors import SynapseError, Codes
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ from synapse.types import (
|
||||
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
|
||||
from synapse.util.logcontext import preserve_fn, run_in_background
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
from synapse.replication.http.send_event import send_event_to_master
|
||||
@@ -38,7 +38,7 @@ from canonicaljson import encode_canonical_json
|
||||
|
||||
import logging
|
||||
import random
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -678,8 +678,8 @@ class EventCreationHandler(object):
|
||||
|
||||
# Ensure that we can round trip before trying to persist in db
|
||||
try:
|
||||
dump = ujson.dumps(unfreeze(event.content))
|
||||
ujson.loads(dump)
|
||||
dump = frozendict_json_encoder.encode(event.content)
|
||||
simplejson.loads(dump)
|
||||
except Exception:
|
||||
logger.exception("Failed to encode content: %r", event.content)
|
||||
raise
|
||||
|
||||
@@ -38,7 +38,10 @@ class ProfileHandler(BaseHandler):
|
||||
|
||||
self.user_directory_handler = hs.get_user_directory_handler()
|
||||
|
||||
self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
|
||||
if hs.config.worker_app is None:
|
||||
self.clock.looping_call(
|
||||
self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_profile(self, user_id):
|
||||
|
||||
@@ -24,7 +24,7 @@ from synapse.api.errors import (
|
||||
from synapse.http.client import CaptchaServerHttpClient
|
||||
from synapse import types
|
||||
from synapse.types import UserID
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.async import run_on_reactor, Linearizer
|
||||
from synapse.util.threepids import check_3pid_allowed
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -46,6 +46,10 @@ class RegistrationHandler(BaseHandler):
|
||||
|
||||
self.macaroon_gen = hs.get_macaroon_generator()
|
||||
|
||||
self._generate_user_id_linearizer = Linearizer(
|
||||
name="_generate_user_id_linearizer",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_username(self, localpart, guest_access_token=None,
|
||||
assigned_user_id=None):
|
||||
@@ -345,9 +349,11 @@ class RegistrationHandler(BaseHandler):
|
||||
@defer.inlineCallbacks
|
||||
def _generate_user_id(self, reseed=False):
|
||||
if reseed or self._next_generated_user_id is None:
|
||||
self._next_generated_user_id = (
|
||||
yield self.store.find_next_generated_user_id_localpart()
|
||||
)
|
||||
with (yield self._generate_user_id_linearizer.queue(())):
|
||||
if reseed or self._next_generated_user_id is None:
|
||||
self._next_generated_user_id = (
|
||||
yield self.store.find_next_generated_user_id_localpart()
|
||||
)
|
||||
|
||||
id = self._next_generated_user_id
|
||||
self._next_generated_user_id += 1
|
||||
|
||||
@@ -27,10 +27,6 @@ from synapse.api.constants import (
|
||||
EventTypes, Membership,
|
||||
)
|
||||
from synapse.api.errors import AuthError, SynapseError, Codes
|
||||
from synapse.replication.http.membership import (
|
||||
remote_join, remote_reject_invite, get_or_register_3pid_guest,
|
||||
notify_user_membership_change,
|
||||
)
|
||||
from synapse.types import UserID, RoomID
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.distributor import user_left_room, user_joined_room
|
||||
@@ -374,7 +370,7 @@ class RoomMemberHandler(object):
|
||||
content["kind"] = "guest"
|
||||
|
||||
ret = yield self._remote_join(
|
||||
remote_room_hosts, room_id, target, content
|
||||
requester, remote_room_hosts, room_id, target, content
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
|
||||
@@ -396,7 +392,7 @@ class RoomMemberHandler(object):
|
||||
# send the rejection to the inviter's HS.
|
||||
remote_room_hosts = remote_room_hosts + [inviter.domain]
|
||||
res = yield self._remote_reject_invite(
|
||||
remote_room_hosts, room_id, target,
|
||||
requester, remote_room_hosts, room_id, target,
|
||||
)
|
||||
defer.returnValue(res)
|
||||
|
||||
@@ -853,7 +849,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
self.distributor.declare("user_left_room")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _remote_join(self, remote_room_hosts, room_id, user, content):
|
||||
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
|
||||
"""Implements RoomMemberHandler._remote_join
|
||||
"""
|
||||
if len(remote_room_hosts) == 0:
|
||||
@@ -872,7 +868,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
yield self._user_joined_room(user, room_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
|
||||
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
|
||||
"""Implements RoomMemberHandler._remote_reject_invite
|
||||
"""
|
||||
fed_handler = self.federation_handler
|
||||
@@ -933,77 +929,3 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
|
||||
if membership:
|
||||
yield self.store.forget(user_id, room_id)
|
||||
|
||||
|
||||
class RoomMemberWorkerHandler(RoomMemberHandler):
|
||||
@defer.inlineCallbacks
|
||||
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
|
||||
"""Implements RoomMemberHandler._remote_join
|
||||
"""
|
||||
if len(remote_room_hosts) == 0:
|
||||
raise SynapseError(404, "No known servers")
|
||||
|
||||
ret = yield remote_join(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
requester=requester,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
room_id=room_id,
|
||||
user_id=user.to_string(),
|
||||
content=content,
|
||||
)
|
||||
|
||||
yield self._user_joined_room(user, room_id)
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
||||
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
|
||||
"""Implements RoomMemberHandler._remote_reject_invite
|
||||
"""
|
||||
return remote_reject_invite(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
requester=requester,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
room_id=room_id,
|
||||
user_id=target.to_string(),
|
||||
)
|
||||
|
||||
def _user_joined_room(self, target, room_id):
|
||||
"""Implements RoomMemberHandler._user_joined_room
|
||||
"""
|
||||
return notify_user_membership_change(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
user_id=target.to_string(),
|
||||
room_id=room_id,
|
||||
change="join",
|
||||
)
|
||||
|
||||
def _user_left_room(self, target, room_id):
|
||||
"""Implements RoomMemberHandler._user_left_room
|
||||
"""
|
||||
return notify_user_membership_change(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
user_id=target.to_string(),
|
||||
room_id=room_id,
|
||||
change="left",
|
||||
)
|
||||
|
||||
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
|
||||
"""Implements RoomMemberHandler.get_or_register_3pid_guest
|
||||
"""
|
||||
return get_or_register_3pid_guest(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
requester=requester,
|
||||
medium=medium,
|
||||
address=address,
|
||||
inviter_user_id=inviter_user_id,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.handlers.room_member import RoomMemberHandler
|
||||
from synapse.replication.http.membership import (
|
||||
remote_join, remote_reject_invite, get_or_register_3pid_guest,
|
||||
notify_user_membership_change,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoomMemberWorkerHandler(RoomMemberHandler):
|
||||
@defer.inlineCallbacks
|
||||
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
|
||||
"""Implements RoomMemberHandler._remote_join
|
||||
"""
|
||||
if len(remote_room_hosts) == 0:
|
||||
raise SynapseError(404, "No known servers")
|
||||
|
||||
ret = yield remote_join(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
requester=requester,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
room_id=room_id,
|
||||
user_id=user.to_string(),
|
||||
content=content,
|
||||
)
|
||||
|
||||
yield self._user_joined_room(user, room_id)
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
||||
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
|
||||
"""Implements RoomMemberHandler._remote_reject_invite
|
||||
"""
|
||||
return remote_reject_invite(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
requester=requester,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
room_id=room_id,
|
||||
user_id=target.to_string(),
|
||||
)
|
||||
|
||||
def _user_joined_room(self, target, room_id):
|
||||
"""Implements RoomMemberHandler._user_joined_room
|
||||
"""
|
||||
return notify_user_membership_change(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
user_id=target.to_string(),
|
||||
room_id=room_id,
|
||||
change="joined",
|
||||
)
|
||||
|
||||
def _user_left_room(self, target, room_id):
|
||||
"""Implements RoomMemberHandler._user_left_room
|
||||
"""
|
||||
return notify_user_membership_change(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
user_id=target.to_string(),
|
||||
room_id=room_id,
|
||||
change="left",
|
||||
)
|
||||
|
||||
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
|
||||
"""Implements RoomMemberHandler.get_or_register_3pid_guest
|
||||
"""
|
||||
return get_or_register_3pid_guest(
|
||||
self.simple_http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
requester=requester,
|
||||
medium=medium,
|
||||
address=address,
|
||||
inviter_user_id=inviter_user_id,
|
||||
)
|
||||
@@ -37,7 +37,7 @@ from twisted.web.util import redirectTo
|
||||
import collections
|
||||
import logging
|
||||
import urllib
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -461,8 +461,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
|
||||
if canonical_json or synapse.events.USE_FROZEN_DICTS:
|
||||
json_bytes = encode_canonical_json(json_object)
|
||||
else:
|
||||
# ujson doesn't like frozen_dicts.
|
||||
json_bytes = ujson.dumps(json_object, ensure_ascii=False)
|
||||
json_bytes = simplejson.dumps(json_object)
|
||||
|
||||
return respond_with_json_bytes(
|
||||
request, code, json_bytes,
|
||||
@@ -489,6 +488,7 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
|
||||
request.setHeader(b"Content-Type", b"application/json")
|
||||
request.setHeader(b"Server", version_string)
|
||||
request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
|
||||
request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
|
||||
|
||||
if send_cors:
|
||||
set_cors_headers(request)
|
||||
|
||||
@@ -34,7 +34,6 @@ REQUIREMENTS = {
|
||||
"bcrypt": ["bcrypt>=3.1.0"],
|
||||
"pillow": ["PIL"],
|
||||
"pydenticon": ["pydenticon"],
|
||||
"ujson": ["ujson"],
|
||||
"blist": ["blist"],
|
||||
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
|
||||
"pymacaroons-pynacl": ["pymacaroons"],
|
||||
|
||||
@@ -13,11 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import send_event
|
||||
import membership
|
||||
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.replication.http import membership, send_event
|
||||
|
||||
|
||||
REPLICATION_PREFIX = "/_synapse/replication"
|
||||
|
||||
@@ -13,6 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError, MatrixCodeMessageException
|
||||
@@ -20,9 +23,6 @@ from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.types import Requester, UserID
|
||||
from synapse.util.distributor import user_left_room, user_joined_room
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -154,7 +154,7 @@ def notify_user_membership_change(client, host, port, user_id, room_id, change):
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
assert change in ("join", "left")
|
||||
assert change in ("joined", "left")
|
||||
|
||||
uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
|
||||
|
||||
@@ -297,7 +297,7 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
|
||||
|
||||
|
||||
class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
|
||||
PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>join|left)_room$")]
|
||||
PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
|
||||
|
||||
def __init__(self, hs):
|
||||
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
|
||||
@@ -317,7 +317,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
|
||||
|
||||
user = UserID.from_string(user_id)
|
||||
|
||||
if change == "join":
|
||||
if change == "joined":
|
||||
user_joined_room(self.distributor, user, room_id)
|
||||
elif change == "left":
|
||||
user_left_room(self.distributor, user, room_id)
|
||||
|
||||
@@ -17,8 +17,10 @@
|
||||
from synapse.storage.appservice import (
|
||||
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
|
||||
)
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
|
||||
|
||||
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
|
||||
ApplicationServiceWorkerStore):
|
||||
ApplicationServiceWorkerStore,
|
||||
SlavedEventStore):
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.storage.profile import ProfileWorkerStore
|
||||
|
||||
|
||||
class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
|
||||
pass
|
||||
@@ -19,11 +19,13 @@ allowed to be sent by which side.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
|
||||
|
||||
|
||||
class Command(object):
|
||||
"""The base command class.
|
||||
@@ -100,14 +102,14 @@ class RdataCommand(Command):
|
||||
return cls(
|
||||
stream_name,
|
||||
None if token == "batch" else int(token),
|
||||
json.loads(row_json)
|
||||
simplejson.loads(row_json)
|
||||
)
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((
|
||||
self.stream_name,
|
||||
str(self.token) if self.token is not None else "batch",
|
||||
json.dumps(self.row),
|
||||
_json_encoder.encode(self.row),
|
||||
))
|
||||
|
||||
|
||||
@@ -298,10 +300,12 @@ class InvalidateCacheCommand(Command):
|
||||
def from_line(cls, line):
|
||||
cache_func, keys_json = line.split(" ", 1)
|
||||
|
||||
return cls(cache_func, json.loads(keys_json))
|
||||
return cls(cache_func, simplejson.loads(keys_json))
|
||||
|
||||
def to_line(self):
|
||||
return " ".join((self.cache_func, json.dumps(self.keys)))
|
||||
return " ".join((
|
||||
self.cache_func, _json_encoder.encode(self.keys),
|
||||
))
|
||||
|
||||
|
||||
class UserIpCommand(Command):
|
||||
@@ -325,14 +329,14 @@ class UserIpCommand(Command):
|
||||
def from_line(cls, line):
|
||||
user_id, jsn = line.split(" ", 1)
|
||||
|
||||
access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
|
||||
access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
|
||||
|
||||
return cls(
|
||||
user_id, access_token, ip, user_agent, device_id, last_seen
|
||||
)
|
||||
|
||||
def to_line(self):
|
||||
return self.user_id + " " + json.dumps((
|
||||
return self.user_id + " " + _json_encoder.encode((
|
||||
self.access_token, self.ip, self.user_agent, self.device_id,
|
||||
self.last_seen,
|
||||
))
|
||||
|
||||
@@ -30,7 +30,7 @@ from synapse.http.servlet import (
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ import re
|
||||
import shutil
|
||||
import sys
|
||||
import traceback
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import urlparse
|
||||
|
||||
from twisted.web.server import NOT_DONE_YET
|
||||
|
||||
+2
-3
@@ -47,9 +47,8 @@ from synapse.handlers.device import DeviceHandler
|
||||
from synapse.handlers.e2e_keys import E2eKeysHandler
|
||||
from synapse.handlers.presence import PresenceHandler
|
||||
from synapse.handlers.room_list import RoomListHandler
|
||||
from synapse.handlers.room_member import (
|
||||
RoomMemberMasterHandler, RoomMemberWorkerHandler,
|
||||
)
|
||||
from synapse.handlers.room_member import RoomMemberMasterHandler
|
||||
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
|
||||
from synapse.handlers.set_password import SetPasswordHandler
|
||||
from synapse.handlers.sync import SyncHandler
|
||||
from synapse.handlers.typing import TypingHandler
|
||||
|
||||
+36
-35
@@ -132,7 +132,7 @@ class StateHandler(object):
|
||||
|
||||
state_map = yield self.store.get_events(state.values(), get_prev_content=False)
|
||||
state = {
|
||||
key: state_map[e_id] for key, e_id in state.items() if e_id in state_map
|
||||
key: state_map[e_id] for key, e_id in state.iteritems() if e_id in state_map
|
||||
}
|
||||
|
||||
defer.returnValue(state)
|
||||
@@ -378,7 +378,7 @@ class StateHandler(object):
|
||||
new_state = resolve_events_with_state_map(state_set_ids, state_map)
|
||||
|
||||
new_state = {
|
||||
key: state_map[ev_id] for key, ev_id in new_state.items()
|
||||
key: state_map[ev_id] for key, ev_id in new_state.iteritems()
|
||||
}
|
||||
|
||||
return new_state
|
||||
@@ -458,15 +458,15 @@ class StateResolutionHandler(object):
|
||||
# build a map from state key to the event_ids which set that state.
|
||||
# dict[(str, str), set[str])
|
||||
state = {}
|
||||
for st in state_groups_ids.values():
|
||||
for key, e_id in st.items():
|
||||
for st in state_groups_ids.itervalues():
|
||||
for key, e_id in st.iteritems():
|
||||
state.setdefault(key, set()).add(e_id)
|
||||
|
||||
# build a map from state key to the event_ids which set that state,
|
||||
# including only those where there are state keys in conflict.
|
||||
conflicted_state = {
|
||||
k: list(v)
|
||||
for k, v in state.items()
|
||||
for k, v in state.iteritems()
|
||||
if len(v) > 1
|
||||
}
|
||||
|
||||
@@ -480,36 +480,37 @@ class StateResolutionHandler(object):
|
||||
)
|
||||
else:
|
||||
new_state = {
|
||||
key: e_ids.pop() for key, e_ids in state.items()
|
||||
key: e_ids.pop() for key, e_ids in state.iteritems()
|
||||
}
|
||||
|
||||
# if the new state matches any of the input state groups, we can
|
||||
# use that state group again. Otherwise we will generate a state_id
|
||||
# which will be used as a cache key for future resolutions, but
|
||||
# not get persisted.
|
||||
state_group = None
|
||||
new_state_event_ids = frozenset(new_state.values())
|
||||
for sg, events in state_groups_ids.items():
|
||||
if new_state_event_ids == frozenset(e_id for e_id in events):
|
||||
state_group = sg
|
||||
break
|
||||
with Measure(self.clock, "state.create_group_ids"):
|
||||
# if the new state matches any of the input state groups, we can
|
||||
# use that state group again. Otherwise we will generate a state_id
|
||||
# which will be used as a cache key for future resolutions, but
|
||||
# not get persisted.
|
||||
state_group = None
|
||||
new_state_event_ids = frozenset(new_state.itervalues())
|
||||
for sg, events in state_groups_ids.iteritems():
|
||||
if new_state_event_ids == frozenset(e_id for e_id in events):
|
||||
state_group = sg
|
||||
break
|
||||
|
||||
# TODO: We want to create a state group for this set of events, to
|
||||
# increase cache hits, but we need to make sure that it doesn't
|
||||
# end up as a prev_group without being added to the database
|
||||
# TODO: We want to create a state group for this set of events, to
|
||||
# increase cache hits, but we need to make sure that it doesn't
|
||||
# end up as a prev_group without being added to the database
|
||||
|
||||
prev_group = None
|
||||
delta_ids = None
|
||||
for old_group, old_ids in state_groups_ids.iteritems():
|
||||
if not set(new_state) - set(old_ids):
|
||||
n_delta_ids = {
|
||||
k: v
|
||||
for k, v in new_state.iteritems()
|
||||
if old_ids.get(k) != v
|
||||
}
|
||||
if not delta_ids or len(n_delta_ids) < len(delta_ids):
|
||||
prev_group = old_group
|
||||
delta_ids = n_delta_ids
|
||||
prev_group = None
|
||||
delta_ids = None
|
||||
for old_group, old_ids in state_groups_ids.iteritems():
|
||||
if not set(new_state) - set(old_ids):
|
||||
n_delta_ids = {
|
||||
k: v
|
||||
for k, v in new_state.iteritems()
|
||||
if old_ids.get(k) != v
|
||||
}
|
||||
if not delta_ids or len(n_delta_ids) < len(delta_ids):
|
||||
prev_group = old_group
|
||||
delta_ids = n_delta_ids
|
||||
|
||||
cache = _StateCacheEntry(
|
||||
state=new_state,
|
||||
@@ -702,7 +703,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_
|
||||
|
||||
auth_events = {
|
||||
key: state_map[ev_id]
|
||||
for key, ev_id in auth_event_ids.items()
|
||||
for key, ev_id in auth_event_ids.iteritems()
|
||||
if ev_id in state_map
|
||||
}
|
||||
|
||||
@@ -740,7 +741,7 @@ def _resolve_state_events(conflicted_state, auth_events):
|
||||
|
||||
auth_events.update(resolved_state)
|
||||
|
||||
for key, events in conflicted_state.items():
|
||||
for key, events in conflicted_state.iteritems():
|
||||
if key[0] == EventTypes.JoinRules:
|
||||
logger.debug("Resolving conflicted join rules %r", events)
|
||||
resolved_state[key] = _resolve_auth_events(
|
||||
@@ -750,7 +751,7 @@ def _resolve_state_events(conflicted_state, auth_events):
|
||||
|
||||
auth_events.update(resolved_state)
|
||||
|
||||
for key, events in conflicted_state.items():
|
||||
for key, events in conflicted_state.iteritems():
|
||||
if key[0] == EventTypes.Member:
|
||||
logger.debug("Resolving conflicted member lists %r", events)
|
||||
resolved_state[key] = _resolve_auth_events(
|
||||
@@ -760,7 +761,7 @@ def _resolve_state_events(conflicted_state, auth_events):
|
||||
|
||||
auth_events.update(resolved_state)
|
||||
|
||||
for key, events in conflicted_state.items():
|
||||
for key, events in conflicted_state.iteritems():
|
||||
if key not in resolved_state:
|
||||
logger.debug("Resolving conflicted state %r:%r", key, events)
|
||||
resolved_state[key] = _resolve_normal_events(
|
||||
|
||||
@@ -23,7 +23,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
|
||||
|
||||
import abc
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -18,9 +18,14 @@ import re
|
||||
import simplejson as json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.appservice import AppServiceTransaction
|
||||
from synapse.config.appservice import load_appservices
|
||||
from synapse.storage.events import EventsWorkerStore
|
||||
from synapse.storage.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.state import StateGroupWorkerStore
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.async import Linearizer
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
|
||||
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
|
||||
return exclusive_user_regex
|
||||
|
||||
|
||||
class ApplicationServiceWorkerStore(SQLBaseStore):
|
||||
class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
|
||||
SQLBaseStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
self.services_cache = load_appservices(
|
||||
hs.hostname,
|
||||
@@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
|
||||
return service
|
||||
return None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservices_with_user_in_room(self, event):
|
||||
"""Get the list of appservices in the room at the given event
|
||||
|
||||
Args:
|
||||
event (Event)
|
||||
|
||||
Returns:
|
||||
Deferred[set(str)]: The IDs of all ASes in the room
|
||||
"""
|
||||
state_group = yield self._get_state_group_for_event(event.event_id)
|
||||
|
||||
if not state_group:
|
||||
raise Exception("No state group for event %s", event.event_id)
|
||||
|
||||
ases_in_room = yield self._get_appservices_with_user_in_room(
|
||||
event.room_id, state_group,
|
||||
)
|
||||
|
||||
defer.returnValue(ases_in_room)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000)
|
||||
def _get_appservices_with_user_in_room(self, room_id, state_group):
|
||||
cache = self._get_appservices_with_user_in_room_cache(room_id)
|
||||
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
|
||||
|
||||
defer.returnValue(ases_in_room)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def _get_appservices_with_user_in_room_cache(self, room_id):
|
||||
return _AppserviceUsersCache(self, room_id)
|
||||
|
||||
|
||||
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
||||
# This is currently empty due to there not being any AS storage functions
|
||||
@@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
|
||||
" (SELECT stream_ordering FROM appservice_stream_position)"
|
||||
" < e.stream_ordering"
|
||||
" AND e.stream_ordering <= ?"
|
||||
" AND NOT e.outlier"
|
||||
" ORDER BY e.stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
@@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
|
||||
# to keep consistency with the other stores, we keep this empty class for
|
||||
# now.
|
||||
pass
|
||||
|
||||
|
||||
class _AppserviceUsersCache(object):
|
||||
"""Attempts to calculate which appservices have users in a given room by
|
||||
looking at state groups and their delta_ids
|
||||
"""
|
||||
|
||||
def __init__(self, store, room_id):
|
||||
self.store = store
|
||||
self.room_id = room_id
|
||||
|
||||
self.linearizer = Linearizer("_AppserviceUsersCache")
|
||||
|
||||
# The last state group we calculated the ASes in the room for.
|
||||
self.state_group = object()
|
||||
|
||||
# A dict of all appservices in the room at the above state group,
|
||||
# along with a user_id of an AS user in the room.
|
||||
# Dict of as_id -> user_id.
|
||||
self.appservices_in_room = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservices_in_room_by_user(self, state_group):
|
||||
"""
|
||||
Args:
|
||||
state_group(str)
|
||||
|
||||
Returns:
|
||||
Deferred[set(str)]: The IDs of all ASes in the room
|
||||
"""
|
||||
assert state_group is not None
|
||||
|
||||
if state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
with (yield self.linearizer.queue(())):
|
||||
# Set of ASes that we need to recalculate their membership of
|
||||
# the room
|
||||
uhandled_ases = set()
|
||||
|
||||
# If the state groups match then there is nothing to do
|
||||
if state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
|
||||
|
||||
# If the prev_group matches the last state group we can calculate
|
||||
# the new value by looking at the deltas
|
||||
if prev_group and prev_group == self.state_group:
|
||||
for (typ, state_key), event_id in delta_ids.iteritems():
|
||||
if typ != EventTypes.Member:
|
||||
continue
|
||||
|
||||
user_id = state_key
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
|
||||
is_join = event.membership == Membership.JOIN
|
||||
for appservice in self.store.get_app_services():
|
||||
as_id = appservice.id
|
||||
|
||||
# If this is a join and the appservice is already in
|
||||
# the room then its a noop
|
||||
if is_join:
|
||||
if as_id in self.appservices_in_room:
|
||||
continue
|
||||
# If this is not a join, then we only need to recalculate
|
||||
# if the AS is in the room and the cached joined AS user
|
||||
# matches this event.
|
||||
elif self.appservices_in_room.get(as_id, None) != user_id:
|
||||
continue
|
||||
|
||||
# If the AS is not interested in the user then its a
|
||||
# noop.
|
||||
if not appservice.is_interested_in_user(user_id):
|
||||
continue
|
||||
|
||||
if is_join:
|
||||
# If an AS user is joining then the AS is now
|
||||
# interested in the room
|
||||
self.appservices_in_room[as_id] = user_id
|
||||
else:
|
||||
# If an AS user has left then we need to
|
||||
# recalcualte if they're in the room.
|
||||
uhandled_ases.add(appservice)
|
||||
self.appservices_in_room.pop(as_id, None)
|
||||
else:
|
||||
uhandled_ases = set(self.store.get_app_services())
|
||||
|
||||
if uhandled_ases:
|
||||
# We need to recalculate which ASes are in the room, so lets
|
||||
# get the current state and try and find a join event
|
||||
# that the AS is interested in.
|
||||
|
||||
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
|
||||
|
||||
for appservice in uhandled_ases:
|
||||
as_id = appservice.id
|
||||
|
||||
self.appservices_in_room.pop(as_id, None)
|
||||
|
||||
for (etype, state_key), event_id in current_state_ids.iteritems():
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
||||
if not appservice.is_interested_in_user(state_key):
|
||||
continue
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
if event.membership == Membership.JOIN:
|
||||
self.appservices_in_room[as_id] = state_key
|
||||
break
|
||||
|
||||
self.state_group = state_group
|
||||
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
@@ -19,7 +19,7 @@ from . import engines
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -85,7 +85,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||
)
|
||||
rows = []
|
||||
for destination, edu in remote_messages_by_destination.items():
|
||||
edu_json = ujson.dumps(edu)
|
||||
edu_json = simplejson.dumps(edu)
|
||||
rows.append((destination, stream_id, now_ms, edu_json))
|
||||
txn.executemany(sql, rows)
|
||||
|
||||
@@ -177,7 +177,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||
" WHERE user_id = ?"
|
||||
)
|
||||
txn.execute(sql, (user_id,))
|
||||
message_json = ujson.dumps(messages_by_device["*"])
|
||||
message_json = simplejson.dumps(messages_by_device["*"])
|
||||
for row in txn:
|
||||
# Add the message for all devices for this user on this
|
||||
# server.
|
||||
@@ -199,7 +199,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||
# Only insert into the local inbox if the device exists on
|
||||
# this server
|
||||
device = row[0]
|
||||
message_json = ujson.dumps(messages_by_device[device])
|
||||
message_json = simplejson.dumps(messages_by_device[device])
|
||||
messages_json_for_user[device] = message_json
|
||||
|
||||
if messages_json_for_user:
|
||||
@@ -253,7 +253,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||
messages = []
|
||||
for row in txn:
|
||||
stream_pos = row[0]
|
||||
messages.append(ujson.loads(row[1]))
|
||||
messages.append(simplejson.loads(row[1]))
|
||||
if len(messages) < limit:
|
||||
stream_pos = current_stream_id
|
||||
return (messages, stream_pos)
|
||||
@@ -389,7 +389,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||
messages = []
|
||||
for row in txn:
|
||||
stream_pos = row[0]
|
||||
messages.append(ujson.loads(row[1]))
|
||||
messages.append(simplejson.loads(row[1]))
|
||||
if len(messages) < limit:
|
||||
stream_pos = current_stream_id
|
||||
return (messages, stream_pos)
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ from twisted.internet import defer
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ from synapse.types import RoomStreamToken
|
||||
from .stream import lower_bound
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
+46
-17
@@ -14,15 +14,19 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from collections import OrderedDict, deque, namedtuple
|
||||
from functools import wraps
|
||||
import logging
|
||||
|
||||
import simplejson as json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.events import USE_FROZEN_DICTS
|
||||
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import (
|
||||
PreserveLoggingContext, make_deferred_yieldable
|
||||
PreserveLoggingContext, make_deferred_yieldable,
|
||||
)
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
@@ -30,16 +34,8 @@ from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.types import get_domain_from_id
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
from collections import deque, namedtuple, OrderedDict
|
||||
from functools import wraps
|
||||
|
||||
import synapse.metrics
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
|
||||
# these are only included to make the type annotations work
|
||||
from synapse.events import EventBase # noqa: F401
|
||||
from synapse.events.snapshot import EventContext # noqa: F401
|
||||
@@ -53,13 +49,25 @@ event_counter = metrics.register_counter(
|
||||
"persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
|
||||
)
|
||||
|
||||
# The number of times we are recalculating the current state
|
||||
state_delta_counter = metrics.register_counter(
|
||||
"state_delta",
|
||||
)
|
||||
# The number of times we are recalculating state when there is only a
|
||||
# single forward extremity
|
||||
state_delta_single_event_counter = metrics.register_counter(
|
||||
"state_delta_single_event",
|
||||
)
|
||||
# The number of times we are reculating state when we could have resonably
|
||||
# calculated the delta when we calculated the state for an event we were
|
||||
# persisting.
|
||||
state_delta_reuse_delta_counter = metrics.register_counter(
|
||||
"state_delta_reuse_delta",
|
||||
)
|
||||
|
||||
|
||||
def encode_json(json_object):
|
||||
if USE_FROZEN_DICTS:
|
||||
# ujson doesn't like frozen_dicts
|
||||
return encode_canonical_json(json_object)
|
||||
else:
|
||||
return json.dumps(json_object, ensure_ascii=False)
|
||||
return frozendict_json_encoder.encode(json_object)
|
||||
|
||||
|
||||
class _EventPeristenceQueue(object):
|
||||
@@ -369,7 +377,8 @@ class EventsStore(EventsWorkerStore):
|
||||
room_id, ev_ctx_rm, latest_event_ids
|
||||
)
|
||||
|
||||
if new_latest_event_ids == set(latest_event_ids):
|
||||
latest_event_ids = set(latest_event_ids)
|
||||
if new_latest_event_ids == latest_event_ids:
|
||||
# No change in extremities, so no change in state
|
||||
continue
|
||||
|
||||
@@ -390,6 +399,26 @@ class EventsStore(EventsWorkerStore):
|
||||
if all_single_prev_not_state:
|
||||
continue
|
||||
|
||||
state_delta_counter.inc()
|
||||
if len(new_latest_event_ids) == 1:
|
||||
state_delta_single_event_counter.inc()
|
||||
|
||||
# This is a fairly handwavey check to see if we could
|
||||
# have guessed what the delta would have been when
|
||||
# processing one of these events.
|
||||
# What we're interested in is if the latest extremities
|
||||
# were the same when we created the event as they are
|
||||
# now. When this server creates a new event (as opposed
|
||||
# to receiving it over federation) it will use the
|
||||
# forward extremities as the prev_events, so we can
|
||||
# guess this by looking at the prev_events and checking
|
||||
# if they match the current forward extremities.
|
||||
for ev, _ in ev_ctx_rm:
|
||||
prev_event_ids = set(e for e, _ in ev.prev_events)
|
||||
if latest_event_ids == prev_event_ids:
|
||||
state_delta_reuse_delta_counter.inc()
|
||||
break
|
||||
|
||||
logger.info(
|
||||
"Calculating state delta for room %s", room_id,
|
||||
)
|
||||
|
||||
@@ -28,7 +28,7 @@ from synapse.api.errors import SynapseError
|
||||
from collections import namedtuple
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
# these are only included to make the type annotations work
|
||||
from synapse.events import EventBase # noqa: F401
|
||||
|
||||
@@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
|
||||
# The category ID for the "default" category. We don't store as null in the
|
||||
|
||||
+26
-24
@@ -21,14 +21,7 @@ from synapse.api.errors import StoreError
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
|
||||
class ProfileStore(SQLBaseStore):
|
||||
def create_profile(self, user_localpart):
|
||||
return self._simple_insert(
|
||||
table="profiles",
|
||||
values={"user_id": user_localpart},
|
||||
desc="create_profile",
|
||||
)
|
||||
|
||||
class ProfileWorkerStore(SQLBaseStore):
|
||||
@defer.inlineCallbacks
|
||||
def get_profileinfo(self, user_localpart):
|
||||
try:
|
||||
@@ -61,14 +54,6 @@ class ProfileStore(SQLBaseStore):
|
||||
desc="get_profile_displayname",
|
||||
)
|
||||
|
||||
def set_profile_displayname(self, user_localpart, new_displayname):
|
||||
return self._simple_update_one(
|
||||
table="profiles",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
updatevalues={"displayname": new_displayname},
|
||||
desc="set_profile_displayname",
|
||||
)
|
||||
|
||||
def get_profile_avatar_url(self, user_localpart):
|
||||
return self._simple_select_one_onecol(
|
||||
table="profiles",
|
||||
@@ -77,14 +62,6 @@ class ProfileStore(SQLBaseStore):
|
||||
desc="get_profile_avatar_url",
|
||||
)
|
||||
|
||||
def set_profile_avatar_url(self, user_localpart, new_avatar_url):
|
||||
return self._simple_update_one(
|
||||
table="profiles",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
updatevalues={"avatar_url": new_avatar_url},
|
||||
desc="set_profile_avatar_url",
|
||||
)
|
||||
|
||||
def get_from_remote_profile_cache(self, user_id):
|
||||
return self._simple_select_one(
|
||||
table="remote_profile_cache",
|
||||
@@ -94,6 +71,31 @@ class ProfileStore(SQLBaseStore):
|
||||
desc="get_from_remote_profile_cache",
|
||||
)
|
||||
|
||||
|
||||
class ProfileStore(ProfileWorkerStore):
|
||||
def create_profile(self, user_localpart):
|
||||
return self._simple_insert(
|
||||
table="profiles",
|
||||
values={"user_id": user_localpart},
|
||||
desc="create_profile",
|
||||
)
|
||||
|
||||
def set_profile_displayname(self, user_localpart, new_displayname):
|
||||
return self._simple_update_one(
|
||||
table="profiles",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
updatevalues={"displayname": new_displayname},
|
||||
desc="set_profile_displayname",
|
||||
)
|
||||
|
||||
def set_profile_avatar_url(self, user_localpart, new_avatar_url):
|
||||
return self._simple_update_one(
|
||||
table="profiles",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
updatevalues={"avatar_url": new_avatar_url},
|
||||
desc="set_profile_avatar_url",
|
||||
)
|
||||
|
||||
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
|
||||
"""Ensure we are caching the remote user's profiles.
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ from twisted.internet import defer
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -460,14 +460,12 @@ class RegistrationStore(RegistrationWorkerStore,
|
||||
"""
|
||||
def _find_next_generated_user_id(txn):
|
||||
txn.execute("SELECT name FROM users")
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
||||
regex = re.compile("^@(\d+):")
|
||||
|
||||
found = set()
|
||||
|
||||
for r in rows:
|
||||
user_id = r["name"]
|
||||
for user_id, in txn:
|
||||
match = regex.search(user_id)
|
||||
if match:
|
||||
found.add(int(match.group(1)))
|
||||
|
||||
+18
-14
@@ -22,7 +22,7 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
|
||||
import collections
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -157,6 +157,18 @@ class RoomWorkerStore(SQLBaseStore):
|
||||
"get_public_room_changes", get_public_room_changes_txn
|
||||
)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
return self._simple_select_one_onecol(
|
||||
table="blocked_rooms",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
},
|
||||
retcol="1",
|
||||
allow_none=True,
|
||||
desc="is_room_blocked",
|
||||
)
|
||||
|
||||
|
||||
class RoomStore(RoomWorkerStore, SearchStore):
|
||||
|
||||
@@ -485,18 +497,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
else:
|
||||
defer.returnValue(None)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
return self._simple_select_one_onecol(
|
||||
table="blocked_rooms",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
},
|
||||
retcol="1",
|
||||
allow_none=True,
|
||||
desc="is_room_blocked",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def block_room(self, room_id, user_id):
|
||||
yield self._simple_insert(
|
||||
@@ -507,7 +507,11 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
},
|
||||
desc="block_room",
|
||||
)
|
||||
self.is_room_blocked.invalidate((room_id,))
|
||||
yield self.runInteraction(
|
||||
"block_room_invalidation",
|
||||
self._invalidate_cache_and_stream,
|
||||
self.is_room_blocked, (room_id,),
|
||||
)
|
||||
|
||||
def get_media_mxcs_in_room(self, room_id):
|
||||
"""Retrieves all the local and remote media MXC URIs in a given room
|
||||
|
||||
@@ -28,7 +28,7 @@ from synapse.api.constants import Membership, EventTypes
|
||||
from synapse.types import get_domain_from_id
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
|
||||
# @defer.inlineCallbacks
|
||||
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
|
||||
# We don't use `state_group`, its there so that we can cache based
|
||||
# on it. However, its important that its never None, since two current_state's
|
||||
|
||||
@@ -12,9 +12,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -66,7 +66,7 @@ def run_create(cur, database_engine, *args, **kwargs):
|
||||
"max_stream_id_exclusive": max_stream_id + 1,
|
||||
"rows_inserted": 0,
|
||||
}
|
||||
progress_json = ujson.dumps(progress)
|
||||
progress_json = simplejson.dumps(progress)
|
||||
|
||||
sql = (
|
||||
"INSERT into background_updates (update_name, progress_json)"
|
||||
|
||||
@@ -16,7 +16,7 @@ import logging
|
||||
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -45,7 +45,7 @@ def run_create(cur, database_engine, *args, **kwargs):
|
||||
"max_stream_id_exclusive": max_stream_id + 1,
|
||||
"rows_inserted": 0,
|
||||
}
|
||||
progress_json = ujson.dumps(progress)
|
||||
progress_json = simplejson.dumps(progress)
|
||||
|
||||
sql = (
|
||||
"INSERT into background_updates (update_name, progress_json)"
|
||||
|
||||
@@ -16,7 +16,7 @@ from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
|
||||
import logging
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -49,7 +49,7 @@ def run_create(cur, database_engine, *args, **kwargs):
|
||||
"rows_inserted": 0,
|
||||
"have_added_indexes": False,
|
||||
}
|
||||
progress_json = ujson.dumps(progress)
|
||||
progress_json = simplejson.dumps(progress)
|
||||
|
||||
sql = (
|
||||
"INSERT into background_updates (update_name, progress_json)"
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
|
||||
import logging
|
||||
import ujson
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -44,7 +44,7 @@ def run_create(cur, database_engine, *args, **kwargs):
|
||||
"max_stream_id_exclusive": max_stream_id + 1,
|
||||
"rows_inserted": 0,
|
||||
}
|
||||
progress_json = ujson.dumps(progress)
|
||||
progress_json = simplejson.dumps(progress)
|
||||
|
||||
sql = (
|
||||
"INSERT into background_updates (update_name, progress_json)"
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
from collections import namedtuple
|
||||
import logging
|
||||
import re
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
@@ -240,6 +240,9 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
(
|
||||
"AND type = ? AND state_key = ?",
|
||||
(etype, state_key)
|
||||
) if state_key is not None else (
|
||||
"AND type = ?",
|
||||
(etype,)
|
||||
)
|
||||
for etype, state_key in types
|
||||
]
|
||||
@@ -259,10 +262,19 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
key = (typ, state_key)
|
||||
results[group][key] = event_id
|
||||
else:
|
||||
where_args = []
|
||||
where_clauses = []
|
||||
wildcard_types = False
|
||||
if types is not None:
|
||||
where_clause = "AND (%s)" % (
|
||||
" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
|
||||
)
|
||||
for typ in types:
|
||||
if typ[1] is None:
|
||||
where_clauses.append("(type = ?)")
|
||||
where_args.extend(typ[0])
|
||||
wildcard_types = True
|
||||
else:
|
||||
where_clauses.append("(type = ? AND state_key = ?)")
|
||||
where_args.extend([typ[0], typ[1]])
|
||||
where_clause = "AND (%s)" % (" OR ".join(where_clauses))
|
||||
else:
|
||||
where_clause = ""
|
||||
|
||||
@@ -279,7 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
# after we finish deduping state, which requires this func)
|
||||
args = [next_group]
|
||||
if types:
|
||||
args.extend(i for typ in types for i in typ)
|
||||
args.extend(where_args)
|
||||
|
||||
txn.execute(
|
||||
"SELECT type, state_key, event_id FROM state_groups_state"
|
||||
@@ -292,9 +304,17 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
if (typ, state_key) not in results[group]
|
||||
)
|
||||
|
||||
# If the lengths match then we must have all the types,
|
||||
# so no need to go walk further down the tree.
|
||||
if types is not None and len(results[group]) == len(types):
|
||||
# If the number of entries in the (type,state_key)->event_id dict
|
||||
# matches the number of (type,state_keys) types we were searching
|
||||
# for, then we must have found them all, so no need to go walk
|
||||
# further down the tree... UNLESS our types filter contained
|
||||
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
||||
# search
|
||||
if (
|
||||
types is not None and
|
||||
not wildcard_types and
|
||||
len(results[group]) == len(types)
|
||||
):
|
||||
break
|
||||
|
||||
next_group = self._simple_select_one_onecol_txn(
|
||||
|
||||
@@ -19,7 +19,7 @@ from synapse.storage.account_data import AccountDataWorkerStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from twisted.internet import defer
|
||||
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -23,7 +23,7 @@ from canonicaljson import encode_canonical_json
|
||||
from collections import namedtuple
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
import simplejson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -667,7 +667,7 @@ class UserDirectoryStore(SQLBaseStore):
|
||||
# The array of numbers are the weights for the various part of the
|
||||
# search: (domain, _, display name, localpart)
|
||||
sql = """
|
||||
SELECT d.user_id, display_name, avatar_url
|
||||
SELECT d.user_id AS user_id, display_name, avatar_url
|
||||
FROM user_directory_search
|
||||
INNER JOIN user_directory AS d USING (user_id)
|
||||
%s
|
||||
@@ -702,7 +702,7 @@ class UserDirectoryStore(SQLBaseStore):
|
||||
search_query = _parse_query_sqlite(search_term)
|
||||
|
||||
sql = """
|
||||
SELECT d.user_id, display_name, avatar_url
|
||||
SELECT d.user_id AS user_id, display_name, avatar_url
|
||||
FROM user_directory_search
|
||||
INNER JOIN user_directory AS d USING (user_id)
|
||||
%s
|
||||
|
||||
@@ -132,9 +132,13 @@ class DictionaryCache(object):
|
||||
self._update_or_insert(key, value, known_absent)
|
||||
|
||||
def _update_or_insert(self, key, value, known_absent):
|
||||
entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
|
||||
# We pop and reinsert as we need to tell the cache the size may have
|
||||
# changed
|
||||
|
||||
entry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
|
||||
entry.value.update(value)
|
||||
entry.known_absent.update(known_absent)
|
||||
self.cache[key] = entry
|
||||
|
||||
def _insert(self, key, value, known_absent):
|
||||
self.cache[key] = DictionaryEntry(True, known_absent, value)
|
||||
|
||||
@@ -154,14 +154,21 @@ class LruCache(object):
|
||||
def cache_set(key, value, callbacks=[]):
|
||||
node = cache.get(key, None)
|
||||
if node is not None:
|
||||
if value != node.value:
|
||||
# We sometimes store large objects, e.g. dicts, which cause
|
||||
# the inequality check to take a long time. So let's only do
|
||||
# the check if we have some callbacks to call.
|
||||
if node.callbacks and value != node.value:
|
||||
for cb in node.callbacks:
|
||||
cb()
|
||||
node.callbacks.clear()
|
||||
|
||||
if size_callback:
|
||||
cached_cache_len[0] -= size_callback(node.value)
|
||||
cached_cache_len[0] += size_callback(value)
|
||||
# We don't bother to protect this by value != node.value as
|
||||
# generally size_callback will be cheap compared with equality
|
||||
# checks. (For example, taking the size of two dicts is quicker
|
||||
# than comparing them for equality.)
|
||||
if size_callback:
|
||||
cached_cache_len[0] -= size_callback(node.value)
|
||||
cached_cache_len[0] += size_callback(value)
|
||||
|
||||
node.callbacks.update(callbacks)
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from frozendict import frozendict
|
||||
import simplejson as json
|
||||
|
||||
|
||||
def freeze(o):
|
||||
@@ -49,3 +50,21 @@ def unfreeze(o):
|
||||
pass
|
||||
|
||||
return o
|
||||
|
||||
|
||||
def _handle_frozendict(obj):
|
||||
"""Helper for EventEncoder. Makes frozendicts serializable by returning
|
||||
the underlying dict
|
||||
"""
|
||||
if type(obj) is frozendict:
|
||||
# fishing the protected dict out of the object is a bit nasty,
|
||||
# but we don't really want the overhead of copying the dict.
|
||||
return obj._dict
|
||||
raise TypeError('Object of type %s is not JSON serializable' %
|
||||
obj.__class__.__name__)
|
||||
|
||||
|
||||
# A JSONEncoder which is capable of encoding frozendics without barfing
|
||||
frozendict_json_encoder = json.JSONEncoder(
|
||||
default=_handle_frozendict,
|
||||
)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import logging
|
||||
|
||||
@@ -45,7 +45,7 @@ def create_resource_tree(desired_tree, root_resource):
|
||||
for path_seg in full_path.split('/')[1:-1]:
|
||||
if path_seg not in last_resource.listNames():
|
||||
# resource doesn't exist, so make a "dummy resource"
|
||||
child_resource = Resource()
|
||||
child_resource = NoResource()
|
||||
last_resource.putChild(path_seg, child_resource)
|
||||
res_id = _resource_id(last_resource, path_seg)
|
||||
resource_mappings[res_id] = child_resource
|
||||
|
||||
@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.event.sender = "@irc_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_user_id_prefix_no_match(self):
|
||||
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.event.sender = "@someone_else:matrix.org"
|
||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
||||
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_member_is_checked(self):
|
||||
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
self.event.sender = "@someone_else:matrix.org"
|
||||
self.event.type = "m.room.member"
|
||||
self.event.state_key = "@irc_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_id_match(self):
|
||||
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||
)
|
||||
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_id_no_match(self):
|
||||
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||
)
|
||||
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
|
||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
||||
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_alias_match(self):
|
||||
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
self.store.get_aliases_for_room.return_value = [
|
||||
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
|
||||
]
|
||||
self.store.get_users_in_room.return_value = []
|
||||
self.store.get_appservices_with_user_in_room.return_value = []
|
||||
self.assertFalse((yield self.service.is_interested(
|
||||
self.event, self.store
|
||||
)))
|
||||
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
||||
}
|
||||
self.event.state_key = self.service.sender
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_member_list_match(self):
|
||||
self.service.namespaces[ApplicationService.NS_USERS].append(
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.store.get_users_in_room.return_value = [
|
||||
"@alice:here",
|
||||
"@irc_fo:here", # AS user
|
||||
"@bob:here",
|
||||
]
|
||||
self.store.get_aliases_for_room.return_value = []
|
||||
|
||||
self.event.sender = "@xmpp_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(
|
||||
event=self.event, store=self.store
|
||||
)))
|
||||
|
||||
Reference in New Issue
Block a user