1
0

Compare commits

..

57 Commits

Author SHA1 Message Date
Erik Johnston
22245fb8a7 Actuall set cache factors in workers 2018-02-21 21:10:48 +00:00
Erik Johnston
78c5eca141 Add hacky cache factor override system 2018-02-21 21:00:24 +00:00
Richard van der Hoff
55bef59cc7 (Really) fix tablescan of event_push_actions on purge
commit 278d21b5 added new code to avoid the tablescan, but didn't remove the
old :/
2018-02-20 10:37:44 +00:00
Erik Johnston
ad683cd203 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-02-15 14:10:11 +00:00
Erik Johnston
6ed9ff69c2 Merge pull request #2873 from matrix-org/erikj/event_creator_no_state
Don't serialize current state over replication
2018-02-15 14:09:42 +00:00
Erik Johnston
106906a65e Don't serialize current state over replication 2018-02-15 13:53:18 +00:00
Erik Johnston
d66afef01e Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-02-15 13:03:05 +00:00
Erik Johnston
5fb347fc41 Merge pull request #2872 from matrix-org/erikj/event_worker_dont_log
Don't log errors propogated from send_event
2018-02-15 12:31:49 +00:00
Erik Johnston
cd94728e93 Merge pull request #2871 from matrix-org/erikj/event_creator_state
Fix state group storage bug in workers
2018-02-15 11:34:01 +00:00
Erik Johnston
fd1601c596 Fix state group storage bug in workers
We needed to move `_count_state_group_hops_txn` to the
StateGroupWorkerStore.
2018-02-15 11:04:32 +00:00
Erik Johnston
ef344b10e5 Don't log errors propogated from send_event 2018-02-15 11:03:49 +00:00
Richard van der Hoff
b07a33f024 Avoid doing presence updates on replication reconnect
Presence is supposed to be disabled on matrix.org, so we shouldn't send a load
of USER_SYNC commands every time the synchrotron reconnects to the master.
2018-02-15 09:51:09 +00:00
hera
74539aa25b Disable auth on room_members for now
because the moznet bridge is broken (https://github.com/matrix-org/matrix-appservice-irc/issues/506)
2018-02-15 09:51:09 +00:00
Erik Johnston
8f25cd6627 Bump LAST_SEEN_GRANULARITY in client_ips 2018-02-15 09:51:09 +00:00
Erik Johnston
dc1299a4b0 Prefill client_ip_last_seen in replication 2018-02-15 09:51:09 +00:00
Erik Johnston
2c72d66cda Move event sending to end in shutdown room admin api 2018-02-15 09:51:09 +00:00
Erik Johnston
cde90a89ed Add dummy presence REST handler to frontend proxy
The handler no-ops all requests as presence is disabled.
2018-02-15 09:51:09 +00:00
Erik Johnston
5aec53ad95 Don't intern type/state_keys in state store 2018-02-15 09:51:09 +00:00
Erik Johnston
d10d19f0ad Increase store._state_group_cache cache size 2018-02-15 09:51:09 +00:00
Erik Johnston
daec1d77be Make _get_joined_hosts_cache cache non-iterable 2018-02-15 09:51:09 +00:00
Erik Johnston
61885f7849 Increase MAX_EVENTS_BEHIND for replication clients 2018-02-15 09:51:09 +00:00
Erik Johnston
ba30d489d9 Disable presence in txn queue 2018-02-15 09:51:09 +00:00
Erik Johnston
5e2d0650df Handle exceptions in get_hosts_for_room when sending events over federation 2018-02-15 09:51:09 +00:00
Erik Johnston
841bcbcafa Limit concurrent AS joins 2018-02-15 09:51:09 +00:00
Erik Johnston
08a6b88e3d Disable presence
This reverts commit 0ebd376a53 and
disables presence a bit more
2018-02-15 09:51:09 +00:00
Erik Johnston
aece8e73b1 Make push actions rotation configurable 2018-02-15 09:51:09 +00:00
Mark Haines
328bd35e00 Deleting from event_push_actions needs to use an index 2018-02-15 09:51:09 +00:00
Erik Johnston
7de9a28b8e Disable auto search for prefixes in event search 2018-02-15 09:51:09 +00:00
Erik Johnston
2a9c3aea89 Add timeout to ResponseCache of /public_rooms 2018-02-15 09:51:09 +00:00
Richard van der Hoff
b8d821aa68 Merge pull request #2867 from matrix-org/rav/rework_purge
purge_history cleanups
2018-02-15 09:49:07 +00:00
Richard van der Hoff
d28ec43e15 Merge pull request #2769 from matrix-org/matthew/hit_the_gin
switch back from GIST to GIN indexes
2018-02-14 16:59:03 +00:00
Richard van der Hoff
39bf47319f purge_history: fix sqlite syntax error
apparently sqlite insists on indexes being named
2018-02-14 16:42:19 +00:00
Richard van der Hoff
ac27f6a35e purge_history: handle sqlite asshattery
apparently creating a temporary table commits the transaction. because that's a
useful thing.
2018-02-14 16:41:12 +00:00
Richard van der Hoff
5978dccff0 remove overzealous exception handling 2018-02-14 15:54:09 +00:00
Richard van der Hoff
278d21b5e4 purge_history: fix index use
event_push_actions doesn't have an index on event_id, so we need to specify
room_id.
2018-02-14 15:44:51 +00:00
Richard van der Hoff
5fcbf1e07c Rework event purge to use a temporary table
... which should speed things up by reducing the amount of data being shuffled
across the connection
2018-02-14 11:02:22 +00:00
Erik Johnston
c0c9327fe0 Merge pull request #2854 from matrix-org/erikj/event_create_worker
Create a worker for event creation
2018-02-13 18:07:10 +00:00
Erik Johnston
059d3a6c8e Update docs 2018-02-13 17:53:56 +00:00
Richard van der Hoff
ddb6a79b68 Merge branch 'matthew/gin_work_mem' into matthew/hit_the_gin 2018-02-13 16:45:36 +00:00
Richard van der Hoff
0b27ae8dc3 move search reindex to schema 47
We're up to schema v47 on develop now, so this will have to go in there to have
an effect.

This might cause an error if somebody has already run it in the v46 guise, and
runs it again in the v47 guise, because it will cause a duplicate entry in the
bbackground_updates table. On the other hand, the entry is removed once it is
complete, and it is unlikely that anyone other than matrix.org has run it on
v46. The update itself is harmless to re-run because it deliberately copes with
the index already existing.
2018-02-13 16:44:46 +00:00
Richard van der Hoff
4a6d551704 GIN reindex: Fix syntax errors, improve exception handling 2018-02-13 16:44:46 +00:00
Richard van der Hoff
a9b712e9dc Merge branch 'develop' into matthew/gin_work_mem 2018-02-13 12:16:01 +00:00
Erik Johnston
32c7b8e48b Update workers docs to include http port 2018-02-12 17:21:23 +00:00
Erik Johnston
f133228cb3 Add note in docs/workers.rst 2018-02-07 10:34:31 +00:00
Erik Johnston
50fe92cd26 Move presence handling into handle_new_client_event
As we want to have it run on the main synapse instance
2018-02-07 10:34:09 +00:00
Erik Johnston
8ec2e638be Add event_creator worker 2018-02-07 10:32:32 +00:00
Erik Johnston
24dd73028a Add replication http endpoint for event sending 2018-02-07 10:32:32 +00:00
Richard van der Hoff
80b8a28100 Factor out common code for search insert
we can reuse the same code as is used for event insert, for doing the
background index population.
2018-02-04 00:23:06 +00:00
Richard van der Hoff
bd25f9cf36 Clean up work_mem handling
Add some comments and improve exception handling when twiddling work_mem for
the search update
2018-02-03 23:05:41 +00:00
Richard van der Hoff
4eeae7ad65 Move store_event_search_txn to SearchStore
... as a precursor to making event storing and doing the bg update share some
code.
2018-02-03 22:59:45 +00:00
Richard van der Hoff
bb9f0f3cdb Merge branch 'develop' into matthew/gin_work_mem 2018-02-03 22:40:28 +00:00
Richard van der Hoff
6b02fc80d1 Reinstate event_search_postgres_gist handler
People may have queued updates for this, so we can't just delete it.
2018-02-02 14:32:51 +00:00
hera
174eacc8ba oops 2018-01-09 18:14:32 +00:00
Matthew Hodgson
a66f489678 fix GIST->GIN switch 2018-01-09 16:55:51 +00:00
Matthew Hodgson
e79db0a673 switch back from GIST to GIN indexes 2018-01-09 16:37:48 +00:00
Matthew Hodgson
e365ad329f oops, tweak work_mem when actually storing 2018-01-09 16:30:30 +00:00
Matthew Hodgson
19f9227643 avoid 80s GIN inserts by tweaking work_mem
see https://github.com/matrix-org/synapse/issues/2753 for details
2018-01-09 16:25:04 +00:00
40 changed files with 972 additions and 187 deletions

View File

@@ -30,17 +30,29 @@ requests made to the federation port. The caveats regarding running a
reverse-proxy on the federation port still apply (see
https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port).
To enable workers, you need to add a replication listener to the master synapse, e.g.::
To enable workers, you need to add two replication listeners to the master
synapse, e.g.::
listeners:
# The TCP replication port
- port: 9092
bind_address: '127.0.0.1'
type: replication
# The HTTP replication port
- port: 9093
bind_address: '127.0.0.1'
type: http
resources:
- names: [replication]
Under **no circumstances** should this replication API listener be exposed to the
public internet; it currently implements no authentication whatsoever and is
Under **no circumstances** should these replication API listeners be exposed to
the public internet; it currently implements no authentication whatsoever and is
unencrypted.
(Roughly, the TCP port is used for streaming data from the master to the
workers, and the HTTP port for the workers to send data to the main
synapse process.)
You then create a set of configs for the various worker processes. These
should be worker configuration files, and should be stored in a dedicated
subdirectory, to allow synctl to manipulate them.
@@ -52,8 +64,13 @@ You should minimise the number of overrides though to maintain a usable config.
You must specify the type of worker application (``worker_app``). The currently
available worker applications are listed below. You must also specify the
replication endpoint that it's talking to on the main synapse process
(``worker_replication_host`` and ``worker_replication_port``).
replication endpoints that it's talking to on the main synapse process.
``worker_replication_host`` should specify the host of the main synapse,
``worker_replication_port`` should point to the TCP replication listener port and
``worker_replication_http_port`` should point to the HTTP replication port.
Currently, only the ``event_creator`` worker requires specifying
``worker_replication_http_port``.
For instance::
@@ -62,6 +79,7 @@ For instance::
# The replication listener on the synapse to talk to.
worker_replication_host: 127.0.0.1
worker_replication_port: 9092
worker_replication_http_port: 9093
worker_listeners:
- type: http
@@ -207,3 +225,14 @@ the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
file. For example::
worker_main_http_uri: http://127.0.0.1:8008
``synapse.app.event_creator``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles non-state event creation. It can handle REST endpoints matching:
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import RoomSendEventRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor
from twisted.web.resource import Resource
logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
SlavedDeviceStore,
SlavedClientIpStore,
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
BaseSlavedStore,
):
pass
class EventCreatorServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
RoomSendEventRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
"/_matrix/client/api/v1": resource,
})
root_resource = create_resource_tree(resources, Resource())
_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
)
)
logger.info("Synapse event creator now listening on port %d", port)
def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
)
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
self.get_tcp_replication().start_replication(self)
def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())
def start(config_options):
try:
config = HomeServerConfig.load_config(
"Synapse event creator", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.event_creator"
assert config.worker_replication_http_port is not None
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-event-creator", config)
if __name__ == '__main__':
with LoggingContext("main"):
start(sys.argv[1:])

View File

@@ -36,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -49,6 +50,35 @@ from twisted.web.resource import Resource
logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,

View File

@@ -38,6 +38,7 @@ from synapse.metrics import register_memory_metrics
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey
@@ -219,6 +220,9 @@ class SynapseHomeServer(HomeServer):
if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self)
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
return resources
def start_listening(self):

