1
0

Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston
2017-11-27 14:20:13 +00:00
69 changed files with 1168 additions and 638 deletions

View File

@@ -1,3 +1,69 @@
Changes in synapse v0.25.1 (2017-11-17)
=======================================
Bug fixes:
* Fix login with LDAP and other password provider modules (PR #2678). Thanks to
@jkolo!
Changes in synapse v0.25.0 (2017-11-15)
=======================================
Bug fixes:
* Fix port script (PR #2673)
Changes in synapse v0.25.0-rc1 (2017-11-14)
===========================================
Features:
* Add is_public to groups table to allow for private groups (PR #2582)
* Add a route for determining who you are (PR #2668) Thanks to @turt2live!
* Add more features to the password providers (PR #2608, #2610, #2620, #2622,
#2623, #2624, #2626, #2628, #2629)
* Add a hook for custom rest endpoints (PR #2627)
* Add API to update group room visibility (PR #2651)
Changes:
* Ignore <noscript> tags when generating URL preview descriptions (PR #2576)
Thanks to @maximevaillancourt!
* Register some /unstable endpoints in /r0 as well (PR #2579) Thanks to
@krombel!
* Support /keys/upload on /r0 as well as /unstable (PR #2585)
* Front-end proxy: pass through auth header (PR #2586)
* Allow ASes to deactivate their own users (PR #2589)
* Remove refresh tokens (PR #2613)
* Automatically set default displayname on register (PR #2617)
* Log login requests (PR #2618)
* Always return `is_public` in the `/groups/:group_id/rooms` API (PR #2630)
* Avoid no-op media deletes (PR #2637) Thanks to @spantaleev!
* Fix various embarrassing typos around user_directory and add some doc. (PR
#2643)
* Return whether a user is an admin within a group (PR #2647)
* Namespace visibility options for groups (PR #2657)
* Downcase UserIDs on registration (PR #2662)
* Cache failures when fetching URL previews (PR #2669)
Bug fixes:
* Fix port script (PR #2577)
* Fix error when running synapse with no logfile (PR #2581)
* Fix UI auth when deleting devices (PR #2591)
* Fix typo when checking if user is invited to group (PR #2599)
* Fix the port script to drop NUL values in all tables (PR #2611)
* Fix appservices being backlogged and not receiving new events due to a bug in
notify_interested_services (PR #2631) Thanks to @xyzz!
* Fix updating rooms avatar/display name when modified by admin (PR #2636)
Thanks to @farialima!
* Fix bug in state group storage (PR #2649)
* Fix 500 on invalid utf-8 in request (PR #2663)
Changes in synapse v0.24.1 (2017-10-24)
=======================================

View File

@@ -5,7 +5,8 @@ To use it, first install prometheus by following the instructions at
http://prometheus.io/
Then add a new job to the main prometheus.conf file:
### for Prometheus v1
Add a new job to the main prometheus.conf file:
job: {
name: "synapse"
@@ -15,6 +16,22 @@ Then add a new job to the main prometheus.conf file:
}
}
### for Prometheus v2
Add a new job to the main prometheus.yml file:
- job_name: "synapse"
metrics_path: "/_synapse/metrics"
# when endpoint uses https:
scheme: "https"
static_configs:
- targets: ['SERVER.LOCATION:PORT']
To use `synapse.rules` add
rule_files:
- "/PATH/TO/synapse-v2.rules"
Metrics are disabled by default when running synapse; they must be enabled
with the 'enable-metrics' option, either in the synapse config file or as a
command-line option.

View File

@@ -0,0 +1,60 @@
groups:
- name: synapse
rules:
- record: "synapse_federation_transaction_queue_pendingEdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- record: "synapse_federation_transaction_queue_pendingPdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- record: 'synapse_http_server_requests:method'
labels:
servlet: ""
expr: "sum(synapse_http_server_requests) by (method)"
- record: 'synapse_http_server_requests:servlet'
labels:
method: ""
expr: 'sum(synapse_http_server_requests) by (servlet)'
- record: 'synapse_http_server_requests:total'
labels:
servlet: ""
expr: 'sum(synapse_http_server_requests:by_method) by (servlet)'
- record: 'synapse_cache:hit_ratio_5m'
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'
- record: 'synapse_cache:hit_ratio_30s'
expr: 'rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])'
- record: 'synapse_federation_client_sent'
labels:
type: "EDU"
expr: 'synapse_federation_client_sent_edus + 0'
- record: 'synapse_federation_client_sent'
labels:
type: "PDU"
expr: 'synapse_federation_client_sent_pdu_destinations:count + 0'
- record: 'synapse_federation_client_sent'
labels:
type: "Query"
expr: 'sum(synapse_federation_client_sent_queries) by (job)'
- record: 'synapse_federation_server_received'
labels:
type: "EDU"
expr: 'synapse_federation_server_received_edus + 0'
- record: 'synapse_federation_server_received'
labels:
type: "PDU"
expr: 'synapse_federation_server_received_pdus + 0'
- record: 'synapse_federation_server_received'
labels:
type: "Query"
expr: 'sum(synapse_federation_server_received_queries) by (job)'
- record: 'synapse_federation_transaction_queue_pending'
labels:
type: "EDU"
expr: 'synapse_federation_transaction_queue_pending_edus + 0'
- record: 'synapse_federation_transaction_queue_pending'
labels:
type: "PDU"
expr: 'synapse_federation_transaction_queue_pending_pdus + 0'

View File

@@ -298,10 +298,6 @@ It can be used like this:
# this will now be logged against the request context
logger.debug("Request handling complete")
XXX: I think ``preserve_context_over_fn`` is supposed to do the first option,
but the fact that it does ``preserve_context_over_deferred`` on its results
means that its use is fraught with difficulty.
Passing synapse deferreds into third-party functions
----------------------------------------------------

View File

@@ -1,11 +1,15 @@
Scaling synapse via workers
---------------------------
===========================
Synapse has experimental support for splitting out functionality into
multiple separate python processes, helping greatly with scalability. These
processes are called 'workers', and are (eventually) intended to scale
horizontally independently.
All of the below is highly experimental and subject to change as Synapse evolves,
but documenting it here to help folks needing highly scalable Synapses similar
to the one running matrix.org!
All processes continue to share the same database instance, and as such, workers
only work with postgres based synapse deployments (sharing a single sqlite
across multiple processes is a recipe for disaster, plus you should be using
@@ -16,6 +20,16 @@ TCP protocol called 'replication' - analogous to MySQL or Postgres style
database replication; feeding a stream of relevant data to the workers so they
can be kept in sync with the main synapse process and database state.
Configuration
-------------
To make effective use of the workers, you will need to configure an HTTP
reverse-proxy such as nginx or haproxy, which will direct incoming requests to
the correct worker, or to the main synapse instance. Note that this includes
requests made to the federation port. The caveats regarding running a
reverse-proxy on the federation port still apply (see
https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port).
To enable workers, you need to add a replication listener to the master synapse, e.g.::
listeners:
@@ -27,26 +41,19 @@ Under **no circumstances** should this replication API listener be exposed to th
public internet; it currently implements no authentication whatsoever and is
unencrypted.
You then create a set of configs for the various worker processes. These should be
worker configuration files should be stored in a dedicated subdirectory, to allow
synctl to manipulate them.
The current available worker applications are:
* synapse.app.pusher - handles sending push notifications to sygnal and email
* synapse.app.synchrotron - handles /sync endpoints. can scales horizontally through multiple instances.
* synapse.app.appservice - handles output traffic to Application Services
* synapse.app.federation_reader - handles receiving federation traffic (including public_rooms API)
* synapse.app.media_repository - handles the media repository.
* synapse.app.client_reader - handles client API endpoints like /publicRooms
You then create a set of configs for the various worker processes. These
should be worker configuration files, and should be stored in a dedicated
subdirectory, to allow synctl to manipulate them.
Each worker configuration file inherits the configuration of the main homeserver
configuration file. You can then override configuration specific to that worker,
e.g. the HTTP listener that it provides (if any); logging configuration; etc.
You should minimise the number of overrides though to maintain a usable config.
You must specify the type of worker application (worker_app) and the replication
endpoint that it's talking to on the main synapse process (worker_replication_host
and worker_replication_port).
You must specify the type of worker application (``worker_app``). The currently
available worker applications are listed below. You must also specify the
replication endpoint that it's talking to on the main synapse process
(``worker_replication_host`` and ``worker_replication_port``).
For instance::
@@ -68,11 +75,11 @@ For instance::
worker_log_config: /home/matrix/synapse/config/synchrotron_log_config.yaml
...is a full configuration for a synchrotron worker instance, which will expose a
plain HTTP /sync endpoint on port 8083 separately from the /sync endpoint provided
plain HTTP ``/sync`` endpoint on port 8083 separately from the ``/sync`` endpoint provided
by the main synapse.
Obviously you should configure your loadbalancer to route the /sync endpoint to
the synchrotron instance(s) in this instance.
Obviously you should configure your reverse-proxy to route the relevant
endpoints to the worker (``localhost:8083`` in the above example).
Finally, to actually run your worker-based synapse, you must pass synctl the -a
commandline option to tell it to operate on all the worker configurations found
@@ -89,6 +96,114 @@ To manipulate a specific worker, you pass the -w option to synctl::
synctl -w $CONFIG/workers/synchrotron.yaml restart
All of the above is highly experimental and subject to change as Synapse evolves,
but documenting it here to help folks needing highly scalable Synapses similar
to the one running matrix.org!
Available worker applications
-----------------------------
``synapse.app.pusher``
~~~~~~~~~~~~~~~~~~~~~~
Handles sending push notifications to sygnal and email. Doesn't handle any
REST endpoints itself, but you should set ``start_pushers: False`` in the
shared configuration file to stop the main synapse sending these notifications.
Note this worker cannot be load-balanced: only one instance should be active.
``synapse.app.synchrotron``
~~~~~~~~~~~~~~~~~~~~~~~~~~~
The synchrotron handles ``sync`` requests from clients. In particular, it can
handle REST endpoints matching the following regular expressions::
^/_matrix/client/(v2_alpha|r0)/sync$
^/_matrix/client/(api/v1|v2_alpha|r0)/events$
^/_matrix/client/(api/v1|r0)/initialSync$
^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$
The above endpoints should all be routed to the synchrotron worker by the
reverse-proxy configuration.
It is possible to run multiple instances of the synchrotron to scale
horizontally. In this case the reverse-proxy should be configured to
load-balance across the instances, though it will be more efficient if all
requests from a particular user are routed to a single instance. Extracting
a userid from the access token is currently left as an exercise for the reader.
``synapse.app.appservice``
~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles sending output traffic to Application Services. Doesn't handle any
REST endpoints itself, but you should set ``notify_appservices: False`` in the
shared configuration file to stop the main synapse sending these notifications.
Note this worker cannot be load-balanced: only one instance should be active.
``synapse.app.federation_reader``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles a subset of federation endpoints. In particular, it can handle REST
endpoints matching the following regular expressions::
^/_matrix/federation/v1/event/
^/_matrix/federation/v1/state/
^/_matrix/federation/v1/state_ids/
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles sending federation traffic to other servers. Doesn't handle any
REST endpoints itself, but you should set ``send_federation: False`` in the
shared configuration file to stop the main synapse sending this traffic.
Note this worker cannot be load-balanced: only one instance should be active.
``synapse.app.media_repository``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles the media repository. It can handle all endpoints starting with::
/_matrix/media/
You should also set ``enable_media_repo: False`` in the shared configuration
file to stop the main synapse running background jobs related to managing the
media repository.
Note this worker cannot be load-balanced: only one instance should be active.
``synapse.app.client_reader``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Handles client API endpoints. It can handle REST endpoints matching the
following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
``synapse.app.user_dir``
~~~~~~~~~~~~~~~~~~~~~~~~
Handles searches in the user directory. It can handle REST endpoints matching
the following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$
``synapse.app.frontend_proxy``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Proxies some frequently-requested client endpoints to add caching and remove
load from the main synapse. It can handle REST endpoints matching the following
regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
It will proxy any requests it cannot handle to the main synapse instance. It
must therefore be configured with the location of the main instance, via
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
file. For example::
worker_main_http_uri: http://127.0.0.1:8008

View File

@@ -43,6 +43,13 @@ BOOLEAN_COLUMNS = {
"device_lists_outbound_pokes": ["sent"],
"users_who_share_rooms": ["share_private"],
"groups": ["is_public"],
"group_rooms": ["is_public"],
"group_users": ["is_public", "is_admin"],
"group_summary_rooms": ["is_public"],
"group_room_categories": ["is_public"],
"group_summary_users": ["is_public"],
"group_roles": ["is_public"],
"local_group_membership": ["is_publicised", "is_admin"],
}

45
scripts/sync_room_to_group.pl Executable file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env perl
use strict;
use warnings;
use JSON::XS;
use LWP::UserAgent;
use URI::Escape;
if (@ARGV < 4) {
die "usage: $0 <homeserver url> <access_token> <room_id|room_alias> <group_id>\n";
}
my ($hs, $access_token, $room_id, $group_id) = @ARGV;
my $ua = LWP::UserAgent->new();
$ua->timeout(10);
if ($room_id =~ /^#/) {
$room_id = uri_escape($room_id);
$room_id = decode_json($ua->get("${hs}/_matrix/client/r0/directory/room/${room_id}?access_token=${access_token}")->decoded_content)->{room_id};
}
my $room_users = [ keys %{decode_json($ua->get("${hs}/_matrix/client/r0/rooms/${room_id}/joined_members?access_token=${access_token}")->decoded_content)->{joined}} ];
my $group_users = [
(map { $_->{user_id} } @{decode_json($ua->get("${hs}/_matrix/client/unstable/groups/${group_id}/users?access_token=${access_token}" )->decoded_content)->{chunk}}),
(map { $_->{user_id} } @{decode_json($ua->get("${hs}/_matrix/client/unstable/groups/${group_id}/invited_users?access_token=${access_token}" )->decoded_content)->{chunk}}),
];
die "refusing to sync from empty room" unless (@$room_users);
die "refusing to sync to empty group" unless (@$group_users);
my $diff = {};
foreach my $user (@$room_users) { $diff->{$user}++ }
foreach my $user (@$group_users) { $diff->{$user}-- }
foreach my $user (keys %$diff) {
if ($diff->{$user} == 1) {
warn "inviting $user";
print STDERR $ua->put("${hs}/_matrix/client/unstable/groups/${group_id}/admin/users/invite/${user}?access_token=${access_token}", Content=>'{}')->status_line."\n";
}
elsif ($diff->{$user} == -1) {
warn "removing $user";
print STDERR $ua->put("${hs}/_matrix/client/unstable/groups/${group_id}/admin/users/remove/${user}?access_token=${access_token}", Content=>'{}')->status_line."\n";
}
}

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.24.1"
__version__ = "0.25.1"

View File

@@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
from synapse.storage import are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
@@ -195,14 +194,19 @@ class SynapseHomeServer(HomeServer):
})
if name in ["media", "federation", "client"]:
media_repo = MediaRepositoryResource(self)
resources.update({
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
})
if self.get_config().enable_media_repo:
media_repo = self.get_media_repository_resource()
resources.update({
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
})
elif name == "media":
raise ConfigError(
"'media' resource conflicts with enable_media_repo=False",
)
if name in ["keys", "federation"]:
resources.update({

View File

@@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
@@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer):
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
elif name == "media":
media_repo = MediaRepositoryResource(self)
media_repo = self.get_media_repository_resource()
resources.update({
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
@@ -151,6 +150,13 @@ def start(config_options):
assert config.worker_app == "synapse.app.media_repository"
if config.enable_media_repo:
_base.quit_with_error(
"enable_media_repo must be disabled in the main synapse process\n"
"before the media repo can be run in a separate worker.\n"
"Please add ``enable_media_repo: false`` to the main config\n"
)
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts

View File

@@ -343,11 +343,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
# NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
self.presence_handler.sync_callback = self.send_user_sync
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)

View File

@@ -14,6 +14,7 @@
# limitations under the License.
from synapse.api.constants import EventTypes
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import GroupID, get_domain_from_id
from twisted.internet import defer
@@ -81,12 +82,13 @@ class ApplicationService(object):
# values.
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
def __init__(self, token, url=None, namespaces=None, hs_token=None,
def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
sender=None, id=None, protocols=None, rate_limited=True):
self.token = token
self.url = url
self.hs_token = hs_token
self.sender = sender
self.server_name = hostname
self.namespaces = self._check_namespaces(namespaces)
self.id = id
@@ -125,6 +127,24 @@ class ApplicationService(object):
raise ValueError(
"Expected bool for 'exclusive' in ns '%s'" % ns
)
group_id = regex_obj.get("group_id")
if group_id:
if not isinstance(group_id, str):
raise ValueError(
"Expected string for 'group_id' in ns '%s'" % ns
)
try:
GroupID.from_string(group_id)
except Exception:
raise ValueError(
"Expected valid group ID for 'group_id' in ns '%s'" % ns
)
if get_domain_from_id(group_id) != self.server_name:
raise ValueError(
"Expected 'group_id' to be this host in ns '%s'" % ns
)
regex = regex_obj.get("regex")
if isinstance(regex, basestring):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
@@ -251,6 +271,21 @@ class ApplicationService(object):
if regex_obj["exclusive"]
]
def get_groups_for_user(self, user_id):
"""Get the groups that this user is associated with by this AS
Args:
user_id (str): The ID of the user.
Returns:
iterable[str]: an iterable that yields group_id strings.
"""
return (
regex_obj["group_id"]
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
)
def is_rate_limited(self):
return self.rate_limited

View File

@@ -154,6 +154,7 @@ def _load_appservice(hostname, as_info, config_filename):
)
return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
url=as_info["url"],
namespaces=as_info["namespaces"],
hs_token=as_info["hs_token"],

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,28 +19,43 @@ from ._base import Config
class PushConfig(Config):
def read_config(self, config):
self.push_redact_content = False
push_config = config.get("push", {})
self.push_include_content = push_config.get("include_content", True)
# There was a a 'redact_content' setting but mistakenly read from the
# 'email'section'. Check for the flag in the 'push' section, and log,
# but do not honour it to avoid nasty surprises when people upgrade.
if push_config.get("redact_content") is not None:
print(
"The push.redact_content content option has never worked. "
"Please set push.include_content if you want this behaviour"
)
# Now check for the one in the 'email' section and honour it,
# with a warning.
push_config = config.get("email", {})
self.push_redact_content = push_config.get("redact_content", False)
redact_content = push_config.get("redact_content")
if redact_content is not None:
print(
"The 'email.redact_content' option is deprecated: "
"please set push.include_content instead"
)
self.push_include_content = not redact_content
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Control how push messages are sent to google/apple to notifications.
# Normally every message said in a room with one or more people using
# mobile devices will be posted to a push server hosted by matrix.org
# which is registered with google and apple in order to allow push
# notifications to be sent to these mobile devices.
#
# Setting redact_content to true will make the push messages contain no
# message content which will provide increased privacy. This is a
# temporary solution pending improvements to Android and iPhone apps
# to get content from the app rather than the notification.
#
# Clients requesting push notifications can either have the body of
# the message sent in the notification poke along with other details
# like the sender, or just the event ID and room ID (`event_id_only`).
# If clients choose the former, this option controls whether the
# notification request includes the content of the event (other details
# like the sender are still included). For `event_id_only` push, it
# has no effect.
# For modern android devices the notification content will still appear
# because it is loaded by the app. iPhone, however will send a
# notification saying only that a message arrived and who it came from.
#
#push:
# redact_content: false
# include_content: true
"""

View File

@@ -41,6 +41,12 @@ class ServerConfig(Config):
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
# whether to enable the media repository endpoints. This should be set
# to false if the media repository is running as a separate endpoint;
# doing so ensures that we will not run cache cleanup jobs on the
# master, potentially causing inconsistency.
self.enable_media_repo = config.get("enable_media_repo", True)
self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
# Whether we should block invites sent to users on this server

View File

@@ -25,7 +25,7 @@ from synapse.api.errors import (
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.events import FrozenEvent, builder
import synapse.metrics
@@ -420,7 +420,7 @@ class FederationClient(FederationBase):
for e_id in batch
]
res = yield preserve_context_over_deferred(
res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
)
for success, result in res:

View File

@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import logging
@@ -159,7 +159,7 @@ class ApplicationServicesHandler(object):
def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)
results = yield preserve_context_over_deferred(defer.DeferredList([
results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
], consumeErrors=True))

View File

@@ -551,7 +551,7 @@ class AuthHandler(BaseHandler):
qualified_user_id, password,
)
if is_valid:
defer.returnValue(qualified_user_id)
defer.returnValue((qualified_user_id, None))
if (not hasattr(provider, "get_supported_login_types")
or not hasattr(provider, "check_auth")):

View File

@@ -375,6 +375,12 @@ class GroupsLocalHandler(object):
def get_publicised_groups_for_user(self, user_id):
if self.hs.is_mine_id(user_id):
result = yield self.store.get_publicised_groups_for_user(user_id)
# Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`)
for app_service in self.store.get_app_services():
result.extend(app_service.get_groups_for_user(user_id))
defer.returnValue({"groups": result})
else:
result = yield self.transport_client.get_publicised_groups_for_user(
@@ -415,4 +421,9 @@ class GroupsLocalHandler(object):
uid
)
# Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`)
for app_service in self.store.get_app_services():
results[uid].extend(app_service.get_groups_for_user(uid))
defer.returnValue({"users": results})

View File

@@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler):
lambda states: states[event.event_id]
)
(messages, token), current_state = yield preserve_context_over_deferred(
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(

View File

@@ -1204,7 +1204,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
)
changed = True
else:
# We expect to be poked occaisonally by the other side.
# We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
# no one gets stuck online forever.
if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:

View File

@@ -154,6 +154,8 @@ class RoomListHandler(BaseHandler):
# We want larger rooms to be first, hence negating num_joined_users
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
logger.info("Getting ordering for %i rooms since %s",
len(room_ids), stream_token)
yield concurrently_execute(get_order_for_room, room_ids, 10)
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
@@ -181,34 +183,42 @@ class RoomListHandler(BaseHandler):
rooms_to_scan = rooms_to_scan[:since_token.current_limit]
rooms_to_scan.reverse()
# Actually generate the entries. _append_room_entry_to_chunk will append to
# chunk but will stop if len(chunk) > limit
chunk = []
if limit and not search_filter:
logger.info("After sorting and filtering, %i rooms remain",
len(rooms_to_scan))
# _append_room_entry_to_chunk will append to chunk but will stop if
# len(chunk) > limit
#
# Normally we will generate enough results on the first iteration here,
# but if there is a search filter, _append_room_entry_to_chunk may
# filter some results out, in which case we loop again.
#
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
#
# XXX if there is no limit, we may end up DoSing the server with
# calls to get_current_state_ids for every single room on the
# server. Surely we should cap this somehow?
#
if limit:
step = limit + 1
for i in xrange(0, len(rooms_to_scan), step):
# We iterate here because the vast majority of cases we'll stop
# at first iteration, but occaisonally _append_room_entry_to_chunk
# won't append to the chunk and so we need to loop again.
# We don't want to scan over the entire range either as that
# would potentially waste a lot of work.
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
rooms_to_scan[i:i + step], 10
)
if len(chunk) >= limit + 1:
break
else:
step = len(rooms_to_scan)
chunk = []
for i in xrange(0, len(rooms_to_scan), step):
batch = rooms_to_scan[i:i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
rooms_to_scan, 5
batch, 5,
)
logger.info("Now %i rooms in result", len(chunk))
if len(chunk) >= limit + 1:
break
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))

View File

@@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host):
return res
# no logcontexts here, so we can safely fire these off and gatherResults
d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
d1 = dns_client.lookupAddress(host).addCallbacks(
cb, eb, errbackArgs=("A", ))
d2 = dns_client.lookupIPV6Address(host).addCallbacks(
cb, eb, errbackArgs=("AAAA", ))
results = yield defer.DeferredList(
[d1, d2], consumeErrors=True)

View File

@@ -28,6 +28,7 @@ from canonicaljson import (
)
from twisted.internet import defer
from twisted.python import failure
from twisted.web import server, resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.util import redirectTo
@@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False):
version_string=self.version_string,
)
except Exception:
logger.exception(
"Failed handle request %s.%s on %r: %r",
# failure.Failure() fishes the original Failure out
# of our stack, and thus gives us a sensible stack
# trace.
f = failure.Failure()
logger.error(
"Failed handle request %s.%s on %r: %r: %s",
request_handler.__module__,
request_handler.__name__,
self,
request
request,
f.getTraceback().rstrip(),
)
respond_with_json(
request,

View File

@@ -167,7 +167,8 @@ def parse_json_value_from_request(request):
try:
content = simplejson.loads(content_bytes)
except simplejson.JSONDecodeError:
except Exception as e:
logger.warn("Unable to parse JSON: %s", e)
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
return content

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -295,7 +296,7 @@ class HttpPusher(object):
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
if not self.hs.config.push_redact_content and 'content' in event:
if self.hs.config.push_include_content and 'content' in event:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human

View File

@@ -17,7 +17,7 @@
from twisted.internet import defer
from .pusher import PusherFactory
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import run_on_reactor
import logging
@@ -136,7 +136,7 @@ class PusherPool:
)
)
yield preserve_context_over_deferred(defer.gatherResults(deferreds))
yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_notifications")
@@ -161,7 +161,7 @@ class PusherPool:
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
)
yield preserve_context_over_deferred(defer.gatherResults(deferreds))
yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_receipts")

View File

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
class BaseSlavedStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(BaseSlavedStore, self).__init__(hs)
super(BaseSlavedStore, self).__init__(db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = SlavedIdTracker(
db_conn, "cache_invalidation_stream", "stream_id",

View File

@@ -12,20 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
import logging
from synapse.api.constants import EventTypes
from synapse.storage import DataStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore
from synapse.storage.state import StateStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupReadStore
from synapse.storage.stream import StreamStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
import logging
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
logger = logging.getLogger(__name__)
@@ -39,7 +37,7 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
class SlavedEventStore(BaseSlavedStore):
class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
@@ -90,25 +88,9 @@ class SlavedEventStore(BaseSlavedStore):
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
_get_state_group_for_event = (
StateStore.__dict__["_get_state_group_for_event"]
)
_get_state_groups_from_groups = (
StateStore.__dict__["_get_state_groups_from_groups"]
)
_get_state_groups_from_groups_txn = (
DataStore._get_state_groups_from_groups_txn.__func__
)
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
get_current_state_ids = (
StateStore.__dict__["get_current_state_ids"]
)
get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
has_room_changed_since = DataStore.has_room_changed_since.__func__
@@ -134,12 +116,6 @@ class SlavedEventStore(BaseSlavedStore):
DataStore.get_room_events_stream_for_room.__func__
)
get_events_around = DataStore.get_events_around.__func__
get_state_for_event = DataStore.get_state_for_event.__func__
get_state_for_events = DataStore.get_state_for_events.__func__
get_state_groups = DataStore.get_state_groups.__func__
get_state_groups_ids = DataStore.get_state_groups_ids.__func__
get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__
get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__
get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
_get_joined_users_from_context = (
@@ -169,10 +145,7 @@ class SlavedEventStore(BaseSlavedStore):
_get_rooms_for_user_where_membership_is_txn = (
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
_get_state_for_groups = DataStore._get_state_for_groups.__func__
_get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
_get_events_around_txn = DataStore._get_events_around_txn.__func__
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
get_backfill_events = DataStore.get_backfill_events.__func__
_get_backfill_events = DataStore._get_backfill_events.__func__

View File

@@ -216,11 +216,12 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
@defer.inlineCallbacks
def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
self.presence_handler.update_external_syncs_row(
yield self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing, last_sync_ms,
)
@@ -244,11 +245,12 @@ class ReplicationStreamer(object):
getattr(self.store, cache_func).invalidate(tuple(keys))
@measure_func("repl.on_user_ip")
@defer.inlineCallbacks
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
"""The client saw a user request
"""
user_ip_cache_counter.inc()
self.store.insert_client_ip(
yield self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen,
)

View File

@@ -382,6 +382,20 @@ class ThreepidDeleteRestServlet(RestServlet):
defer.returnValue((200, {}))
class WhoamiRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/account/whoami$")
def __init__(self, hs):
super(WhoamiRestServlet, self).__init__()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request)
defer.returnValue((200, {'user_id': requester.user.to_string()}))
def register_servlets(hs, http_server):
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
MsisdnPasswordRequestTokenRestServlet(hs).register(http_server)
@@ -391,3 +405,4 @@ def register_servlets(hs, http_server):
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)

View File

@@ -30,6 +30,7 @@ class VersionsRestServlet(RestServlet):
"r0.0.1",
"r0.1.0",
"r0.2.0",
"r0.3.0",
]
})

View File

@@ -20,11 +20,13 @@ from twisted.web.resource import Resource
from synapse.api.errors import (
SynapseError, Codes,
)
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.stringutils import random_string
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.http.client import SpiderHttpClient
from synapse.http.server import (
request_handler, respond_with_json_bytes
request_handler, respond_with_json_bytes,
respond_with_json,
)
from synapse.util.async import ObservableDeferred
from synapse.util.stringutils import is_ascii
@@ -63,21 +65,23 @@ class PreviewUrlResource(Resource):
self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
# simple memory cache mapping urls to OG metadata
self.cache = ExpiringCache(
# memory cache mapping urls to an ObservableDeferred returning
# JSON-encoded OG metadata
self._cache = ExpiringCache(
cache_name="url_previews",
clock=self.clock,
# don't spider URLs more often than once an hour
expiry_ms=60 * 60 * 1000,
)
self.cache.start()
self.downloads = {}
self._cache.start()
self._cleaner_loop = self.clock.looping_call(
self._expire_url_cache_data, 10 * 1000
)
def render_OPTIONS(self, request):
return respond_with_json(request, 200, {}, send_cors=True)
def render_GET(self, request):
self._async_render_GET(request)
return NOT_DONE_YET
@@ -94,6 +98,7 @@ class PreviewUrlResource(Resource):
else:
ts = self.clock.time_msec()
# XXX: we could move this into _do_preview if we wanted.
url_tuple = urlparse.urlsplit(url)
for entry in self.url_preview_url_blacklist:
match = True
@@ -126,14 +131,42 @@ class PreviewUrlResource(Resource):
Codes.UNKNOWN
)
# first check the memory cache - good to handle all the clients on this
# HS thundering away to preview the same URL at the same time.
og = self.cache.get(url)
if og:
respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True)
return
# the in-memory cache:
# * ensures that only one request is active at a time
# * takes load off the DB for the thundering herds
# * also caches any failures (unlike the DB) so we don't keep
# requesting the same endpoint
# then check the URL cache in the DB (which will also provide us with
observable = self._cache.get(url)
if not observable:
download = preserve_fn(self._do_preview)(
url, requester.user, ts,
)
observable = ObservableDeferred(
download,
consumeErrors=True
)
self._cache[url] = observable
else:
logger.info("Returning cached response")
og = yield make_deferred_yieldable(observable.observe())
respond_with_json_bytes(request, 200, og, send_cors=True)
@defer.inlineCallbacks
def _do_preview(self, url, user, ts):
"""Check the db, and download the URL and build a preview
Args:
url (str):
user (str):
ts (int):
Returns:
Deferred[str]: json-encoded og data
"""
# check the URL cache in the DB (which will also provide us with
# historical previews, if we have any)
cache_result = yield self.store.get_url_cache(url, ts)
if (
@@ -141,32 +174,10 @@ class PreviewUrlResource(Resource):
cache_result["expires_ts"] > ts and
cache_result["response_code"] / 100 == 2
):
respond_with_json_bytes(
request, 200, cache_result["og"].encode('utf-8'),
send_cors=True
)
defer.returnValue(cache_result["og"])
return
# Ensure only one download for a given URL is active at a time
download = self.downloads.get(url)
if download is None:
download = self._download_url(url, requester.user)
download = ObservableDeferred(
download,
consumeErrors=True
)
self.downloads[url] = download
@download.addBoth
def callback(media_info):
del self.downloads[url]
return media_info
media_info = yield download.observe()
# FIXME: we should probably update our cache now anyway, so that
# even if the OG calculation raises, we don't keep hammering on the
# remote server. For now, leave it uncached to aid debugging OG
# calculation problems
media_info = yield self._download_url(url, user)
logger.debug("got media_info of '%s'" % media_info)
@@ -212,7 +223,7 @@ class PreviewUrlResource(Resource):
# just rely on the caching on the master request to speed things up.
if 'og:image' in og and og['og:image']:
image_info = yield self._download_url(
_rebase_url(og['og:image'], media_info['uri']), requester.user
_rebase_url(og['og:image'], media_info['uri']), user
)
if _is_media(image_info['media_type']):
@@ -239,8 +250,7 @@ class PreviewUrlResource(Resource):
logger.debug("Calculated OG for %s as %s" % (url, og))
# store OG in ephemeral in-memory cache
self.cache[url] = og
jsonog = json.dumps(og)
# store OG in history-aware DB cache
yield self.store.store_url_cache(
@@ -248,12 +258,12 @@ class PreviewUrlResource(Resource):
media_info["response_code"],
media_info["etag"],
media_info["expires"] + media_info["created_ts"],
json.dumps(og),
jsonog,
media_info["filesystem_id"],
media_info["created_ts"],
)
respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True)
defer.returnValue(jsonog)
@defer.inlineCallbacks
def _download_url(self, url, user):
@@ -342,11 +352,16 @@ class PreviewUrlResource(Resource):
def _expire_url_cache_data(self):
"""Clean up expired url cache content, media and thumbnails.
"""
# TODO: Delete from backup media store
now = self.clock.time_msec()
logger.info("Running url preview cache expiry")
if not (yield self.store.has_completed_background_updates()):
logger.info("Still running DB updates; skipping expiry")
return
# First we delete expired url cache entries
media_ids = yield self.store.get_expired_url_cache(now)
@@ -420,8 +435,7 @@ class PreviewUrlResource(Resource):
yield self.store.delete_url_cache_media(removed_media)
if removed_media:
logger.info("Deleted %d media from url cache", len(removed_media))
logger.info("Deleted %d media from url cache", len(removed_media))
def decode_and_calc_og(body, media_uri, request_encoding=None):

View File

@@ -60,7 +60,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool
from synapse.rest.media.v1.media_repository import MediaRepository
from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
)
from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.streams.events import EventSources
@@ -90,17 +93,12 @@ class HomeServer(object):
"""
DEPENDENCIES = [
'config',
'clock',
'http_client',
'db_pool',
'persistence_service',
'replication_layer',
'datastore',
'handlers',
'v1auth',
'auth',
'rest_servlet_factory',
'state_handler',
'presence_handler',
'sync_handler',
@@ -118,18 +116,7 @@ class HomeServer(object):
'device_message_handler',
'profile_handler',
'notifier',
'distributor',
'client_resource',
'resource_for_federation',
'resource_for_static_content',
'resource_for_web_client',
'resource_for_content_repo',
'resource_for_server_key',
'resource_for_server_key_v2',
'resource_for_media_repository',
'resource_for_metrics',
'event_sources',
'ratelimiter',
'keyring',
'pusherpool',
'event_builder_factory',
@@ -137,6 +124,7 @@ class HomeServer(object):
'http_client_context_factory',
'simple_http_client',
'media_repository',
'media_repository_resource',
'federation_transport_client',
'federation_sender',
'receipts_handler',
@@ -183,6 +171,21 @@ class HomeServer(object):
def is_mine_id(self, string):
return string.split(":", 1)[1] == self.hostname
def get_clock(self):
return self.clock
def get_datastore(self):
return self.datastore
def get_config(self):
return self.config
def get_distributor(self):
return self.distributor
def get_ratelimiter(self):
return self.ratelimiter
def build_replication_layer(self):
return initialize_http_replication(self)
@@ -294,6 +297,11 @@ class HomeServer(object):
**self.db_config.get("args", {})
)
def build_media_repository_resource(self):
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of
return MediaRepositoryResource(self)
def build_media_repository(self):
return MediaRepository(self)

View File

@@ -5,6 +5,7 @@ import synapse.handlers
import synapse.handlers.auth
import synapse.handlers.device
import synapse.handlers.e2e_keys
import synapse.rest.media.v1.media_repository
import synapse.storage
import synapse.state
@@ -35,3 +36,9 @@ class HomeServer(object):
def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
pass
def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource:
pass
def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
pass

View File

@@ -268,7 +268,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
super(DataStore, self).__init__(hs)
super(DataStore, self).__init__(db_conn, hs)
def take_presence_startup_info(self):
active_on_startup = self._presence_on_startup

View File

@@ -16,8 +16,6 @@ import logging
from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
import synapse.metrics
@@ -162,7 +160,7 @@ class PerformanceCounters(object):
class SQLBaseStore(object):
_TXN_ID = 0
def __init__(self, hs):
def __init__(self, db_conn, hs):
self.hs = hs
self._clock = hs.get_clock()
self._db_pool = hs.get_db_pool()
@@ -180,10 +178,6 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3,
max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 500000 * CACHE_SIZE_FACTOR
)
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
@@ -475,23 +469,53 @@ class SQLBaseStore(object):
txn.executemany(sql, vals)
@defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):
"""
`lock` should generally be set to True (the default), but can be set
to False if either of the following are true:
* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
* we somehow know that we are the only thread which will be updating
this table.
Args:
table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): key/values to use when inserting
insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
"""
return self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock
)
attempts = 0
while True:
try:
result = yield self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock=lock
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
attempts += 1
if attempts >= 5:
# don't retry forever, because things other than races
# can cause IntegrityErrors
raise
# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
table, e
)
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True):
@@ -499,7 +523,7 @@ class SQLBaseStore(object):
if lock:
self.database_engine.lock_table(txn, table)
# Try to update
# First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
@@ -508,24 +532,25 @@ class SQLBaseStore(object):
sqlargs = values.values() + keyvalues.values()
txn.execute(sql, sqlargs)
if txn.rowcount == 0:
# We didn't update and rows so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
txn.execute(sql, allvalues.values())
return True
else:
if txn.rowcount > 0:
# successfully updated at least one row.
return False
# We didn't update any rows so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
txn.execute(sql, allvalues.values())
# successfully inserted
return True
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
@@ -582,20 +607,18 @@ class SQLBaseStore(object):
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
if keyvalues:
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
else:
where = ""
sql = (
"SELECT %(retcol)s FROM %(table)s %(where)s"
"SELECT %(retcol)s FROM %(table)s"
) % {
"retcol": retcol,
"table": table,
"where": where,
}
txn.execute(sql, keyvalues.values())
if keyvalues:
sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
txn.execute(sql, keyvalues.values())
else:
txn.execute(sql)
return [r[0] for r in txn]
@@ -606,7 +629,7 @@ class SQLBaseStore(object):
Args:
table (str): table name
keyvalues (dict): column names and values to select the rows with
keyvalues (dict|None): column names and values to select the rows with
retcol (str): column whos value we wish to retrieve.
Returns:

View File

@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
"""
content_json = json.dumps(content)
def add_account_data_txn(txn, next_id):
self._simple_upsert_txn(
txn,
with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so _simple_upsert will
# retry if there is a conflict.
yield self._simple_upsert(
desc="add_room_account_data",
table="room_account_data",
keyvalues={
"user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
values={
"stream_id": next_id,
"content": content_json,
}
},
lock=False,
)
txn.call_after(
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
yield self.runInteraction(
"add_room_account_data", add_account_data_txn, next_id
)
# it's theoretically possible for the above to succeed and the
# below to fail - in which case we might reuse a stream id on
# restart, and the above update might not get propagated. That
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
yield self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
"""
content_json = json.dumps(content)
def add_account_data_txn(txn, next_id):
self._simple_upsert_txn(
txn,
with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so _simple_upsert will retry if
# there is a conflict.
yield self._simple_upsert(
desc="add_user_account_data",
table="account_data",
keyvalues={
"user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
values={
"stream_id": next_id,
"content": content_json,
}
},
lock=False,
)
txn.call_after(
self._account_data_stream_cache.entity_has_changed,
# it's theoretically possible for the above to succeed and the
# below to fail - in which case we might reuse a stream id on
# restart, and the above update might not get propagated. That
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
yield self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(
user_id, next_id,
)
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
txn.call_after(
self.get_global_account_data_by_type_for_user.invalidate,
self.get_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.invalidate(
(account_data_type, user_id,)
)
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
yield self.runInteraction(
"add_user_account_data", add_account_data_txn, next_id
)
result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
def _update_max_stream_id(self, txn, next_id):
def _update_max_stream_id(self, next_id):
"""Update the max stream_id
Args:
txn: The database cursor
next_id(int): The the revision to advance to.
"""
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
" SET stream_id = ?"
" WHERE stream_id < ?"
def _update(txn):
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
" SET stream_id = ?"
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
return self.runInteraction(
"update_account_data_max_stream_id",
_update,
)
txn.execute(update_max_id_sql, (next_id, next_id))
@cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):

View File

@@ -48,8 +48,8 @@ def _make_exclusive_regex(services_cache):
class ApplicationServiceStore(SQLBaseStore):
def __init__(self, hs):
super(ApplicationServiceStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(ApplicationServiceStore, self).__init__(db_conn, hs)
self.hostname = hs.hostname
self.services_cache = load_appservices(
hs.hostname,
@@ -173,8 +173,8 @@ class ApplicationServiceStore(SQLBaseStore):
class ApplicationServiceTransactionStore(SQLBaseStore):
def __init__(self, hs):
super(ApplicationServiceTransactionStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(ApplicationServiceTransactionStore, self).__init__(db_conn, hs)
@defer.inlineCallbacks
def get_appservices_by_state(self, state):

View File

@@ -80,11 +80,12 @@ class BackgroundUpdateStore(SQLBaseStore):
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
def __init__(self, hs):
super(BackgroundUpdateStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(BackgroundUpdateStore, self).__init__(db_conn, hs)
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False
@defer.inlineCallbacks
def start_doing_background_updates(self):
@@ -106,8 +107,40 @@ class BackgroundUpdateStore(SQLBaseStore):
"No more background updates to do."
" Unscheduling background update task."
)
self._all_done = True
defer.returnValue(None)
@defer.inlineCallbacks
def has_completed_background_updates(self):
"""Check if all the background updates have completed
Returns:
Deferred[bool]: True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
defer.returnValue(True)
# obviously, if we have things in our queue, we're not done.
if self._background_update_queue:
defer.returnValue(False)
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
updates = yield self._simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
desc="check_background_updates",
)
if not updates:
self._all_done = True
defer.returnValue(True)
defer.returnValue(False)
@defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
@@ -269,7 +302,7 @@ class BackgroundUpdateStore(SQLBaseStore):
# Sqlite doesn't support concurrent creation of indexes.
#
# We don't use partial indices on SQLite as it wasn't introduced
# until 3.8, and wheezy has 3.7
# until 3.8, and wheezy and CentOS 7 have 3.7
#
# We assume that sqlite doesn't give us invalid indices; however
# we may still end up with the index existing but the

View File

@@ -32,14 +32,14 @@ LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):
def __init__(self, hs):
def __init__(self, db_conn, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
max_entries=50000 * CACHE_SIZE_FACTOR,
)
super(ClientIpStore, self).__init__(hs)
super(ClientIpStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"user_ips_device_index",

View File

@@ -29,8 +29,8 @@ logger = logging.getLogger(__name__)
class DeviceInboxStore(BackgroundUpdateStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
def __init__(self, hs):
super(DeviceInboxStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(DeviceInboxStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"device_inbox_stream_index",

View File

@@ -26,8 +26,8 @@ logger = logging.getLogger(__name__)
class DeviceStore(SQLBaseStore):
def __init__(self, hs):
super(DeviceStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)
# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.

View File

@@ -39,8 +39,8 @@ class EventFederationStore(SQLBaseStore):
EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
def __init__(self, hs):
super(EventFederationStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(EventFederationStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.EVENT_AUTH_STATE_ONLY,

View File

@@ -65,8 +65,8 @@ def _deserialize_action(actions, is_highlight):
class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, hs):
super(EventPushActionsStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(EventPushActionsStore, self).__init__(db_conn, hs)
self.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,

View File

@@ -197,8 +197,8 @@ class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts

View File

@@ -12,13 +12,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
class MediaRepositoryStore(SQLBaseStore):
class MediaRepositoryStore(BackgroundUpdateStore):
"""Persistence for attachments and avatars"""
def __init__(self, db_conn, hs):
super(MediaRepositoryStore, self).__init__(db_conn, hs)
self.register_background_index_update(
update_name='local_media_repository_url_idx',
index_name='local_media_repository_url_idx',
table='local_media_repository',
columns=['created_ts'],
where_clause='url_cache IS NOT NULL',
)
def get_default_thumbnails(self, top_level_type, sub_type):
return []

View File

@@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore):
pushkey, pushkey_ts, lang, data, last_stream_ordering,
profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
def f(txn):
newly_inserted = self._simple_upsert_txn(
txn,
"pushers",
{
"app_id": app_id,
"pushkey": pushkey,
"user_name": user_id,
},
{
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": encode_canonical_json(data),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
},
)
if newly_inserted:
# get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
newly_inserted = yield self._simple_upsert(
table="pushers",
keyvalues={
"app_id": app_id,
"pushkey": pushkey,
"user_name": user_id,
},
values={
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": encode_canonical_json(data),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
},
desc="add_pusher",
lock=False,
)
yield self.runInteraction("add_pusher", f)
if newly_inserted:
# get_if_user_has_pusher only cares if the user has
# at least *one* pusher.
self.get_if_user_has_pusher.invalidate(user_id,)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -243,11 +244,19 @@ class PusherStore(SQLBaseStore):
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
)
self._simple_upsert_txn(
# it's possible for us to end up with duplicate rows for
# (app_id, pushkey, user_id) at different stream_ids, but that
# doesn't really matter.
self._simple_insert_txn(
txn,
"deleted_pushers",
{"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
{"stream_id": stream_id},
table="deleted_pushers",
values={
"stream_id": stream_id,
"app_id": app_id,
"pushkey": pushkey,
"user_id": user_id,
},
)
with self._pushers_id_gen.get_next() as stream_id:
@@ -310,9 +319,12 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def set_throttle_params(self, pusher_id, room_id, params):
# no need to lock because `pusher_throttle` has a primary key on
# (pusher, room_id) so _simple_upsert will retry
yield self._simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
params,
desc="set_throttle_params"
desc="set_throttle_params",
lock=False,
)

View File

@@ -27,8 +27,8 @@ logger = logging.getLogger(__name__)
class ReceiptsStore(SQLBaseStore):
def __init__(self, hs):
super(ReceiptsStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(ReceiptsStore, self).__init__(db_conn, hs)
self._receipts_stream_cache = StreamChangeCache(
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()

View File

@@ -24,8 +24,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
class RegistrationStore(background_updates.BackgroundUpdateStore):
def __init__(self, hs):
super(RegistrationStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(RegistrationStore, self).__init__(db_conn, hs)
self.clock = hs.get_clock()

View File

@@ -49,8 +49,8 @@ _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
class RoomMemberStore(SQLBaseStore):
def __init__(self, hs):
super(RoomMemberStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(RoomMemberStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
)

View File

@@ -13,7 +13,10 @@
* limitations under the License.
*/
CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was
-- removed and replaced with 46/local_media_repository_url_idx.sql.
--
-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
-- indices on expressions until 3.9.

View File

@@ -0,0 +1,35 @@
/* Copyright 2017 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- drop the unique constraint on deleted_pushers so that we can just insert
-- into it rather than upserting.
CREATE TABLE deleted_pushers2 (
stream_id BIGINT NOT NULL,
app_id TEXT NOT NULL,
pushkey TEXT NOT NULL,
user_id TEXT NOT NULL
);
INSERT INTO deleted_pushers2 (stream_id, app_id, pushkey, user_id)
SELECT stream_id, app_id, pushkey, user_id from deleted_pushers;
DROP TABLE deleted_pushers;
ALTER TABLE deleted_pushers2 RENAME TO deleted_pushers;
-- create the index after doing the inserts because that's more efficient.
-- it also means we can give it the same name as the old one without renaming.
CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id);

View File

@@ -0,0 +1,24 @@
/* Copyright 2017 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- register a background update which will recreate the
-- local_media_repository_url_idx index.
--
-- We do this as a bg update not because it is a particularly onerous
-- operation, but because we'd like it to be a partial index if possible, and
-- the background_index_update code will understand whether we are on
-- postgres or sqlite and behave accordingly.
INSERT INTO background_updates (update_name, progress_json) VALUES
('local_media_repository_url_idx', '{}');

View File

@@ -33,8 +33,8 @@ class SearchStore(BackgroundUpdateStore):
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
def __init__(self, hs):
super(SearchStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(SearchStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
)

View File

@@ -13,16 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches import intern_string
from synapse.util.stringutils import to_ascii
from synapse.storage.engines import PostgresEngine
from collections import namedtuple
import logging
from twisted.internet import defer
from collections import namedtuple
import logging
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -40,45 +42,22 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
return len(self.delta_ids) if self.delta_ids else 0
class StateStore(SQLBaseStore):
""" Keeps track of the state at a given event.
class StateGroupReadStore(SQLBaseStore):
"""The read-only parts of StateGroupStore
This is done by the concept of `state groups`. Every event is a assigned
a state group (identified by an arbitrary string), which references a
collection of state events. The current state of an event is then the
collection of state events referenced by the event's state group.
Hence, every change in the current state causes a new state group to be
generated. However, if no change happens (e.g., if we get a message event
with only one parent it inherits the state group from its parent.)
There are three tables:
* `state_groups`: Stores group name, first event with in the group and
room id.
* `event_to_state_groups`: Maps events to state groups.
* `state_groups_state`: Maps state group to state events.
None of these functions write to the state tables, so are suitable for
including in the SlavedStores.
"""
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
def __init__(self, hs):
super(StateStore, self).__init__(hs)
self.register_background_update_handler(
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
self._background_deduplicate_state,
)
self.register_background_update_handler(
self.STATE_GROUP_INDEX_UPDATE_NAME,
self._background_index_state,
)
self.register_background_index_update(
self.CURRENT_STATE_INDEX_UPDATE_NAME,
index_name="current_state_events_member_index",
table="current_state_events",
columns=["state_key"],
where_clause="type='m.room.member'",
def __init__(self, db_conn, hs):
super(StateGroupReadStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 500000 * CACHE_SIZE_FACTOR
)
@cached(max_entries=100000, iterable=True)
@@ -190,178 +169,6 @@ class StateStore(SQLBaseStore):
for group, event_id_map in group_to_ids.iteritems()
})
def _have_persisted_state_group_txn(self, txn, state_group):
txn.execute(
"SELECT count(*) FROM state_groups WHERE id = ?",
(state_group,)
)
row = txn.fetchone()
return row and row[0]
def _store_mult_state_groups_txn(self, txn, events_and_contexts):
state_groups = {}
for event, context in events_and_contexts:
if event.internal_metadata.is_outlier():
continue
if context.current_state_ids is None:
# AFAIK, this can never happen
logger.error(
"Non-outlier event %s had current_state_ids==None",
event.event_id)
continue
# if the event was rejected, just give it the same state as its
# predecessor.
if context.rejected:
state_groups[event.event_id] = context.prev_group
continue
state_groups[event.event_id] = context.state_group
if self._have_persisted_state_group_txn(txn, context.state_group):
continue
self._simple_insert_txn(
txn,
table="state_groups",
values={
"id": context.state_group,
"room_id": event.room_id,
"event_id": event.event_id,
},
)
# We persist as a delta if we can, while also ensuring the chain
# of deltas isn't tooo long, as otherwise read performance degrades.
if context.prev_group:
is_in_db = self._simple_select_one_onecol_txn(
txn,
table="state_groups",
keyvalues={"id": context.prev_group},
retcol="id",
allow_none=True,
)
if not is_in_db:
raise Exception(
"Trying to persist state with unpersisted prev_group: %r"
% (context.prev_group,)
)
potential_hops = self._count_state_group_hops_txn(
txn, context.prev_group
)
if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
self._simple_insert_txn(
txn,
table="state_group_edges",
values={
"state_group": context.state_group,
"prev_state_group": context.prev_group,
},
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": context.state_group,
"room_id": event.room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in context.delta_ids.iteritems()
],
)
else:
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": context.state_group,
"room_id": event.room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in context.current_state_ids.iteritems()
],
)
# Prefill the state group cache with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=dict(context.current_state_ids),
full=True,
)
self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
values=[
{
"state_group": state_group_id,
"event_id": event_id,
}
for event_id, state_group_id in state_groups.iteritems()
],
)
for event_id, state_group_id in state_groups.iteritems():
txn.call_after(
self._get_state_group_for_event.prefill,
(event_id,), state_group_id
)
def _count_state_group_hops_txn(self, txn, state_group):
"""Given a state group, count how many hops there are in the tree.
This is used to ensure the delta chains don't get too long.
"""
if isinstance(self.database_engine, PostgresEngine):
sql = ("""
WITH RECURSIVE state(state_group) AS (
VALUES(?::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
SELECT count(*) FROM state;
""")
txn.execute(sql, (state_group,))
row = txn.fetchone()
if row and row[0]:
return row[0]
else:
return 0
else:
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
next_group = state_group
count = 0
while next_group:
next_group = self._simple_select_one_onecol_txn(
txn,
table="state_group_edges",
keyvalues={"state_group": next_group},
retcol="prev_state_group",
allow_none=True,
)
if next_group:
count += 1
return count
@defer.inlineCallbacks
def _get_state_groups_from_groups(self, groups, types):
"""Returns dictionary state_group -> (dict of (type, state_key) -> event id)
@@ -741,6 +548,220 @@ class StateStore(SQLBaseStore):
defer.returnValue(results)
class StateStore(StateGroupReadStore, BackgroundUpdateStore):
""" Keeps track of the state at a given event.
This is done by the concept of `state groups`. Every event is a assigned
a state group (identified by an arbitrary string), which references a
collection of state events. The current state of an event is then the
collection of state events referenced by the event's state group.
Hence, every change in the current state causes a new state group to be
generated. However, if no change happens (e.g., if we get a message event
with only one parent it inherits the state group from its parent.)
There are three tables:
* `state_groups`: Stores group name, first event with in the group and
room id.
* `event_to_state_groups`: Maps events to state groups.
* `state_groups_state`: Maps state group to state events.
"""
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
def __init__(self, db_conn, hs):
super(StateStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
self._background_deduplicate_state,
)
self.register_background_update_handler(
self.STATE_GROUP_INDEX_UPDATE_NAME,
self._background_index_state,
)
self.register_background_index_update(
self.CURRENT_STATE_INDEX_UPDATE_NAME,
index_name="current_state_events_member_index",
table="current_state_events",
columns=["state_key"],
where_clause="type='m.room.member'",
)
def _have_persisted_state_group_txn(self, txn, state_group):
txn.execute(
"SELECT count(*) FROM state_groups WHERE id = ?",
(state_group,)
)
row = txn.fetchone()
return row and row[0]
def _store_mult_state_groups_txn(self, txn, events_and_contexts):
state_groups = {}
for event, context in events_and_contexts:
if event.internal_metadata.is_outlier():
continue
if context.current_state_ids is None:
# AFAIK, this can never happen
logger.error(
"Non-outlier event %s had current_state_ids==None",
event.event_id)
continue
# if the event was rejected, just give it the same state as its
# predecessor.
if context.rejected:
state_groups[event.event_id] = context.prev_group
continue
state_groups[event.event_id] = context.state_group
if self._have_persisted_state_group_txn(txn, context.state_group):
continue
self._simple_insert_txn(
txn,
table="state_groups",
values={
"id": context.state_group,
"room_id": event.room_id,
"event_id": event.event_id,
},
)
# We persist as a delta if we can, while also ensuring the chain
# of deltas isn't tooo long, as otherwise read performance degrades.
if context.prev_group:
is_in_db = self._simple_select_one_onecol_txn(
txn,
table="state_groups",
keyvalues={"id": context.prev_group},
retcol="id",
allow_none=True,
)
if not is_in_db:
raise Exception(
"Trying to persist state with unpersisted prev_group: %r"
% (context.prev_group,)
)
potential_hops = self._count_state_group_hops_txn(
txn, context.prev_group
)
if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
self._simple_insert_txn(
txn,
table="state_group_edges",
values={
"state_group": context.state_group,
"prev_state_group": context.prev_group,
},
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": context.state_group,
"room_id": event.room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in context.delta_ids.iteritems()
],
)
else:
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": context.state_group,
"room_id": event.room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in context.current_state_ids.iteritems()
],
)
# Prefill the state group cache with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=dict(context.current_state_ids),
full=True,
)
self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
values=[
{
"state_group": state_group_id,
"event_id": event_id,
}
for event_id, state_group_id in state_groups.iteritems()
],
)
for event_id, state_group_id in state_groups.iteritems():
txn.call_after(
self._get_state_group_for_event.prefill,
(event_id,), state_group_id
)
def _count_state_group_hops_txn(self, txn, state_group):
"""Given a state group, count how many hops there are in the tree.
This is used to ensure the delta chains don't get too long.
"""
if isinstance(self.database_engine, PostgresEngine):
sql = ("""
WITH RECURSIVE state(state_group) AS (
VALUES(?::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
SELECT count(*) FROM state;
""")
txn.execute(sql, (state_group,))
row = txn.fetchone()
if row and row[0]:
return row[0]
else:
return 0
else:
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
next_group = state_group
count = 0
while next_group:
next_group = self._simple_select_one_onecol_txn(
txn,
table="state_group_edges",
keyvalues={"state_group": next_group},
retcol="prev_state_group",
allow_none=True,
)
if next_group:
count += 1
return count
def get_next_state_group(self):
return self._state_groups_id_gen.get_next()

View File

@@ -39,7 +39,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
@@ -234,7 +234,7 @@ class StreamStore(SQLBaseStore):
results = {}
room_ids = list(room_ids)
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
res = yield preserve_context_over_deferred(defer.gatherResults([
res = yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(self.get_room_events_stream_for_room)(
room_id, from_key, to_key, limit, order=order,
)

View File

@@ -46,8 +46,8 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
def __init__(self, hs):
super(TransactionStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(TransactionStore, self).__init__(db_conn, hs)
self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)

View File

@@ -17,7 +17,7 @@
from twisted.internet import defer, reactor
from .logcontext import (
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
)
from synapse.util import logcontext, unwrapFirstError
@@ -351,7 +351,7 @@ class ReadWriteLock(object):
# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
yield curr_writer
yield make_deferred_yieldable(curr_writer)
@contextmanager
def _ctx_manager():
@@ -380,7 +380,7 @@ class ReadWriteLock(object):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer
yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager():

View File

@@ -13,32 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_fn
)
from synapse.util import unwrapFirstError
import logging
from twisted.internet import defer
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
return preserve_context_over_fn(
distributor.fire,
"user_left_room", user=user, room_id=room_id
)
with PreserveLoggingContext():
distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
return preserve_context_over_fn(
distributor.fire,
"user_joined_room", user=user, room_id=room_id
)
with PreserveLoggingContext():
distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):

View File

@@ -261,67 +261,6 @@ class PreserveLoggingContext(object):
)
class _PreservingContextDeferred(defer.Deferred):
"""A deferred that ensures that all callbacks and errbacks are called with
the given logging context.
"""
def __init__(self, context):
self._log_context = context
defer.Deferred.__init__(self)
def addCallbacks(self, callback, errback=None,
callbackArgs=None, callbackKeywords=None,
errbackArgs=None, errbackKeywords=None):
callback = self._wrap_callback(callback)
errback = self._wrap_callback(errback)
return defer.Deferred.addCallbacks(
self, callback,
errback=errback,
callbackArgs=callbackArgs,
callbackKeywords=callbackKeywords,
errbackArgs=errbackArgs,
errbackKeywords=errbackKeywords,
)
def _wrap_callback(self, f):
def g(res, *args, **kwargs):
with PreserveLoggingContext(self._log_context):
res = f(res, *args, **kwargs)
return res
return g
def preserve_context_over_fn(fn, *args, **kwargs):
"""Takes a function and invokes it with the given arguments, but removes
and restores the current logging context while doing so.
If the result is a deferred, call preserve_context_over_deferred before
returning it.
"""
with PreserveLoggingContext():
res = fn(*args, **kwargs)
if isinstance(res, defer.Deferred):
return preserve_context_over_deferred(res)
else:
return res
def preserve_context_over_deferred(deferred, context=None):
"""Given a deferred wrap it such that any callbacks added later to it will
be invoked with the current context.
Deprecated: this almost certainly doesn't do want you want, ie make
the deferred follow the synapse logcontext rules: try
``make_deferred_yieldable`` instead.
"""
if context is None:
context = LoggingContext.current_context()
d = _PreservingContextDeferred(context)
deferred.chainDeferred(d)
return d
def preserve_fn(f):
"""Wraps a function, to ensure that the current context is restored after
return from the function, and that the sentinel context is set once the

View File

@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership, EventTypes
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import logging
@@ -58,7 +58,7 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
always_include_ids (set(event_id)): set of event ids to specifically
include (unless sender is ignored)
"""
forgotten = yield preserve_context_over_deferred(defer.gatherResults([
forgotten = yield make_deferred_yieldable(defer.gatherResults([
defer.maybeDeferred(
preserve_fn(store.who_forgot_in_room),
room_id,

View File

@@ -36,6 +36,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
id="unique_identifier",
url="some_url",
token="some_token",
hostname="matrix.org", # only used by get_groups_for_user
namespaces={
ApplicationService.NS_USERS: [],
ApplicationService.NS_ROOMS: [],

View File

@@ -58,7 +58,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob")
self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob")
# must be done after inserts
self.store = ApplicationServiceStore(hs)
self.store = ApplicationServiceStore(None, hs)
def tearDown(self):
# TODO: suboptimal that we need to create files for tests!
@@ -150,7 +150,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
self.as_yaml_files = []
self.store = TestTransactionStore(hs)
self.store = TestTransactionStore(None, hs)
def _add_service(self, url, as_token, id):
as_yaml = dict(url=url, as_token=as_token, hs_token="something",
@@ -420,8 +420,8 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
class TestTransactionStore(ApplicationServiceTransactionStore,
ApplicationServiceStore):
def __init__(self, hs):
super(TestTransactionStore, self).__init__(hs)
def __init__(self, db_conn, hs):
super(TestTransactionStore, self).__init__(db_conn, hs)
class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
@@ -458,7 +458,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
replication_layer=Mock(),
)
ApplicationServiceStore(hs)
ApplicationServiceStore(None, hs)
@defer.inlineCallbacks
def test_duplicate_ids(self):
@@ -477,7 +477,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
)
with self.assertRaises(ConfigError) as cm:
ApplicationServiceStore(hs)
ApplicationServiceStore(None, hs)
e = cm.exception
self.assertIn(f1, e.message)
@@ -501,7 +501,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
)
with self.assertRaises(ConfigError) as cm:
ApplicationServiceStore(hs)
ApplicationServiceStore(None, hs)
e = cm.exception
self.assertIn(f1, e.message)

View File

@@ -56,7 +56,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
database_engine=create_engine(config.database_config),
)
self.datastore = SQLBaseStore(hs)
self.datastore = SQLBaseStore(None, hs)
@defer.inlineCallbacks
def test_insert_1col(self):

View File

@@ -29,7 +29,7 @@ class DirectoryStoreTestCase(unittest.TestCase):
def setUp(self):
hs = yield setup_test_homeserver()
self.store = DirectoryStore(hs)
self.store = DirectoryStore(None, hs)
self.room = RoomID.from_string("!abcde:test")
self.alias = RoomAlias.from_string("#my-room:test")

View File

@@ -29,7 +29,7 @@ class PresenceStoreTestCase(unittest.TestCase):
def setUp(self):
hs = yield setup_test_homeserver(clock=MockClock())
self.store = PresenceStore(hs)
self.store = PresenceStore(None, hs)
self.u_apple = UserID.from_string("@apple:test")
self.u_banana = UserID.from_string("@banana:test")

View File

@@ -29,7 +29,7 @@ class ProfileStoreTestCase(unittest.TestCase):
def setUp(self):
hs = yield setup_test_homeserver()
self.store = ProfileStore(hs)
self.store = ProfileStore(None, hs)
self.u_frank = UserID.from_string("@frank:test")