View File

@@ -117,6 +117,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
@@ -214,6 +215,8 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
# presence is disabled on matrix.org, so we return the empty set
return set()
return [
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
if count > 0

View File

@@ -108,7 +108,7 @@ def stop(pidfile, app):
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor"
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
])
@@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
@@ -211,6 +215,10 @@ def main():
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
worker_cache_factors = (
worker_config.get("synctl_cache_factors")
or cache_factors
)
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -226,8 +234,10 @@ def main():
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
worker_cache_factors,
))
action = options.action
@@ -261,15 +271,19 @@ def main():
start(configfile)
for worker in workers:
env = os.environ.copy()
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
for cache_name, factor in worker.cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
start_worker(worker.app, configfile, worker.configfile)
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
else:
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
# Reset env back to the original
os.environ.clear()
os.environ.update(env)
if __name__ == "__main__":

View File

@@ -33,8 +33,16 @@ class WorkerConfig(Config):
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
self.worker_log_config = config.get("worker_log_config")
# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None)
# The port on the main synapse for TCP replication
self.worker_replication_port = config.get("worker_replication_port", None)
# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")
self.worker_name = config.get("worker_name", self.worker_app)
self.worker_main_http_uri = config.get("worker_main_http_uri", None)

View File

@@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from frozendict import frozendict
class EventContext(object):
"""
@@ -73,3 +77,100 @@ class EventContext(object):
self.prev_state_events = None
self.app_service = None
def serialize(self, event):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
Args:
event (FrozenEvent): The event that this context relates to
Returns:
dict
"""
# We don't serialize the full state dicts, instead they get pulled out
# of the DB on the other side. However, the other side can't figure out
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
return {
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
"state_group": self.state_group,
"rejected": self.rejected,
"push_actions": self.push_actions,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}
@staticmethod
@defer.inlineCallbacks
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
Args:
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`
Returns:
EventContext
"""
context = EventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.push_actions = input["push_actions"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
prev_state_id = input["prev_state_id"]
event_type = input["event_type"]
event_state_key = input["event_state_key"]
context.current_state_ids = yield store.get_state_ids_for_group(
context.state_group,
)
if prev_state_id and event_state_key:
context.prev_state_ids = dict(context.current_state_ids)
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
else:
context.prev_state_ids = context.current_state_ids
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)
defer.returnValue(context)
def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in
JSON we need to convert them to a form that can.
"""
if state_dict is None:
return None
return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
]
def _decode_state_dict(input):
"""Decodes a state dict encoded using `_encode_state_dict` above
"""
if input is None:
return None
return frozendict({(etype, state_key,): v for etype, state_key, v in input})

View File

@@ -184,17 +184,22 @@ class TransactionQueue(object):
if not is_mine and send_on_behalf_of is None:
continue
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
except Exception:
logger.exception("Failed to calculate hosts in room")
continue
destinations = set(destinations)
if send_on_behalf_of is not None:
@@ -254,6 +259,7 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled

View File

@@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,

View File

@@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze
from synapse.visibility import filter_events_for_client
from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
@@ -282,7 +283,7 @@ class MessageHandler(BaseHandler):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or becuase there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -312,6 +313,9 @@ class EventCreationHandler(object):
self.server_name = hs.hostname
self.ratelimiter = hs.get_ratelimiter()
self.notifier = hs.get_notifier()
self.config = hs.config
self.http_client = hs.get_simple_http_client()
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -419,12 +423,6 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
)
if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
preserve_fn(presence.bump_presence_active_time)(user)
@defer.inlineCallbacks
def deduplicate_state_event(self, event, context):
"""
@@ -559,6 +557,18 @@ class EventCreationHandler(object):
):
# We now need to go and hit out to wherever we need to hit out to.
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
)
return
if ratelimit:
yield self.base_handler.ratelimit(requester)
@@ -692,3 +702,9 @@ class EventCreationHandler(object):
)
preserve_fn(_notify)()
if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
preserve_fn(presence.bump_presence_active_time)(requester.user)

View File

@@ -372,6 +372,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -401,6 +402,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -443,6 +445,8 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
# presence is disabled on matrix.org, so we return the empty set
return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
@@ -462,6 +466,7 @@ class PresenceHandler(object):
syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server.
"""
return
# Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = (

View File

@@ -44,7 +44,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,

View File

@@ -28,7 +28,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer
from synapse.util.async import Linearizer, Limiter
from synapse.util.distributor import user_left_room, user_joined_room
from ._base import BaseHandler
@@ -50,6 +50,7 @@ class RoomMemberHandler(BaseHandler):
self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
self.member_limiter = Limiter(3)
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -161,18 +162,23 @@ class RoomMemberHandler(BaseHandler):
):
key = (room_id,)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
as_id = object()
if requester.app_service:
as_id = requester.app_service.id
with (yield self.member_limiter.queue(as_id)):
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
defer.returnValue(result)

View File

@@ -585,7 +585,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if False and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)

View File

@@ -37,7 +37,6 @@ import collections
import logging
import urllib
import ujson
import simplejson
logger = logging.getLogger(__name__)
@@ -417,7 +416,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
json_bytes = encode_canonical_json(json_object)
else:
# ujson doesn't like frozen_dicts.
json_bytes = simplejson.dumps(json_object)
json_bytes = ujson.dumps(json_object, ensure_ascii=False)
return respond_with_json_bytes(
request, code, json_bytes,

View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import send_event
from synapse.http.server import JsonResource
REPLICATION_PREFIX = "/_synapse/replication"
class ReplicationRestResource(JsonResource):
def __init__(self, hs):
JsonResource.__init__(self, hs, canonical_json=False)
self.register_servlets(hs)
def register_servlets(self, hs):
send_event.register_servlets(hs, self)

View File

@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.errors import SynapseError, MatrixCodeMessageException
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.metrics import Measure
from synapse.types import Requester
import logging
import re
logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context):
"""Send event to be handled on the master
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
event (FrozenEvent)
context (EventContext)
"""
uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": context.serialize(event),
"requester": requester.serialize(),
}
try:
result = yield client.post_json_get_json(uri, payload)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise SynapseError(e.code, e.msg, e.errcode)
defer.returnValue(result)
class ReplicationSendEventRestServlet(RestServlet):
"""Handles events newly created on workers, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/send_event
{
"event": { .. serialized event .. },
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
"requester": { .. serialized requester .. },
}
"""
PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__()
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@defer.inlineCallbacks
def on_POST(self, request):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
event_dict = content["event"]
internal_metadata = content["internal_metadata"]
rejected_reason = content["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])
if requester.user:
request.authenticated_entity = requester.user.to_string()
logger.info(
"Got event to send with ID: %s into room: %s",
event.event_id, event.room_id,
)
yield self.event_creation_handler.handle_new_client_event(
requester, event, context,
)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
ReplicationSendEventRestServlet(hs).register(http_server)

View File

@@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)

View File

@@ -21,6 +21,7 @@ from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore
from synapse.storage.signatures import SignatureStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -170,6 +171,25 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
get_latest_event_ids_and_hashes_in_room = (
DataStore.get_latest_event_ids_and_hashes_in_room.__func__
)
_get_latest_event_ids_and_hashes_in_room = (
DataStore._get_latest_event_ids_and_hashes_in_room.__func__
)
_get_event_reference_hashes_txn = (
DataStore._get_event_reference_hashes_txn.__func__
)
add_event_hashes = (
DataStore.add_event_hashes.__func__
)
get_event_reference_hashes = (
SignatureStore.__dict__["get_event_reference_hashes"]
)
get_event_reference_hash = (
SignatureStore.__dict__["get_event_reference_hash"]
)
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()

View File

@@ -33,7 +33,7 @@ import logging
logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", (

View File

@@ -212,17 +212,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -260,6 +249,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(

View File

@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
yield self.presence_handler.set_state(user, state)
# yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))

View File

@@ -186,7 +186,6 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomSendEventRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
def register(self, http_server):

View File

@@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit
import itertools
import logging
import simplejson as json
import ujson as json
logger = logging.getLogger(__name__)

View File

@@ -99,6 +99,19 @@ class ApplicationServiceStore(SQLBaseStore):
return service
return None
def get_app_service_by_id(self, as_id):
"""Get the application service with the given appservice ID.
Args:
as_id (str): The application service ID.
Returns:
synapse.appservice.ApplicationService or None.
"""
for service in self.services_cache:
if service.id == as_id:
return service
return None
def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service.

View File

@@ -242,6 +242,25 @@ class BackgroundUpdateStore(SQLBaseStore):
"""
self._background_update_handlers[update_name] = update_handler
def register_noop_background_update(self, update_name):
"""Register a noop handler for a background update.
This is useful when we previously did a background update, but no
longer wish to do the update. In this case the background update should
be removed from the schema delta files, but there may still be some
users who have the background update queued, so this method should
also be called to clear the update.
Args:
update_name (str): Name of update
"""
@defer.inlineCallbacks
def noop_update(progress, batch_size):
yield self._end_background_update(update_name)
defer.returnValue(1)
self.register_background_update_handler(update_name, noop_update)
def register_background_index_update(self, update_name, index_name,
table, columns, where_clause=None,
unique=False,

View File

@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):

View File

@@ -87,6 +87,8 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
@@ -629,7 +631,7 @@ class EventPushActionsStore(SQLBaseStore):
)
if caught_up:
break
yield sleep(5)
yield sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@@ -650,8 +652,8 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""", (old_rotate_stream_ordering,))
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row

View File

@@ -37,7 +37,7 @@ from functools import wraps
import synapse.metrics
import logging
import simplejson as json
import ujson as json
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
@@ -1043,7 +1043,6 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1063,6 +1062,14 @@ class EventsStore(SQLBaseStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
@@ -2093,6 +2100,30 @@ class EventsStore(SQLBaseStore):
# state_groups
# state_groups_state
# we will build a temporary table listing the events so that we don't
# have to keep shovelling the list back and forth across the
# connection. Annoyingly the python sqlite driver commits the
# transaction on CREATE, so let's do this first.
#
# furthermore, we might already have the table from a previous (failed)
# purge attempt, so let's drop the table first.
txn.execute("DROP TABLE IF EXISTS events_to_purge")
txn.execute(
"CREATE TEMPORARY TABLE events_to_purge ("
" event_id TEXT NOT NULL,"
" should_delete BOOLEAN NOT NULL"
")"
)
# create an index on should_delete because later we'll be looking for
# the should_delete / shouldn't_delete subsets
txn.execute(
"CREATE INDEX events_to_purge_should_delete"
" ON events_to_purge(should_delete)",
)
# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@@ -2115,23 +2146,30 @@ class EventsStore(SQLBaseStore):
logger.info("[purge] looking for events to delete")
should_delete_expr = "state_key IS NULL"
should_delete_params = ()
if not delete_local_events:
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
should_delete_params += (room_id, topological_ordering)
txn.execute(
"SELECT event_id, state_key FROM events"
" LEFT JOIN state_events USING (room_id, event_id)"
" WHERE room_id = ? AND topological_ordering < ?",
(room_id, topological_ordering,)
"INSERT INTO events_to_purge"
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE e.room_id = ? AND topological_ordering < ?" % (
should_delete_expr,
),
should_delete_params,
)
txn.execute(
"SELECT event_id, should_delete FROM events_to_purge"
)
event_rows = txn.fetchall()
to_delete = [
(event_id,) for event_id, state_key in event_rows
if state_key is None and (
delete_local_events or not self.hs.is_mine_id(event_id)
)
]
logger.info(
"[purge] found %i events before cutoff, of which %i can be deleted",
len(event_rows), len(to_delete),
len(event_rows), sum(1 for e in event_rows if e[1]),
)
logger.info("[purge] Finding new backward extremities")
@@ -2139,12 +2177,11 @@ class EventsStore(SQLBaseStore):
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
"SELECT DISTINCT e.event_id FROM events as e"
" INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events as e2 ON e2.event_id = ed.event_id"
" WHERE e.room_id = ? AND e.topological_ordering < ?"
" AND e2.topological_ordering >= ?",
(room_id, topological_ordering, topological_ordering)
"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
" WHERE e2.topological_ordering >= ?",
(topological_ordering, )
)
new_backwards_extrems = txn.fetchall()
@@ -2172,12 +2209,11 @@ class EventsStore(SQLBaseStore):
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
" SELECT DISTINCT state_group FROM events"
" SELECT DISTINCT state_group FROM events_to_purge"
" INNER JOIN event_to_state_groups USING (event_id)"
" WHERE room_id = ? AND topological_ordering < ?"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(room_id, topological_ordering, topological_ordering)
(topological_ordering, )
)
state_rows = txn.fetchall()
@@ -2262,9 +2298,9 @@ class EventsStore(SQLBaseStore):
)
logger.info("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
txn.execute(
"DELETE FROM event_to_state_groups "
"WHERE event_id IN (SELECT event_id from events_to_purge)"
)
for event_id, _ in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (
@@ -2281,7 +2317,6 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -2289,22 +2324,35 @@ class EventsStore(SQLBaseStore):
):
logger.info("[purge] removing events from %s", table)
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
to_delete
txn.execute(
"DELETE FROM %s WHERE event_id IN ("
" SELECT event_id FROM events_to_purge WHERE should_delete"
")" % (table,),
)
# event_push_actions lacks an index on event_id, and has one on
# (room_id, event_id) instead.
for table in (
"event_push_actions",
):
logger.info("[purge] removing events from %s", table)
txn.execute(
"DELETE FROM %s WHERE room_id = ? AND event_id IN ("
" SELECT event_id FROM events_to_purge WHERE should_delete"
")" % (table,),
(room_id, )
)
# Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers")
txn.executemany(
txn.execute(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
[
(True, event_id,) for event_id, state_key in event_rows
if state_key is not None or (
not delete_local_events and self.hs.is_mine_id(event_id)
)
]
" WHERE event_id IN ("
" SELECT event_id FROM events_to_purge "
" WHERE NOT should_delete"
")",
(True,),
)
# synapse tries to take out an exclusive lock on room_depth whenever it
@@ -2319,6 +2367,12 @@ class EventsStore(SQLBaseStore):
(topological_ordering, room_id,)
)
# finally, drop the temp table. this will commit the txn in sqlite,
# so make sure to keep this actually last.
txn.execute(
"DROP TABLE events_to_purge"
)
logger.info("[purge] done")
@defer.inlineCallbacks

View File

@@ -39,12 +39,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
# we no longer use refresh tokens, but it's possible that some people
# might have a background update queued to build this index. Just
# clear the background update.
@defer.inlineCallbacks
def noop_update(progress, batch_size):
yield self._end_background_update("refresh_tokens_device_index")
defer.returnValue(1)
self.register_background_update_handler(
"refresh_tokens_device_index", noop_update)
self.register_noop_background_update("refresh_tokens_device_index")
@defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):

View File

@@ -675,7 +675,7 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(result)
@cached(max_entries=10000, iterable=True)
@cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)

View File

@@ -13,5 +13,7 @@
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('event_search_postgres_gist', '{}');
-- We no longer do this given we back it out again in schema 47
-- INSERT into background_updates (update_name, progress_json)
-- VALUES ('event_search_postgres_gist', '{}');

View File

@@ -0,0 +1,17 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('event_search_postgres_gin', '{}');

View File

@@ -38,6 +38,7 @@ class SearchStore(BackgroundUpdateStore):
EVENT_SEARCH_UPDATE_NAME = "event_search"
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"
def __init__(self, db_conn, hs):
super(SearchStore, self).__init__(db_conn, hs)
@@ -48,9 +49,19 @@ class SearchStore(BackgroundUpdateStore):
self.EVENT_SEARCH_ORDER_UPDATE_NAME,
self._background_reindex_search_order
)
self.register_background_update_handler(
# we used to have a background update to turn the GIN index into a
# GIST one; we no longer do that (obviously) because we actually want
# a GIN index. However, it's possible that some people might still have
# the background update queued, so we register a handler to clear the
# background update.
self.register_noop_background_update(
self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME,
self._background_reindex_gist_search
)
self.register_background_update_handler(
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME,
self._background_reindex_gin_search
)
@defer.inlineCallbacks
@@ -151,25 +162,48 @@ class SearchStore(BackgroundUpdateStore):
defer.returnValue(result)
@defer.inlineCallbacks
def _background_reindex_gist_search(self, progress, batch_size):
def _background_reindex_gin_search(self, progress, batch_size):
"""This handles old synapses which used GIST indexes, if any;
converting them back to be GIN as per the actual schema.
"""
def create_index(conn):
conn.rollback()
# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
conn.set_session(autocommit=True)
c = conn.cursor()
c.execute(
"CREATE INDEX CONCURRENTLY event_search_fts_idx_gist"
" ON event_search USING GIST (vector)"
)
try:
c = conn.cursor()
c.execute("DROP INDEX event_search_fts_idx")
# if we skipped the conversion to GIST, we may already/still
# have an event_search_fts_idx; unfortunately postgres 9.4
# doesn't support CREATE INDEX IF EXISTS so we just catch the
# exception and ignore it.
import psycopg2
try:
c.execute(
"CREATE INDEX CONCURRENTLY event_search_fts_idx"
" ON event_search USING GIN (vector)"
)
except psycopg2.ProgrammingError as e:
logger.warn(
"Ignoring error %r when trying to switch from GIST to GIN",
e
)
conn.set_session(autocommit=False)
# we should now be able to delete the GIST index.
c.execute(
"DROP INDEX IF EXISTS event_search_fts_idx_gist"
)
finally:
conn.set_session(autocommit=False)
if isinstance(self.database_engine, PostgresEngine):
yield self.runWithConnection(create_index)
yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME)
yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME)
defer.returnValue(1)
@defer.inlineCallbacks
@@ -289,7 +323,30 @@ class SearchStore(BackgroundUpdateStore):
entry.stream_ordering, entry.origin_server_ts,
) for entry in entries)
# inserts to a GIN index are normally batched up into a pending
# list, and then all committed together once the list gets to a
# certain size. The trouble with that is that postgres (pre-9.5)
# uses work_mem to determine the length of the list, and work_mem
# is typically very large.
#
# We therefore reduce work_mem while we do the insert.
#
# (postgres 9.5 uses the separate gin_pending_list_limit setting,
# so doesn't suffer the same problem, but changing work_mem will
# be harmless)
#
# Note that we don't need to worry about restoring it on
# exception, because exceptions will cause the transaction to be
# rolled back, including the effects of the SET command.
#
# Also: we use SET rather than SET LOCAL because there's lots of
# other stuff going on in this transaction, which want to have the
# normal work_mem setting.
txn.execute("SET work_mem='256kB'")
txn.executemany(sql, args)
txn.execute("RESET work_mem")
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
@@ -662,7 +719,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
return " & ".join(result + ":*" for result in results)
return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:

View File

@@ -20,7 +20,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@@ -139,6 +139,20 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue(group_to_state)
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
"""Get the state IDs for the given state group
Args:
state_group (int)
Returns:
Deferred[dict]: Resolves to a map of (type, state_key) -> event_id
"""
group_to_state = yield self._get_state_for_groups((state_group,))
defer.returnValue(group_to_state[state_group])
@defer.inlineCallbacks
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
@@ -532,8 +546,7 @@ class StateGroupWorkerStore(SQLBaseStore):
state_dict = results[group]
state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
for k, v in group_state_dict.iteritems()
group_state_dict.iteritems()
)
self._state_group_cache.update(
@@ -655,6 +668,47 @@ class StateGroupWorkerStore(SQLBaseStore):
return self.runInteraction("store_state_group", _store_state_group_txn)
def _count_state_group_hops_txn(self, txn, state_group):
"""Given a state group, count how many hops there are in the tree.
This is used to ensure the delta chains don't get too long.
"""
if isinstance(self.database_engine, PostgresEngine):
sql = ("""
WITH RECURSIVE state(state_group) AS (
VALUES(?::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
SELECT count(*) FROM state;
""")
txn.execute(sql, (state_group,))
row = txn.fetchone()
if row and row[0]:
return row[0]
else:
return 0
else:
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
next_group = state_group
count = 0
while next_group:
next_group = self._simple_select_one_onecol_txn(
txn,
table="state_group_edges",
keyvalues={"state_group": next_group},
retcol="prev_state_group",
allow_none=True,
)
if next_group:
count += 1
return count
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
""" Keeps track of the state at a given event.
@@ -729,47 +783,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
(event_id,), state_group_id
)
def _count_state_group_hops_txn(self, txn, state_group):
"""Given a state group, count how many hops there are in the tree.
This is used to ensure the delta chains don't get too long.
"""
if isinstance(self.database_engine, PostgresEngine):
sql = ("""
WITH RECURSIVE state(state_group) AS (
VALUES(?::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
SELECT count(*) FROM state;
""")
txn.execute(sql, (state_group,))
row = txn.fetchone()
if row and row[0]:
return row[0]
else:
return 0
else:
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
next_group = state_group
count = 0
while next_group:
next_group = self._simple_select_one_onecol_txn(
txn,
table="state_group_edges",
keyvalues={"state_group": next_group},
retcol="prev_state_group",
allow_none=True,
)
if next_group:
count += 1
return count
@defer.inlineCallbacks
def _background_deduplicate_state(self, progress, batch_size):
"""This background update will slowly deduplicate state by reencoding

View File

@@ -19,20 +19,59 @@ from synapse.api.errors import SynapseError
from collections import namedtuple
Requester = namedtuple("Requester", [
class Requester(namedtuple("Requester", [
"user", "access_token_id", "is_guest", "device_id", "app_service",
])
"""
Represents the user making a request
])):
"""
Represents the user making a request
Attributes:
user (UserID): id of the user making the request
access_token_id (int|None): *ID* of the access token used for this
request, or None if it came via the appservice API or similar
is_guest (bool): True if the user making this request is a guest user
device_id (str|None): device_id which was set at authentication time
app_service (ApplicationService|None): the AS requesting on behalf of the user
"""
Attributes:
user (UserID): id of the user making the request
access_token_id (int|None): *ID* of the access token used for this
request, or None if it came via the appservice API or similar
is_guest (bool): True if the user making this request is a guest user
device_id (str|None): device_id which was set at authentication time
app_service (ApplicationService|None): the AS requesting on behalf of the user
"""
def serialize(self):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
Returns:
dict
"""
return {
"user_id": self.user.to_string(),
"access_token_id": self.access_token_id,
"is_guest": self.is_guest,
"device_id": self.device_id,
"app_server_id": self.app_service.id if self.app_service else None,
}
@staticmethod
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
Requester.
Args:
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`
Returns:
Requester
"""
appservice = None
if input["app_server_id"]:
appservice = store.get_app_service_by_id(input["app_server_id"])
return Requester(
user=UserID.from_string(input["user_id"]),
access_token_id=input["access_token_id"],
is_guest=input["is_guest"],
device_id=input["device_id"],
app_service=appservice,
)
def create_requester(user_id, access_token_id=None, is_guest=False,

View File

@@ -18,6 +18,16 @@ import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
def get_cache_factor_for(cache_name):
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
factor = os.environ.get(env_var)
if factor:
return float(factor)
return CACHE_SIZE_FACTOR
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}

View File

@@ -16,7 +16,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -298,7 +298,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree

View File

@@ -984,11 +984,13 @@ class RoomInitialSyncTestCase(RestTestCase):
self.assertTrue("presence" in response)
presence_by_user = {
e["content"]["user_id"]: e for e in response["presence"]
}
self.assertTrue(self.user_id in presence_by_user)
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
# presence is turned off on hotfixes
# presence_by_user = {
# e["content"]["user_id"]: e for e in response["presence"]
# }
# self.assertTrue(self.user_id in presence_by_user)
# self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
class RoomMessageListTestCase(RestTestCase):