1
0

Compare commits

...

72 Commits

Author SHA1 Message Date
Half-Shot b296915ad0 Remove leading / from prefix 2019-12-04 15:55:31 +00:00
Half-Shot 8b2aa8decf s/user_id/key 2019-12-04 11:36:35 +00:00
Half-Shot 99c5545ab8 Tidy up 2019-12-04 11:28:42 +00:00
Half-Shot 44dabec8f3 newsfile 2019-12-03 18:22:36 +00:00
Half-Shot fbcfeba2dc use build_uri to properly join url parts together 2019-12-03 18:17:22 +00:00
Half-Shot 3ad48dddb2 Use the /_matrix/app/v1 endpoint 2019-12-03 18:17:08 +00:00
Amber Brown fdec84aa42 Add benchmarks for structured logging performance (#6266) 2019-12-03 20:21:25 +11:00
Andrew Morgan 72078e4be5 Transfer power level state events on room upgrade (#6237) 2019-12-02 15:11:32 +00:00
Andrew Morgan 0ad75fd98e Use python3 packages for Ubuntu (#6443) 2019-12-02 15:09:57 +00:00
Filip Štědronský 81731c6e75 Fix: Pillow error when uploading RGBA image (#3325) (#6241)
Signed-Off-By: Filip Štědronský <g@regnarg.cz>
2019-12-02 12:12:55 +00:00
Andrew Morgan 23ea572125 Add User-Interactive Auth to /account/3pid/add (#6119) 2019-11-29 13:51:14 +00:00
Erik Johnston 1c3a61529f Merge pull request #6434 from matrix-org/erikj/msc2367_membership_reasons
Implement MSC 2367 - Membership Reasons
2019-11-29 13:30:36 +00:00
Brendan Abolivier 6d8576c4ce Merge pull request #6436 from matrix-org/babolivier/fix-state-retrieval
Discard retention policies when retrieving state
2019-11-29 11:25:11 +00:00
Brendan Abolivier 78ec11c085 Lint 2019-11-28 20:35:22 +00:00
Brendan Abolivier 5ee2beeddb Changelog 2019-11-28 19:32:49 +00:00
Brendan Abolivier 708cef88cf Discard retention policies when retrieving state
Purge jobs don't delete the latest event in a room in order to keep the forward extremity and not break the room. On the other hand, get_state_events, when given an at_token argument calls filter_events_for_client to know if the user can see the event that matches that (sync) token. That function uses the retention policies of the events it's given to filter out those that are too old from a client's view.

Some clients, such as Riot, when loading a room, request the list of members for the latest sync token it knows about, and get confused to the point of refusing to send any message if the server tells it that it can't get that information. This can happen very easily with the message retention feature turned on and a room with low activity so that the last event sent becomes too old according to the room's retention policy.

An easy and clean fix for that issue is to discard the room's retention policies when retrieving state.
2019-11-28 19:26:26 +00:00
Richard van der Hoff 7baeea9f37 blacklist more tests 2019-11-28 14:55:19 +00:00
Erik Johnston 19ba7c142e Newsfile 2019-11-28 13:59:32 +00:00
Andrew Morgan 96562131a4 Merge branch 'master' into develop 2019-11-28 12:08:02 +00:00
Erik Johnston 8c9a713f8d Add tests 2019-11-28 11:32:06 +00:00
Erik Johnston 2173785f0d Propagate reason in remotely rejected invites 2019-11-28 11:31:56 +00:00
Erik Johnston 69d8fb83c6 MSC2367 Allow reason field on all member events 2019-11-28 11:02:04 +00:00
Andrew Morgan a9c44d4008 Remove local threepids on account deactivation (#6426) 2019-11-28 10:40:42 +00:00
Richard van der Hoff c48ea98007 Clarifications for the email configuration settings. (#6423)
Cf #6422
2019-11-28 09:29:18 +00:00
Amber Brown 0f87b912ab Implementation of MSC2314 (#6176) 2019-11-28 08:54:07 +11:00
Hubert Chathi 0d27aba900 add etag and count to key backup endpoints (#5858) 2019-11-27 16:14:44 -05:00
Richard van der Hoff 6f4a63df00 Add more tests to the worker blacklist (#6429) 2019-11-27 18:18:33 +00:00
Brendan Abolivier d31f69afa0 Merge pull request #6358 from matrix-org/babolivier/message_retention
Implement message retention policies (MSC1763)
2019-11-27 15:04:38 +00:00
Richard van der Hoff 70c0da4d82 clean up buildkite output 2019-11-26 19:00:24 +00:00
Andrew Morgan ce578031f4 Remove assertion and provide a clear warning on startup for missing public_baseurl (#6379) 2019-11-26 18:42:27 +00:00
Richard van der Hoff 651d930f16 Merge pull request #6343 from matrix-org/rav/event_auth/4
Refactor _update_auth_events_and_context_for_auth
2019-11-26 18:15:03 +00:00
Richard van der Hoff ef1a85e773 Fix startup error when http proxy is defined. (#6421)
Guess I only tested this on python 2 :/

Fixes #6419.
2019-11-26 18:10:50 +00:00
Brendan Abolivier 9e937c28ee Merge branch 'develop' into babolivier/message_retention 2019-11-26 17:53:57 +00:00
Brendan Abolivier f0ef970824 Don't restrict the tests to v1 rooms 2019-11-26 17:49:12 +00:00
Erik Johnston f085894cd1 Merge pull request #6420 from matrix-org/erikj/fix_find_next_generated_user_id_localpart
Fix guest registration
2019-11-26 17:04:15 +00:00
Erik Johnston f8f14ba466 Don't construct a set 2019-11-26 16:06:41 +00:00
Erik Johnston ba110a2030 Newsfile 2019-11-26 15:54:48 +00:00
Erik Johnston 8bb7b15894 Fix find_next_generated_user_id_localpart 2019-11-26 15:54:48 +00:00
Andrew Morgan 9fb350af65 Merge branch 'master' into develop 2019-11-26 14:15:30 +00:00
Andrew Morgan a8175d0f96 Prevent account_data content from being sent over TCP replication (#6333) 2019-11-26 13:58:39 +00:00
Erik Johnston f9f1c8acbb Merge pull request #6332 from matrix-org/erikj/query_devices_fix
Fix caching devices for remote servers in worker.
2019-11-26 12:56:05 +00:00
Richard van der Hoff 4d394d6415 remove confusing fixme 2019-11-26 12:32:37 +00:00
Erik Johnston 35f9165e96 Fixup docs 2019-11-26 12:04:48 +00:00
Richard van der Hoff c01d543584 Make sure that we close cursors before returning from a query (#6408)
There are lots of words in the comment as to why this is a good idea.

Fixes #6403.
2019-11-25 21:03:17 +00:00
Richard van der Hoff 07929bd62f Merge tag 'v1.6.0rc2' into develop
Synapse 1.6.0rc2 (2019-11-25)
=============================

Bugfixes
--------

- Fix a bug which could cause the background database update hander for event labels to get stuck in a loop raising exceptions. ([\#6407](https://github.com/matrix-org/synapse/issues/6407))
2019-11-25 17:51:39 +00:00
Amber Brown 9eebd46048 Improve the performance of structured logging (#6322) 2019-11-26 03:45:50 +11:00
Brendan Abolivier 78cfc05fc4 Merge pull request #6392 from matrix-org/babolivier/fix-1623
Test if a purge can make /messages return 500 responses
2019-11-25 12:59:36 +00:00
Andrew Morgan 265c0bd2fe Add working build command for docker image (#6390)
* Add working build command for docker image

* Add changelog
2019-11-23 06:54:05 +11:00
Aaron Raimist 24cc31ee96 Fix link to user_dir_populate.sql in the user directory docs (#6388) 2019-11-21 17:38:14 +00:00
Andrew Morgan 3916e1b97a Clean up newline quote marks around the codebase (#6362) 2019-11-21 12:00:14 +00:00
Matthew Hodgson 9cc168e42e update macOS installation instructions 2019-11-20 18:44:45 +00:00
Brendan Abolivier e2a20326e8 Lint 2019-11-20 15:08:47 +00:00
Brendan Abolivier 486be06f48 Changelog 2019-11-20 15:08:03 +00:00
Brendan Abolivier 6356f2088f Test if a purge can make /messages return 500 responses 2019-11-20 12:09:06 +00:00
Brendan Abolivier cdd3cb870d Fix worker mode 2019-11-19 14:40:21 +00:00
Brendan Abolivier a6fc6754f8 Fix 3PID invite exchange 2019-11-19 14:07:39 +00:00
Brendan Abolivier 97b863fe32 Lint again 2019-11-19 13:33:58 +00:00
Brendan Abolivier bf9a11c54d Lint again 2019-11-19 13:30:04 +00:00
Brendan Abolivier 7c24d0f443 Lint 2019-11-19 13:22:37 +00:00
Richard van der Hoff 870c00e278 Merge remote-tracking branch 'origin/develop' into rav/event_auth/4 2019-11-18 12:05:36 +00:00
Andrew Morgan a65a5ea125 Merge branch 'develop' of github.com:matrix-org/synapse into anoa/fix_account_data_sync
* 'develop' of github.com:matrix-org/synapse:
  Blacklist PurgeRoomTestCase (#6361)
  Set room version default to 5
2019-11-14 10:26:56 +00:00
Richard van der Hoff 20d687516f newsfile 2019-11-08 16:17:02 +00:00
Andrew Morgan cd96b4586f lint 2019-11-08 15:45:45 +00:00
Andrew Morgan 318dd21b47 Add changelog 2019-11-08 15:45:08 +00:00
Andrew Morgan c4bdf2d785 Remove content from being sent for account data rdata stream 2019-11-08 15:44:02 +00:00
Richard van der Hoff f41027f746 Use get_events_as_list rather than lots of calls to get_event
It's more efficient and clearer.
2019-11-08 12:21:28 +00:00
Richard van der Hoff f8407975e7 Update some docstrings and comments 2019-11-08 12:18:20 +00:00
Richard van der Hoff 772d414975 Simplify _update_auth_events_and_context_for_auth
move event_key calculation into _update_context_for_auth_events, since it's
only used there.
2019-11-08 11:40:11 +00:00
Brendan Abolivier f03c9d3444 Don't apply retention policy based filtering on state events
As per MSC1763, 'Retention is only considered for non-state events.', so don't filter out state events based on the room's retention policy.
2019-11-06 18:40:04 +00:00
Erik Johnston 248111bae8 Newsfile 2019-11-05 15:54:23 +00:00
Erik Johnston c16e192e2f Fix caching devices for remote servers in worker.
When the `/keys/query` API is hit on client_reader worker Synapse may
decide that it needs to resync some remote deivces. Usually this happens
on master, and then gets cached. However, that fails on workers and so
it falls back to fetching devices from remotes directly, which may in
turn fail if the remote is down.
2019-11-05 15:49:43 +00:00
Brendan Abolivier 09957ce0e4 Implement per-room message retention policies 2019-11-04 17:09:22 +00:00
109 changed files with 2578 additions and 386 deletions
+3 -1
View File
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -ex
set -e
if [[ "$BUILDKITE_BRANCH" =~ ^(develop|master|dinsic|shhs|release-.*)$ ]]; then
echo "Not merging forward, as this is a release branch"
@@ -18,6 +18,8 @@ else
GITBASE=$BUILDKITE_PULL_REQUEST_BASE_BRANCH
fi
echo "--- merge_base_branch $GITBASE"
# Show what we are before
git --no-pager show -s
+36
View File
@@ -28,3 +28,39 @@ User sees updates to presence from other users in the incremental sync.
Gapped incremental syncs include all state changes
Old members are included in gappy incr LL sync if they start speaking
# new failures as of https://github.com/matrix-org/sytest/pull/732
Device list doesn't change if remote server is down
Remote servers cannot set power levels in rooms without existing powerlevels
Remote servers should reject attempts by non-creators to set the power levels
# new failures as of https://github.com/matrix-org/sytest/pull/753
GET /rooms/:room_id/messages returns a message
GET /rooms/:room_id/messages lazy loads members correctly
Read receipts are sent as events
Only original members of the room can see messages from erased users
Device deletion propagates over federation
If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes
Changing user-signing key notifies local users
Newly updated tags appear in an incremental v2 /sync
Server correctly handles incoming m.device_list_update
Local device key changes get to remote servers with correct prev_id
AS-ghosted users can use rooms via AS
Ghost user must register before joining room
Test that a message is pushed
Invites are pushed
Rooms with aliases are correctly named in pushed
Rooms with names are correctly named in pushed
Rooms with canonical alias are correctly named in pushed
Rooms with many users are correctly pushed
Don't get pushed for rooms you've muted
Rejected events are not pushed
Test that rejected pushers are removed.
Events come down the correct room
# https://buildkite.com/matrix-dot-org/sytest/builds/326#cca62404-a88a-4fcb-ad41-175fd3377603
Presence changes to UNAVAILABLE are reported to remote room members
If remote user leaves room, changes device and rejoins we see update in sync
uploading self-signing key notifies over federation
Inbound federation can receive redacted events
Outbound federation can request missing events
+12 -4
View File
@@ -109,8 +109,8 @@ Installing prerequisites on Ubuntu or Debian:
```
sudo apt-get install build-essential python3-dev libffi-dev \
python-pip python-setuptools sqlite3 \
libssl-dev python-virtualenv libjpeg-dev libxslt1-dev
python3-pip python3-setuptools sqlite3 \
libssl-dev python3-virtualenv libjpeg-dev libxslt1-dev
```
#### ArchLinux
@@ -133,9 +133,9 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
sudo yum groupinstall "Development Tools"
```
#### Mac OS X
#### macOS
Installing prerequisites on Mac OS X:
Installing prerequisites on macOS:
```
xcode-select --install
@@ -144,6 +144,14 @@ sudo pip install virtualenv
brew install pkg-config libffi
```
On macOS Catalina (10.15) you may need to explicitly install OpenSSL
via brew and inform `pip` about it so that `psycopg2` builds:
```
brew install openssl@1.1
export LDFLAGS=-L/usr/local/Cellar/openssl\@1.1/1.1.1d/lib/
```
#### OpenSUSE
Installing prerequisites on openSUSE:
+1
View File
@@ -0,0 +1 @@
Implement per-room message retention policies.
+1
View File
@@ -0,0 +1 @@
Add etag and count fields to key backup endpoints to help clients guess if there are new keys.
+1
View File
@@ -0,0 +1 @@
Require User-Interactive Authentication for `/account/3pid/add`, meaning the user's password will be required to add a third-party ID to their account.
+1
View File
@@ -0,0 +1 @@
Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314.
+1
View File
@@ -0,0 +1 @@
Transfer non-standard power levels on room upgrade.
+1
View File
@@ -0,0 +1 @@
Fix error from the Pillow library when uploading RGBA images.
+1
View File
@@ -0,0 +1 @@
Add benchmarks for structured logging and improve output performance.
+1
View File
@@ -0,0 +1 @@
Improve the performance of outputting structured logging.
+1
View File
@@ -0,0 +1 @@
Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices.
+1
View File
@@ -0,0 +1 @@
Prevent account data syncs getting lost across TCP replication.
+1
View File
@@ -0,0 +1 @@
Refactor some code in the event authentication path for clarity.
+1
View File
@@ -0,0 +1 @@
Clean up some unnecessary quotation marks around the codebase.
+1
View File
@@ -0,0 +1 @@
Complain on startup instead of 500'ing during runtime when `public_baseurl` isn't set when necessary.
+1
View File
@@ -0,0 +1 @@
Fix link in the user directory documentation.
+1
View File
@@ -0,0 +1 @@
Add build instructions to the docker readme.
+1
View File
@@ -0,0 +1 @@
Add a test scenario to make sure room history purges don't break `/messages` in the future.
+1
View File
@@ -0,0 +1 @@
Fix an intermittent exception when handling read-receipts.
+1
View File
@@ -0,0 +1 @@
Fix broken guest registration when there are existing blocks of numeric user IDs.
+1
View File
@@ -0,0 +1 @@
Fix startup error when http proxy is defined.
+1
View File
@@ -0,0 +1 @@
Clarifications for the email configuration settings.
+1
View File
@@ -0,0 +1 @@
Clean up local threepids from user on account deactivation.
+1
View File
@@ -0,0 +1 @@
Add more tests to the blacklist when running in worker mode.
+1
View File
@@ -0,0 +1 @@
Add support for MSC 2367, which allows specifying a reason on all membership events.
+1
View File
@@ -0,0 +1 @@
Fix a bug where a room could become unusable with a low retention policy and a low activity.
+1
View File
@@ -0,0 +1 @@
Switch Ubuntu package install recommendation to use python3 packages in INSTALL.md.
+1
View File
@@ -0,0 +1 @@
Use `/_matrix/app/v1` endpoint prefix for appservices.
+12
View File
@@ -130,3 +130,15 @@ docker run -it --rm \
This will generate the same configuration file as the legacy mode used, but
will store it in `/data/homeserver.yaml` instead of a temporary location. You
can then use it as shown above at [Running synapse](#running-synapse).
## Building the image
If you need to build the image from a Synapse checkout, use the following `docker
build` command from the repo's root:
```
docker build -t matrixdotorg/synapse -f docker/Dockerfile .
```
You can choose to build a different docker image by changing the value of the `-f` flag to
point to another Dockerfile.
+79 -1
View File
@@ -328,6 +328,69 @@ listeners:
#
#user_ips_max_age: 14d
# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
# The message retention policies feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# Default retention policy. If set, Synapse will apply it to rooms that lack the
# 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
# matter much because Synapse doesn't take it into account yet.
#
#default_policy:
# min_lifetime: 1d
# max_lifetime: 1y
# Retention policy limits. If set, a user won't be able to send a
# 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
# that's not within this range. This is especially useful in closed federations,
# in which server admins can make sure every federating server applies the same
# rules.
#
#allowed_lifetime_min: 1d
#allowed_lifetime_max: 1y
# Server admins can define the settings of the background jobs purging the
# events which lifetime has expired under the 'purge_jobs' section.
#
# If no configuration is provided, a single job will be set up to delete expired
# events in every room daily.
#
# Each job's configuration defines which range of message lifetimes the job
# takes care of. For example, if 'shortest_max_lifetime' is '2d' and
# 'longest_max_lifetime' is '3d', the job will handle purging expired events in
# rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
# lower than or equal to 3 days. Both the minimum and the maximum value of a
# range are optional, e.g. a job with no 'shortest_max_lifetime' and a
# 'longest_max_lifetime' of '3d' will handle every room with a retention policy
# which 'max_lifetime' is lower than or equal to three days.
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
## TLS ##
@@ -1270,8 +1333,23 @@ password_config:
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name: Matrix
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
#
# # Enable email notifications by default
# #
+1 -2
View File
@@ -7,7 +7,6 @@ who are present in a publicly viewable room present on the server.
The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL here
https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
and then restart synapse. This should then start a background task to
flush the current tables and regenerate the directory.
+2
View File
@@ -94,6 +94,8 @@ class EventTypes(object):
ServerACL = "m.room.server_acl"
Pinned = "m.room.pinned_events"
Retention = "m.room.retention"
class RejectedReason(object):
AUTH_ERROR = "auth_error"
+1 -1
View File
@@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
def _get_federation_out_pos(self, db_conn):
sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor()
+16 -20
View File
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os.path
import urllib.parse
from six.moves import urllib
@@ -46,9 +48,7 @@ sent_events_counter = Counter(
)
HOUR_IN_MS = 60 * 60 * 1000
APP_SERVICE_PREFIX = "/_matrix/app/unstable"
APP_SERVICE_PREFIX = "_matrix/app/v1"
def _is_valid_3pe_metadata(info):
@@ -81,6 +81,11 @@ def _is_valid_3pe_result(r, field):
return True
def _build_as_uri(service, endpoint_name, key):
key = urllib.parse.quote(key)
return os.path.join(service.url, APP_SERVICE_PREFIX, endpoint_name, key)
class ApplicationServiceApi(SimpleHttpClient):
"""This class manages HS -> AS communications, including querying and
pushing.
@@ -98,7 +103,7 @@ class ApplicationServiceApi(SimpleHttpClient):
def query_user(self, service, user_id):
if service.url is None:
return False
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
uri = _build_as_uri(service, "users", user_id)
response = None
try:
response = yield self.get_json(uri, {"access_token": service.hs_token})
@@ -116,7 +121,7 @@ class ApplicationServiceApi(SimpleHttpClient):
def query_alias(self, service, alias):
if service.url is None:
return False
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
uri = _build_as_uri(service, "rooms", alias)
response = None
try:
response = yield self.get_json(uri, {"access_token": service.hs_token})
@@ -141,12 +146,8 @@ class ApplicationServiceApi(SimpleHttpClient):
if service.url is None:
return []
uri = "%s%s/thirdparty/%s/%s" % (
service.url,
APP_SERVICE_PREFIX,
kind,
urllib.parse.quote(protocol),
)
uri = _build_as_uri(service, "thirdparty/%s" % kind, protocol)
try:
response = yield self.get_json(uri, fields)
if not isinstance(response, list):
@@ -175,17 +176,13 @@ class ApplicationServiceApi(SimpleHttpClient):
@defer.inlineCallbacks
def _get():
uri = "%s%s/thirdparty/protocol/%s" % (
service.url,
APP_SERVICE_PREFIX,
urllib.parse.quote(protocol),
)
uri = _build_as_uri(service, "thirdparty/protocol", protocol)
try:
info = yield self.get_json(uri, {})
if not _is_valid_3pe_metadata(info):
logger.warning(
"query_3pe_protocol to %s did not return a" " valid result", uri
"query_3pe_protocol to %s did not return a valid result", uri
)
return None
@@ -215,10 +212,9 @@ class ApplicationServiceApi(SimpleHttpClient):
logger.warning(
"push_bulk: Missing txn ID sending events to %s", service.url
)
txn_id = str(0)
txn_id = str(txn_id)
txn_id = 0
uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
uri = _build_as_uri(service, "transactions", str(txn_id))
try:
yield self.put_json(
uri=uri,
+1 -1
View File
@@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename):
for regex_obj in as_info["namespaces"][ns]:
if not isinstance(regex_obj, dict):
raise ValueError(
"Expected namespace entry in %s to be an object," " but got %s",
"Expected namespace entry in %s to be an object, but got %s",
ns,
regex_obj,
)
+18 -1
View File
@@ -146,6 +146,8 @@ class EmailConfig(Config):
if k not in email_config:
missing.append("email." + k)
# public_baseurl is required to build password reset and validation links that
# will be emailed to users
if config.get("public_baseurl") is None:
missing.append("public_baseurl")
@@ -305,8 +307,23 @@ class EmailConfig(Config):
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name: Matrix
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
#
# # Enable email notifications by default
# #
+7
View File
@@ -106,6 +106,13 @@ class RegistrationConfig(Config):
account_threepid_delegates = config.get("account_threepid_delegates") or {}
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
if self.account_threepid_delegate_msisdn and not self.public_baseurl:
raise ConfigError(
"The configuration option `public_baseurl` is required if "
"`account_threepid_delegate.msisdn` is set, such that "
"clients know where to submit validation tokens to. Please "
"configure `public_baseurl`."
)
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
+1 -1
View File
@@ -170,7 +170,7 @@ class _RoomDirectoryRule(object):
self.action = action
else:
raise ConfigError(
"%s rules can only have action of 'allow'" " or 'deny'" % (option_name,)
"%s rules can only have action of 'allow' or 'deny'" % (option_name,)
)
self._alias_matches_all = alias == "*"
+185 -4
View File
@@ -19,7 +19,7 @@ import logging
import os.path
import re
from textwrap import indent
from typing import List
from typing import Dict, List, Optional
import attr
import yaml
@@ -223,7 +223,7 @@ class ServerConfig(Config):
self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
except Exception as e:
raise ConfigError(
"Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
)
if self.public_baseurl is not None:
@@ -246,6 +246,124 @@ class ServerConfig(Config):
# events with profile information that differ from the target's global profile.
self.allow_per_room_profiles = config.get("allow_per_room_profiles", True)
retention_config = config.get("retention")
if retention_config is None:
retention_config = {}
self.retention_enabled = retention_config.get("enabled", False)
retention_default_policy = retention_config.get("default_policy")
if retention_default_policy is not None:
self.retention_default_min_lifetime = retention_default_policy.get(
"min_lifetime"
)
if self.retention_default_min_lifetime is not None:
self.retention_default_min_lifetime = self.parse_duration(
self.retention_default_min_lifetime
)
self.retention_default_max_lifetime = retention_default_policy.get(
"max_lifetime"
)
if self.retention_default_max_lifetime is not None:
self.retention_default_max_lifetime = self.parse_duration(
self.retention_default_max_lifetime
)
if (
self.retention_default_min_lifetime is not None
and self.retention_default_max_lifetime is not None
and (
self.retention_default_min_lifetime
> self.retention_default_max_lifetime
)
):
raise ConfigError(
"The default retention policy's 'min_lifetime' can not be greater"
" than its 'max_lifetime'"
)
else:
self.retention_default_min_lifetime = None
self.retention_default_max_lifetime = None
self.retention_allowed_lifetime_min = retention_config.get(
"allowed_lifetime_min"
)
if self.retention_allowed_lifetime_min is not None:
self.retention_allowed_lifetime_min = self.parse_duration(
self.retention_allowed_lifetime_min
)
self.retention_allowed_lifetime_max = retention_config.get(
"allowed_lifetime_max"
)
if self.retention_allowed_lifetime_max is not None:
self.retention_allowed_lifetime_max = self.parse_duration(
self.retention_allowed_lifetime_max
)
if (
self.retention_allowed_lifetime_min is not None
and self.retention_allowed_lifetime_max is not None
and self.retention_allowed_lifetime_min
> self.retention_allowed_lifetime_max
):
raise ConfigError(
"Invalid retention policy limits: 'allowed_lifetime_min' can not be"
" greater than 'allowed_lifetime_max'"
)
self.retention_purge_jobs = [] # type: List[Dict[str, Optional[int]]]
for purge_job_config in retention_config.get("purge_jobs", []):
interval_config = purge_job_config.get("interval")
if interval_config is None:
raise ConfigError(
"A retention policy's purge jobs configuration must have the"
" 'interval' key set."
)
interval = self.parse_duration(interval_config)
shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime")
if shortest_max_lifetime is not None:
shortest_max_lifetime = self.parse_duration(shortest_max_lifetime)
longest_max_lifetime = purge_job_config.get("longest_max_lifetime")
if longest_max_lifetime is not None:
longest_max_lifetime = self.parse_duration(longest_max_lifetime)
if (
shortest_max_lifetime is not None
and longest_max_lifetime is not None
and shortest_max_lifetime > longest_max_lifetime
):
raise ConfigError(
"A retention policy's purge jobs configuration's"
" 'shortest_max_lifetime' value can not be greater than its"
" 'longest_max_lifetime' value."
)
self.retention_purge_jobs.append(
{
"interval": interval,
"shortest_max_lifetime": shortest_max_lifetime,
"longest_max_lifetime": longest_max_lifetime,
}
)
if not self.retention_purge_jobs:
self.retention_purge_jobs = [
{
"interval": self.parse_duration("1d"),
"shortest_max_lifetime": None,
"longest_max_lifetime": None,
}
]
self.listeners = [] # type: List[dict]
for listener in config.get("listeners", []):
if not isinstance(listener.get("port", None), int):
@@ -761,6 +879,69 @@ class ServerConfig(Config):
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d
# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
# The message retention policies feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# Default retention policy. If set, Synapse will apply it to rooms that lack the
# 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
# matter much because Synapse doesn't take it into account yet.
#
#default_policy:
# min_lifetime: 1d
# max_lifetime: 1y
# Retention policy limits. If set, a user won't be able to send a
# 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
# that's not within this range. This is especially useful in closed federations,
# in which server admins can make sure every federating server applies the same
# rules.
#
#allowed_lifetime_min: 1d
#allowed_lifetime_max: 1y
# Server admins can define the settings of the background jobs purging the
# events which lifetime has expired under the 'purge_jobs' section.
#
# If no configuration is provided, a single job will be set up to delete expired
# events in every room daily.
#
# Each job's configuration defines which range of message lifetimes the job
# takes care of. For example, if 'shortest_max_lifetime' is '2d' and
# 'longest_max_lifetime' is '3d', the job will handle purging expired events in
# rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
# lower than or equal to 3 days. Both the minimum and the maximum value of a
# range are optional, e.g. a job with no 'shortest_max_lifetime' and a
# 'longest_max_lifetime' of '3d' will handle every room with a retention policy
# which 'max_lifetime' is lower than or equal to three days.
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
"""
% locals()
)
@@ -787,14 +968,14 @@ class ServerConfig(Config):
"--print-pidfile",
action="store_true",
default=None,
help="Print the path to the pidfile just" " before daemonizing",
help="Print the path to the pidfile just before daemonizing",
)
server_group.add_argument(
"--manhole",
metavar="PORT",
dest="manhole",
type=int,
help="Turn on the twisted telnet manhole" " service on the given port.",
help="Turn on the twisted telnet manhole service on the given port.",
)
+97 -3
View File
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six import string_types
from six import integer_types, string_types
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
@@ -22,11 +22,12 @@ from synapse.types import EventID, RoomID, UserID
class EventValidator(object):
def validate_new(self, event):
def validate_new(self, event, config):
"""Validates the event has roughly the right format
Args:
event (FrozenEvent)
event (FrozenEvent): The event to validate.
config (Config): The homeserver's configuration.
"""
self.validate_builder(event)
@@ -67,6 +68,99 @@ class EventValidator(object):
Codes.INVALID_PARAM,
)
if event.type == EventTypes.Retention:
self._validate_retention(event, config)
def _validate_retention(self, event, config):
"""Checks that an event that defines the retention policy for a room respects the
boundaries imposed by the server's administrator.
Args:
event (FrozenEvent): The event to validate.
config (Config): The homeserver's configuration.
"""
min_lifetime = event.content.get("min_lifetime")
max_lifetime = event.content.get("max_lifetime")
if min_lifetime is not None:
if not isinstance(min_lifetime, integer_types):
raise SynapseError(
code=400,
msg="'min_lifetime' must be an integer",
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_min is not None
and min_lifetime < config.retention_allowed_lifetime_min
):
raise SynapseError(
code=400,
msg=(
"'min_lifetime' can't be lower than the minimum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_max is not None
and min_lifetime > config.retention_allowed_lifetime_max
):
raise SynapseError(
code=400,
msg=(
"'min_lifetime' can't be greater than the maximum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if max_lifetime is not None:
if not isinstance(max_lifetime, integer_types):
raise SynapseError(
code=400,
msg="'max_lifetime' must be an integer",
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_min is not None
and max_lifetime < config.retention_allowed_lifetime_min
):
raise SynapseError(
code=400,
msg=(
"'max_lifetime' can't be lower than the minimum allowed value"
" enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_max is not None
and max_lifetime > config.retention_allowed_lifetime_max
):
raise SynapseError(
code=400,
msg=(
"'max_lifetime' can't be greater than the maximum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
min_lifetime is not None
and max_lifetime is not None
and min_lifetime > max_lifetime
):
raise SynapseError(
code=400,
msg="'min_lifetime' can't be greater than 'max_lifetime",
errcode=Codes.BAD_JSON,
)
def validate_builder(self, event):
"""Validates that the builder/event has roughly the right format. Only
checks values that we expect a proto event to have, rather than all the
+17 -9
View File
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -73,6 +74,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -264,9 +266,6 @@ class FederationServer(FederationBase):
await self.registry.on_edu(edu_type, origin, content)
async def on_context_state_request(self, origin, room_id, event_id):
if not event_id:
raise NotImplementedError("Specify an event")
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
@@ -280,13 +279,18 @@ class FederationServer(FederationBase):
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (await self._server_linearizer.queue((origin, room_id))):
resp = await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
resp = dict(
await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
)
)
room_version = await self.store.get_room_version(room_id)
resp["room_version"] = room_version
return 200, resp
async def on_state_ids_request(self, origin, room_id, event_id):
@@ -306,7 +310,11 @@ class FederationServer(FederationBase):
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
async def _on_context_state_request_compute(self, room_id, event_id):
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
else:
pdus = (await self.state.get_current_state(room_id)).values()
auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
return {
+2 -2
View File
@@ -44,7 +44,7 @@ class TransactionActions(object):
response code and response body.
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.get_received_txn_response(transaction.transaction_id, origin)
@@ -56,7 +56,7 @@ class TransactionActions(object):
Deferred
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.set_received_txn_response(
transaction.transaction_id, origin, code, response
+1 -1
View File
@@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total",
"" "Total number of PDUs queued for sending across all destinations",
"Total number of PDUs queued for sending across all destinations",
)
@@ -84,7 +84,7 @@ class TransactionManager(object):
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
"TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
@@ -103,7 +103,7 @@ class TransactionManager(object):
self._next_txn_id += 1
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
"TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
+3 -3
View File
@@ -421,7 +421,7 @@ class FederationEventServlet(BaseFederationServlet):
return await self.handler.on_pdu_request(origin, event_id)
class FederationStateServlet(BaseFederationServlet):
class FederationStateV1Servlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?"
# This is when someone asks for all data for a given context.
@@ -429,7 +429,7 @@ class FederationStateServlet(BaseFederationServlet):
return await self.handler.on_context_state_request(
origin,
context,
parse_string_from_args(query, "event_id", None, required=True),
parse_string_from_args(query, "event_id", None, required=False),
)
@@ -1360,7 +1360,7 @@ class RoomComplexityServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
FederationStateServlet,
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,
+1 -1
View File
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
"This application service has not reserved" " this kind of alias.",
"This application service has not reserved this kind of alias.",
errcode=Codes.EXCLUSIVE,
)
else:
+16 -3
View File
@@ -30,6 +30,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
if self._is_master:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
user_devices = yield self._user_device_resync_client(
user_id=user_id
)
user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
+78 -48
View File
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
rooms
session_id(string): session ID to delete keys for, for None to delete keys
for all sessions
Raises:
NotFoundError: if the backup version does not exist
Returns:
A deferred of the deletion transaction
A dict containing the count and etag for the backup version
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
# make sure the backup version exists
try:
version_info = yield self.store.get_e2e_room_keys_version_info(
user_id, version
)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown backup version")
else:
raise
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
version_etag = version_info["etag"] + 1
yield self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
)
count = yield self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
}
}
Returns:
A dict containing the count and etag for the backup version
Raises:
NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
else:
raise
# go through the room_keys.
# XXX: this should/could be done concurrently, given we're in a lock.
# Fetch any existing room keys for the sessions that have been
# submitted. Then compare them with the submitted keys. If the
# key is new, insert it; if the key should be updated, then update
# it; otherwise, drop it.
existing_keys = yield self.store.get_e2e_room_keys_multi(
user_id, version, room_keys["rooms"]
)
to_insert = [] # batch the inserts together
changed = False # if anything has changed, we need to update the etag
for room_id, room in iteritems(room_keys["rooms"]):
for session_id, session in iteritems(room["sessions"]):
yield self._upload_room_key(
user_id, version, room_id, session_id, session
for session_id, room_key in iteritems(room["sessions"]):
log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
current_room_key = existing_keys.get(room_id, {}).get(session_id)
if current_room_key:
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
# updates are done one at a time in the DB, so send
# updates right away rather than batching them up,
# like we do with the inserts
yield self.store.update_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
changed = True
else:
log_kv({"message": "Not replacing room_key."})
else:
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
log_kv({"message": "Replacing room key."})
to_insert.append((room_id, session_id, room_key))
changed = True
@defer.inlineCallbacks
def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
"""Upload a given room_key for a given room and session into a given
version of the backup. Merges the key with any which might already exist.
if len(to_insert):
yield self.store.add_e2e_room_keys(user_id, version, to_insert)
Args:
user_id(str): the user whose backup we're setting
version(str): the version ID of the backup we're updating
room_id(str): the ID of the room whose keys we're setting
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
current_room_key = yield self.store.get_e2e_room_key(
user_id, version, room_id, session_id
)
except StoreError as e:
if e.code == 404:
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
version_etag = version_info["etag"]
if changed:
version_etag = version_etag + 1
yield self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
)
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
log_kv({"message": "Not replacing room_key."})
count = yield self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
raise NotFoundError("Unknown backup version")
else:
raise
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
return res
@trace
+48 -41
View File
@@ -1428,9 +1428,9 @@ class FederationHandler(BaseHandler):
return event
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts, room_id, user_id, "leave"
target_hosts, room_id, user_id, "leave", content=content,
)
# Mark as outlier as we don't have any state for this event; we're not
# even in the room.
@@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
What we expect the event's auth_events to be, based on the event's
position in the dag. I think? maybe??
Normally, our calculated auth_events based on the state of the room
at the event's position in the DAG, though occasionally (eg if the
event is an outlier), may be the auth events claimed by the remote
server.
Also NB that this function adds entries to it.
Returns:
@@ -2091,30 +2093,35 @@ class FederationHandler(BaseHandler):
origin (str):
event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
Normally, our calculated auth_events based on the state of the room
at the event's position in the DAG, though occasionally (eg if the
event is an outlier), may be the auth events claimed by the remote
server.
Also NB that this function adds entries to it.
Returns:
defer.Deferred[EventContext]: updated context
"""
event_auth_events = set(event.auth_event_ids())
if event.is_state():
event_key = (event.type, event.state_key)
else:
event_key = None
# if the event's auth_events refers to events which are not in our
# calculated auth_events, we need to fetch those events from somewhere.
#
# we start by fetching them from the store, and then try calling /event_auth/.
# missing_auth is the set of the event's auth_events which we don't yet have
# in auth_events.
missing_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
# if we have missing events, we need to fetch those events from somewhere.
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
logger.debug("Got events %s from store", have_events)
logger.debug("Found events %s in the store", have_events)
missing_auth.difference_update(have_events.keys())
else:
have_events = {}
@@ -2169,15 +2176,17 @@ class FederationHandler(BaseHandler):
event.auth_event_ids()
)
except Exception:
# FIXME:
logger.exception("Failed to get auth chain")
if event.internal_metadata.is_outlier():
# XXX: given that, for an outlier, we'll be working with the
# event's *claimed* auth events rather than those we calculated:
# (a) is there any point in this test, since different_auth below will
# obviously be empty
# (b) alternatively, why don't we do it earlier?
logger.info("Skipping auth_event fetch for outlier")
return context
# FIXME: Assumes we have and stored all the state for all the
# prev_events
different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
@@ -2191,27 +2200,22 @@ class FederationHandler(BaseHandler):
different_auth,
)
# now we state-resolve between our own idea of the auth events, and the remote's
# idea of them.
room_version = yield self.store.get_room_version(event.room_id)
different_event_ids = [
d for d in different_auth if d in have_events and not have_events[d]
]
different_events = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.store.get_event, d, allow_none=True, allow_rejected=False
)
for d in different_auth
if d in have_events and not have_events[d]
],
consumeErrors=True,
)
).addErrback(unwrapFirstError)
if different_event_ids:
# XXX: currently this checks for redactions but I'm not convinced that is
# necessary?
different_events = yield self.store.get_events_as_list(different_event_ids)
if different_events:
local_view = dict(auth_events)
remote_view = dict(auth_events)
remote_view.update(
{(d.type, d.state_key): d for d in different_events if d}
)
remote_view.update({(d.type, d.state_key): d for d in different_events})
new_state = yield self.state_handler.resolve_events(
room_version,
@@ -2231,13 +2235,13 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
context = yield self._update_context_for_auth_events(
event, context, auth_events, event_key
event, context, auth_events
)
return context
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events, event_key):
def _update_context_for_auth_events(self, event, context, auth_events):
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.
@@ -2246,18 +2250,21 @@ class FederationHandler(BaseHandler):
context (synapse.events.snapshot.EventContext): initial event context
auth_events (dict[(str, str)->str]): Events to update in the event
auth_events (dict[(str, str)->EventBase]): Events to update in the event
context.
event_key ((str, str)): (type, state_key) for the current event.
this will not be included in the current_state in the context.
Returns:
Deferred[EventContext]: new event context
"""
# exclude the state key of the new event from the current_state in the context.
if event.is_state():
event_key = (event.type, event.state_key)
else:
event_key = None
state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
}
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids)
@@ -2459,7 +2466,7 @@ class FederationHandler(BaseHandler):
room_version, event_dict, event, context
)
EventValidator().validate_new(event)
EventValidator().validate_new(event, self.config)
# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
@@ -2574,7 +2581,7 @@ class FederationHandler(BaseHandler):
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
EventValidator().validate_new(event)
EventValidator().validate_new(event, self.config)
return (event, context)
@defer.inlineCallbacks
+3 -3
View File
@@ -138,7 +138,7 @@ class MessageHandler(object):
raise NotFoundError("Can't find event for token %s" % (at_token,))
visible_events = yield filter_events_for_client(
self.storage, user_id, last_events
self.storage, user_id, last_events, apply_retention_policies=False
)
event = last_events[0]
@@ -417,7 +417,7 @@ class EventCreationHandler(object):
403, "You must be in the room to create an alias for it"
)
self.validator.validate_new(event)
self.validator.validate_new(event, self.config)
return (event, context)
@@ -634,7 +634,7 @@ class EventCreationHandler(object):
if requester:
context.app_service = requester.app_service
self.validator.validate_new(event)
self.validator.validate_new(event, self.config)
# If this event is an annotation then we check that that the sender
# can't annotate the same way twice (e.g. stops users from liking an
+106
View File
@@ -15,12 +15,15 @@
# limitations under the License.
import logging
from six import iteritems
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
@@ -80,6 +83,109 @@ class PaginationHandler(object):
self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
self.clock.looping_call(
run_as_background_process,
job["interval"],
"purge_history_for_rooms_in_range",
self.purge_history_for_rooms_in_range,
job["shortest_max_lifetime"],
job["longest_max_lifetime"],
)
@defer.inlineCallbacks
def purge_history_for_rooms_in_range(self, min_ms, max_ms):
"""Purge outdated events from rooms within the given retention range.
If a default retention policy is defined in the server's configuration and its
'max_lifetime' is within this range, also targets rooms which don't have a
retention policy.
Args:
min_ms (int|None): Duration in milliseconds that define the lower limit of
the range to handle (exclusive). If None, it means that the range has no
lower limit.
max_ms (int|None): Duration in milliseconds that define the upper limit of
the range to handle (inclusive). If None, it means that the range has no
upper limit.
"""
# We want the storage layer to to include rooms with no retention policy in its
# return value only if a default retention policy is defined in the server's
# configuration and that policy's 'max_lifetime' is either lower (or equal) than
# max_ms or higher than min_ms (or both).
if self._retention_default_max_lifetime is not None:
include_null = True
if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
# The default max_lifetime is lower than (or equal to) min_ms.
include_null = False
if max_ms is not None and max_ms < self._retention_default_max_lifetime:
# The default max_lifetime is higher than max_ms.
include_null = False
else:
include_null = False
rooms = yield self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
for room_id, retention_policy in iteritems(rooms):
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
" for this room",
room_id,
)
continue
max_lifetime = retention_policy["max_lifetime"]
if max_lifetime is None:
# If max_lifetime is None, it means that include_null equals True,
# therefore we can safely assume that there is a default policy defined
# in the server's configuration.
max_lifetime = self._retention_default_max_lifetime
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
r = yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
if not r:
logger.warning(
"[purge] purging events not possible: No event found "
"(ts %i => stream_ordering %i)",
ts,
stream_ordering,
)
continue
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
purge_id = random_string(16)
self._purges_by_id[purge_id] = PurgeStatus()
logger.info(
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
)
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
"_purge_history", self._purge_history, purge_id, room_id, token, True,
)
def start_purge_history(self, room_id, token, delete_local_events=False):
"""Start off a history purge on a room.
+31 -5
View File
@@ -198,21 +198,21 @@ class RoomCreationHandler(BaseHandler):
# finally, shut down the PLs in the old room, and update them in the new
# room.
yield self._update_upgraded_room_pls(
requester, old_room_id, new_room_id, old_room_state
requester, old_room_id, new_room_id, old_room_state,
)
return new_room_id
@defer.inlineCallbacks
def _update_upgraded_room_pls(
self, requester, old_room_id, new_room_id, old_room_state
self, requester, old_room_id, new_room_id, old_room_state,
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (unicode): the id of the room to be replaced
new_room_id (unicode): the id of the replacement room
old_room_id (str): the id of the room to be replaced
new_room_id (str): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
Returns:
@@ -298,7 +298,7 @@ class RoomCreationHandler(BaseHandler):
tombstone_event_id (unicode|str): the ID of the tombstone event in the old
room.
Returns:
Deferred[None]
Deferred
"""
user_id = requester.user.to_string()
@@ -333,6 +333,7 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
(EventTypes.PowerLevels, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -346,6 +347,31 @@ class RoomCreationHandler(BaseHandler):
if old_event:
initial_state[k] = old_event.content
# Resolve the minimum power level required to send any state event
# We will give the upgrading user this power level temporarily (if necessary) such that
# they are able to copy all of the state events over, then revert them back to their
# original power level afterwards in _update_upgraded_room_pls
# Copy over user power levels now as this will not be possible with >100PL users once
# the room has been created
power_levels = initial_state[(EventTypes.PowerLevels, "")]
# Calculate the minimum power level needed to clone the room
event_power_levels = power_levels.get("events", {})
state_default = power_levels.get("state_default", 0)
ban = power_levels.get("ban")
needed_power_level = max(state_default, ban, max(event_power_levels.values()))
# Raise the requester's power level in the new room if necessary
current_power_level = power_levels["users"][requester.user.to_string()]
if current_power_level < needed_power_level:
# Assign this power level to the requester
power_levels["users"][requester.user.to_string()] = needed_power_level
# Set the power levels to the modified state
initial_state[(EventTypes.PowerLevels, "")] = power_levels
yield self._send_events_for_new_room(
requester,
new_room_id,
+9 -4
View File
@@ -94,7 +94,9 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
@@ -104,6 +106,7 @@ class RoomMemberHandler(object):
reject invite
room_id (str)
target (UserID): The user rejecting the invite
content (dict): The content for the rejection event
Returns:
Deferred[dict]: A dictionary to be returned to the client, may
@@ -471,7 +474,7 @@ class RoomMemberHandler(object):
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
res = yield self._remote_reject_invite(
requester, remote_room_hosts, room_id, target
requester, remote_room_hosts, room_id, target, content,
)
return res
@@ -971,13 +974,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
@defer.inlineCallbacks
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string()
remote_room_hosts, room_id, target.to_string(), content=content,
)
return ret
except Exception as e:
+4 -1
View File
@@ -55,7 +55,9 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
"""Implements RoomMemberHandler._remote_reject_invite
"""
return self._remote_reject_client(
@@ -63,6 +65,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
content=content,
)
def _user_joined_room(self, target, room_id):
+1 -1
View File
@@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False):
return {b"true": True, b"false": False}[args[name][0]]
except Exception:
message = (
"Boolean query parameter %r must be one of" " ['true', 'false']"
"Boolean query parameter %r must be one of ['true', 'false']"
) % (name,)
raise SynapseError(400, message)
else:
+13 -1
View File
@@ -261,6 +261,18 @@ def parse_drain_configs(
)
class StoppableLogPublisher(LogPublisher):
"""
A log publisher that can tell its observers to shut down any external
communications.
"""
def stop(self):
for obs in self._observers:
if hasattr(obs, "stop"):
obs.stop()
def setup_structured_logging(
hs,
config,
@@ -336,7 +348,7 @@ def setup_structured_logging(
# We should never get here, but, just in case, throw an error.
raise ConfigError("%s drain type cannot be configured" % (observer.type,))
publisher = LogPublisher(*observers)
publisher = StoppableLogPublisher(*observers)
log_filter = LogLevelFilterPredicate()
for namespace, namespace_config in log_config.get(
+77 -28
View File
@@ -17,25 +17,29 @@
Log formatters that output terse JSON.
"""
import json
import sys
import traceback
from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor
from typing import IO
from typing import IO, Optional
import attr
from simplejson import dumps
from zope.interface import implementer
from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
TCP6ClientEndpoint,
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
from twisted.logger import FileLogObserver, ILogObserver, Logger
from twisted.python.failure import Failure
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
def flatten_event(event: dict, metadata: dict, include_time: bool = False):
@@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
def formatEvent(_event: dict) -> str:
flattened = flatten_event(_event, metadata)
return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
return _encoder.encode(flattened) + "\n"
return FileLogObserver(outFile, formatEvent)
@attr.s
@implementer(IPushProducer)
class LogProducer(object):
"""
An IPushProducer that writes logs from its buffer to its transport when it
is resumed.
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
"""
transport = attr.ib(type=ITransport)
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
def pauseProducing(self):
self._paused = True
def stopProducing(self):
self._paused = True
self._buffer = None
def resumeProducing(self):
self._paused = False
while self._paused is False and (self._buffer and self.transport.connected):
try:
event = self._buffer.popleft()
self.transport.write(_encoder.encode(event).encode("utf8"))
self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
traceback.print_exc(file=sys.__stderr__)
break
@attr.s
@implementer(ILogObserver)
class TerseJSONToTCPLogObserver(object):
@@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
metadata = attr.ib(type=dict)
maximum_buffer = attr.ib(type=int)
_buffer = attr.ib(default=attr.Factory(deque), type=deque)
_writer = attr.ib(default=None)
_connection_waiter = attr.ib(default=None, type=Optional[Deferred])
_logger = attr.ib(default=attr.Factory(Logger))
_producer = attr.ib(default=None, type=Optional[LogProducer])
def start(self) -> None:
@@ -187,38 +230,44 @@ class TerseJSONToTCPLogObserver(object):
factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
self._service.startService()
self._connect()
def _write_loop(self) -> None:
def stop(self):
self._service.stopService()
def _connect(self) -> None:
"""
Implement the write loop.
Triggers an attempt to connect then write to the remote if not already writing.
"""
if self._writer:
if self._connection_waiter:
return
self._writer = self._service.whenConnected()
self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
@self._writer.addBoth
@self._connection_waiter.addErrback
def fail(r):
r.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
@self._connection_waiter.addCallback
def writer(r):
if isinstance(r, Failure):
r.printTraceback(file=sys.__stderr__)
self._writer = None
self.hs.get_reactor().callLater(1, self._write_loop)
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
try:
for event in self._buffer:
r.transport.write(
dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
"utf8"
)
)
r.transport.write(b"\n")
self._buffer.clear()
except Exception as e:
sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
# If the producer is still producing, stop it.
if self._producer:
self._producer.stopProducing()
self._writer = False
self.hs.get_reactor().callLater(1, self._write_loop)
# Make a new producer and start it.
self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
r.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
def _handle_pressure(self) -> None:
"""
@@ -277,4 +326,4 @@ class TerseJSONToTCPLogObserver(object):
self._logger.failure("Failed clearing backpressure")
# Try and write immediately.
self._write_loop()
self._connect()
+2 -3
View File
@@ -246,7 +246,7 @@ class HttpPusher(object):
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warning(
"Giving up on a notification to user %s, " "pushkey %s",
"Giving up on a notification to user %s, pushkey %s",
self.user_id,
self.pushkey,
)
@@ -299,8 +299,7 @@ class HttpPusher(object):
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warning(
("Ignoring rejected pushkey %s because we" " didn't send it"),
pk,
("Ignoring rejected pushkey %s because we didn't send it"), pk,
)
else:
logger.info("Pushkey %s was rejected: removing", pk)
+2 -2
View File
@@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
MESSAGE_FROM_PERSON_IN_ROOM = (
"You have a message on %(app)s from %(person)s " "in the %(room)s room..."
"You have a message on %(app)s from %(person)s in the %(room)s room..."
)
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
"You have messages on %(app)s from %(person)s and others..."
)
INVITE_FROM_PERSON_TO_ROOM = (
"%(person)s has invited you to join the " "%(room)s room on %(app)s..."
"%(person)s has invited you to join the %(room)s room on %(app)s..."
)
INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
+9 -1
View File
@@ -14,7 +14,14 @@
# limitations under the License.
from synapse.http.server import JsonResource
from synapse.replication.http import federation, login, membership, register, send_event
from synapse.replication.http import (
devices,
federation,
login,
membership,
register,
send_event,
)
REPLICATION_PREFIX = "/_synapse/replication"
@@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
federation.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
+73
View File
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.replication.http._base import ReplicationEndpoint
logger = logging.getLogger(__name__)
class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.
This must happen on master so that the results can be correctly cached in
the database and streamed to workers.
Request format:
POST /_synapse/replication/user_device_resync/:user_id
{}
Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:
{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""
NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False
def __init__(self, hs):
super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
self.device_list_updater = hs.get_device_handler().device_list_updater
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(user_id):
return {}
async def _handle_request(self, request, user_id):
user_devices = await self.device_list_updater.user_device_resync(user_id)
return 200, user_devices
def register_servlets(hs, http_server):
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+5 -2
View File
@@ -93,6 +93,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
{
"requester": ...,
"remote_room_hosts": [...],
"content": { ... }
}
"""
@@ -107,7 +108,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
"""
Args:
requester(Requester)
@@ -118,12 +119,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"content": content,
}
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -134,7 +137,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
try:
event = await self.federation_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, user_id
remote_room_hosts, room_id, user_id, event_content,
)
ret = event.get_pdu_json()
except Exception as e:
+3 -4
View File
@@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream",
("user_id", "room_id", "data_type", "data"), # str # str # str # dict
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
@@ -421,8 +420,8 @@ class AccountDataStream(Stream):
results = list(room_results)
results.extend(
(stream_id, user_id, None, account_data_type, content)
for stream_id, user_id, account_data_type, content in global_results
(stream_id, user_id, None, account_data_type)
for stream_id, user_id, account_data_type in global_results
)
return results
+1 -1
View File
@@ -714,7 +714,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
target = UserID.from_string(content["user_id"])
event_content = None
if "reason" in content and membership_action in ["kick", "ban"]:
if "reason" in content:
event_content = {"reason": content["reason"]}
await self.room_member_handler.update_membership(
+5
View File
@@ -642,6 +642,7 @@ class ThreepidAddRestServlet(RestServlet):
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
@interactive_auth_handler
@defer.inlineCallbacks
def on_POST(self, request):
requester = yield self.auth.get_user_by_req(request)
@@ -652,6 +653,10 @@ class ThreepidAddRestServlet(RestServlet):
client_secret = body["client_secret"]
sid = body["sid"]
yield self.auth_handler.validate_user_via_ui_auth(
requester, body, self.hs.get_ip_from_request(request)
)
validation_session = yield self.identity_handler.validate_threepid_session(
client_secret, sid
)
+4 -4
View File
@@ -134,8 +134,8 @@ class RoomKeysServlet(RestServlet):
if room_id:
body = {"rooms": {room_id: body}}
yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
return 200, {}
ret = yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
return 200, ret
@defer.inlineCallbacks
def on_GET(self, request, room_id, session_id):
@@ -239,10 +239,10 @@ class RoomKeysServlet(RestServlet):
user_id = requester.user.to_string()
version = parse_string(request, "version")
yield self.e2e_room_keys_handler.delete_room_keys(
ret = yield self.e2e_room_keys_handler.delete_room_keys(
user_id, version, room_id, session_id
)
return 200, {}
return 200, ret
class RoomKeysNewVersionServlet(RestServlet):
@@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource):
pattern = entry[attrib]
value = getattr(url_tuple, attrib)
logger.debug(
"Matching attrib '%s' with value '%s' against" " pattern '%s'",
"Matching attrib '%s' with value '%s' against pattern '%s'",
attrib,
value,
pattern,
+4 -1
View File
@@ -129,5 +129,8 @@ class Thumbnailer(object):
def _encode_image(self, output_image, output_type):
output_bytes_io = BytesIO()
output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80)
fmt = self.FORMATS[output_type]
if fmt == "JPEG":
output_image = output_image.convert("RGB")
output_image.save(output_bytes_io, fmt, quality=80)
return output_bytes_io
@@ -54,7 +54,7 @@ class ConsentServerNotices(object):
)
if "body" not in self._server_notice_content:
raise ConfigError(
"user_consent server_notice_consent must contain a 'body' " "key."
"user_consent server_notice_consent must contain a 'body' key."
)
self._consent_uri_builder = ConsentURIBuilder(hs.config)
+43 -10
View File
@@ -409,16 +409,15 @@ class SQLBaseStore(object):
i = 0
N = 5
while True:
cursor = LoggingTransaction(
conn.cursor(),
name,
self.database_engine,
after_callbacks,
exception_callbacks,
)
try:
txn = conn.cursor()
txn = LoggingTransaction(
txn,
name,
self.database_engine,
after_callbacks,
exception_callbacks,
)
r = func(txn, *args, **kwargs)
r = func(cursor, *args, **kwargs)
conn.commit()
return r
except self.database_engine.module.OperationalError as e:
@@ -456,6 +455,40 @@ class SQLBaseStore(object):
)
continue
raise
finally:
# we're either about to retry with a new cursor, or we're about to
# release the connection. Once we release the connection, it could
# get used for another query, which might do a conn.rollback().
#
# In the latter case, even though that probably wouldn't affect the
# results of this transaction, python's sqlite will reset all
# statements on the connection [1], which will make our cursor
# invalid [2].
#
# In any case, continuing to read rows after commit()ing seems
# dubious from the PoV of ACID transactional semantics
# (sqlite explicitly says that once you commit, you may see rows
# from subsequent updates.)
#
# In psycopg2, cursors are essentially a client-side fabrication -
# all the data is transferred to the client side when the statement
# finishes executing - so in theory we could go on streaming results
# from the cursor, but attempting to do so would make us
# incompatible with sqlite, so let's make sure we're not doing that
# by closing the cursor.
#
# (*named* cursors in psycopg2 are different and are proper server-
# side things, but (a) we don't use them and (b) they are implicitly
# closed by ending the transaction anyway.)
#
# In short, if we haven't finished with the cursor yet, that's a
# problem waiting to bite us.
#
# TL;DR: we're done with the cursor, so we can close it.
#
# [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
# [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
cursor.close()
except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
@@ -851,7 +884,7 @@ class SQLBaseStore(object):
allvalues.update(values)
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
@@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
current_id(int): The position to fetch up to.
Returns:
A deferred pair of lists of tuples of stream_id int, user_id string,
room_id string, type string, and content string.
room_id string, and type string.
"""
if last_room_id == current_id and last_global_id == current_id:
return defer.succeed(([], []))
def get_updated_account_data_txn(txn):
sql = (
"SELECT stream_id, user_id, account_data_type, content"
"SELECT stream_id, user_id, account_data_type"
" FROM account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
@@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
global_results = txn.fetchall()
sql = (
"SELECT stream_id, user_id, room_id, account_data_type, content"
"SELECT stream_id, user_id, room_id, account_data_type"
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
@@ -380,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
sql = "SELECT device_id FROM devices WHERE user_id = ?"
txn.execute(sql, (user_id,))
message_json = json.dumps(messages_by_device["*"])
for row in txn:
+160 -66
View File
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
class EndToEndRoomKeyStore(SQLBaseStore):
@defer.inlineCallbacks
def get_e2e_room_key(self, user_id, version, room_id, session_id):
"""Get the encrypted E2E room key for a given session from a given
backup version of room_keys. We only store the 'best' room key for a given
session at a given time, as determined by the handler.
Args:
user_id(str): the user whose backup we're querying
version(str): the version ID of the backup for the set of keys we're querying
room_id(str): the ID of the room whose keys we're querying.
This is a bit redundant as it's implied by the session_id, but
we include for consistency with the rest of the API.
session_id(str): the session whose room_key we're querying.
Returns:
A deferred dict giving the session_data and message metadata for
this room key.
"""
row = yield self._simple_select_one(
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
"version": version,
"room_id": room_id,
"session_id": session_id,
},
retcols=(
"first_message_index",
"forwarded_count",
"is_verified",
"session_data",
),
desc="get_e2e_room_key",
)
row["session_data"] = json.loads(row["session_data"])
return row
@defer.inlineCallbacks
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
"""Replaces or inserts the encrypted E2E room key for a given session in
a given backup
def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
"""Replaces the encrypted E2E room key for a given session in a given backup
Args:
user_id(str): the user whose backup we're setting
@@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
StoreError
"""
yield self._simple_upsert(
yield self._simple_update_one(
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
@@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"room_id": room_id,
"session_id": session_id,
},
values={
updatevalues={
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json.dumps(room_key["session_data"]),
},
lock=False,
desc="update_e2e_room_key",
)
log_kv(
{
"message": "Set room key",
"room_id": room_id,
"session_id": session_id,
"room_key": room_key,
}
@defer.inlineCallbacks
def add_e2e_room_keys(self, user_id, version, room_keys):
"""Bulk add room keys to a given backup.
Args:
user_id (str): the user whose backup we're adding to
version (str): the version ID of the backup for the set of keys we're adding to
room_keys (iterable[(str, str, dict)]): the keys to add, in the form
(roomID, sessionID, keyData)
"""
values = []
for (room_id, session_id, room_key) in room_keys:
values.append(
{
"user_id": user_id,
"version": version,
"room_id": room_id,
"session_id": session_id,
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json.dumps(room_key["session_data"]),
}
)
log_kv(
{
"message": "Set room key",
"room_id": room_id,
"session_id": session_id,
"room_key": room_key,
}
)
yield self._simple_insert_many(
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
)
@trace
@@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
room, or a given session.
Args:
user_id(str): the user whose backup we're querying
version(str): the version ID of the backup for the set of keys we're querying
room_id(str): Optional. the ID of the room whose keys we're querying, if any.
user_id (str): the user whose backup we're querying
version (str): the version ID of the backup for the set of keys we're querying
room_id (str): Optional. the ID of the room whose keys we're querying, if any.
If not specified, we return the keys for all the rooms in the backup.
session_id(str): Optional. the session whose room_key we're querying, if any.
session_id (str): Optional. the session whose room_key we're querying, if any.
If specified, we also require the room_id to be specified.
If not specified, we return all the keys in this version of
the backup (or for the specified room)
@@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
def get_e2e_room_keys_multi(self, user_id, version, room_keys):
"""Get multiple room keys at a time. The difference between this function and
get_e2e_room_keys is that this function can be used to retrieve
multiple specific keys at a time, whereas get_e2e_room_keys is used for
getting all the keys in a backup version, all the keys for a room, or a
specific key.
Args:
user_id (str): the user whose backup we're querying
version (str): the version ID of the backup we're querying about
room_keys (dict[str, dict[str, iterable[str]]]): a map from
room ID -> {"session": [session ids]} indicating the session IDs
that we want to query
Returns:
Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
"""
return self.runInteraction(
"get_e2e_room_keys_multi",
self._get_e2e_room_keys_multi_txn,
user_id,
version,
room_keys,
)
@staticmethod
def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
if not room_keys:
return {}
where_clauses = []
params = [user_id, version]
for room_id, room in room_keys.items():
sessions = list(room["sessions"])
if not sessions:
continue
params.append(room_id)
params.extend(sessions)
where_clauses.append(
"(room_id = ? AND session_id IN (%s))"
% (",".join(["?" for _ in sessions]),)
)
# check if we're actually querying something
if not where_clauses:
return {}
sql = """
SELECT room_id, session_id, first_message_index, forwarded_count,
is_verified, session_data
FROM e2e_room_keys
WHERE user_id = ? AND version = ? AND (%s)
""" % (
" OR ".join(where_clauses)
)
txn.execute(sql, params)
ret = {}
for row in txn:
room_id = row[0]
session_id = row[1]
ret.setdefault(room_id, {})
ret[room_id][session_id] = {
"first_message_index": row[2],
"forwarded_count": row[3],
"is_verified": row[4],
"session_data": json.loads(row[5]),
}
return ret
def count_e2e_room_keys(self, user_id, version):
"""Get the number of keys in a backup version.
Args:
user_id (str): the user whose backup we're querying
version (str): the version ID of the backup we're querying about
"""
return self._simple_select_one_onecol(
table="e2e_room_keys",
keyvalues={"user_id": user_id, "version": version},
retcol="COUNT(*)",
desc="count_e2e_room_keys",
)
@trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
version(str)
algorithm(str)
auth_data(object): opaque dict supplied by the client
etag(int): tag of the keys in the backup
"""
def _get_e2e_room_keys_version_info_txn(txn):
@@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
txn,
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
retcols=("version", "algorithm", "auth_data"),
retcols=("version", "algorithm", "auth_data", "etag"),
)
result["auth_data"] = json.loads(result["auth_data"])
result["version"] = str(result["version"])
if result["etag"] is None:
result["etag"] = 0
return result
return self.runInteraction(
@@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
@trace
def update_e2e_room_keys_version(self, user_id, version, info):
def update_e2e_room_keys_version(
self, user_id, version, info=None, version_etag=None
):
"""Update a given backup version
Args:
user_id(str): the user whose backup version we're updating
version(str): the version ID of the backup version we're updating
info(dict): the new backup version info to store
info (dict): the new backup version info to store. If None, then
the backup version info is not updated
version_etag (Optional[int]): etag of the keys in the backup. If
None, then the etag is not updated
"""
updatevalues = {}
return self._simple_update(
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": version},
updatevalues={"auth_data": json.dumps(info["auth_data"])},
desc="update_e2e_room_keys_version",
)
if info is not None and "auth_data" in info:
updatevalues["auth_data"] = json.dumps(info["auth_data"])
if version_etag is not None:
updatevalues["etag"] = version_etag
if updatevalues:
return self._simple_update(
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": version},
updatevalues=updatevalues,
desc="update_e2e_room_keys_version",
)
@trace
def delete_e2e_room_keys_version(self, user_id, version=None):
@@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
result.setdefault(user_id, {})[device_id] = None
# get signatures on the device
signature_sql = (
"SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s"
) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
" OR ".join("(" + q + ")" for q in signature_query_clauses)
)
txn.execute(signature_sql, signature_query_params)
rows = self.cursor_to_dict(txn)
+6 -5
View File
@@ -713,9 +713,7 @@ class EventsStore(
metadata_json = encode_json(event.internal_metadata.get_dict())
sql = (
"UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
)
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the
@@ -732,7 +730,7 @@ class EventsStore(
},
)
sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this
@@ -929,6 +927,9 @@ class EventsStore(
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
elif event.type == EventTypes.Retention:
# Update the room_retention table.
self._store_retention_policy_for_room_txn(txn, event)
self._handle_event_relations(txn, event)
@@ -1479,7 +1480,7 @@ class EventsStore(
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall()
@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
if filter_id_response is not None:
return filter_id_response[0]
sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_localpart,))
max_id = txn.fetchone()[0]
if max_id is None:
@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
if len(media_ids) == 0:
return
sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
return
def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
+1 -1
View File
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
args.append(limit)
txn.execute(sql, args)
return (r[0:5] + (json.loads(r[5]),) for r in txn)
return list(r[0:5] + (json.loads(r[5]),) for r in txn)
return self.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn
@@ -19,7 +19,6 @@ import logging
import re
from six import iterkeys
from six.moves import range
from twisted.internet import defer
from twisted.internet.defer import Deferred
@@ -377,9 +376,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
def f(txn):
sql = (
"SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
)
sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
txn.execute(sql, (user_id,))
return dict(txn)
@@ -484,12 +481,8 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
Gets the localpart of the next generated user ID.
Generated user IDs are integers, and we aim for them to be as small as
we can. Unfortunately, it's possible some of them are already taken by
existing users, and there may be gaps in the already taken range. This
function returns the start of the first allocatable gap. This is to
avoid the case of ID 1000 being pre-allocated and starting at 1001 while
0-999 are available.
Generated user IDs are integers, so we find the largest integer user ID
already taken and return that plus one.
"""
def _find_next_generated_user_id(txn):
@@ -499,15 +492,14 @@ class RegistrationWorkerStore(SQLBaseStore):
regex = re.compile(r"^@(\d+):")
found = set()
max_found = 0
for (user_id,) in txn:
match = regex.search(user_id)
if match:
found.add(int(match.group(1)))
for i in range(len(found) + 1):
if i not in found:
return i
max_found = max(int(match.group(1)), max_found)
return max_found + 1
return (
(
+251
View File
@@ -19,10 +19,13 @@ import logging
import re
from typing import Optional, Tuple
from six import integer_types
from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.search import SearchStore
@@ -300,8 +303,141 @@ class RoomWorkerStore(SQLBaseStore):
else:
return None
@cachedInlineCallbacks()
def get_retention_policy_for_room(self, room_id):
"""Get the retention policy for a given room.
If no retention policy has been found for this room, returns a policy defined
by the configured default policy (which has None as both the 'min_lifetime' and
the 'max_lifetime' if no default policy has been defined in the server's
configuration).
Args:
room_id (str): The ID of the room to get the retention policy of.
Returns:
dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
"""
def get_retention_policy_for_room_txn(txn):
txn.execute(
"""
SELECT min_lifetime, max_lifetime FROM room_retention
INNER JOIN current_state_events USING (event_id, room_id)
WHERE room_id = ?;
""",
(room_id,),
)
return self.cursor_to_dict(txn)
ret = yield self.runInteraction(
"get_retention_policy_for_room", get_retention_policy_for_room_txn,
)
# If we don't know this room ID, ret will be None, in this case return the default
# policy.
if not ret:
defer.returnValue(
{
"min_lifetime": self.config.retention_default_min_lifetime,
"max_lifetime": self.config.retention_default_max_lifetime,
}
)
row = ret[0]
# If one of the room's policy's attributes isn't defined, use the matching
# attribute from the default policy.
# The default values will be None if no default policy has been defined, or if one
# of the attributes is missing from the default policy.
if row["min_lifetime"] is None:
row["min_lifetime"] = self.config.retention_default_min_lifetime
if row["max_lifetime"] is None:
row["max_lifetime"] = self.config.retention_default_max_lifetime
defer.returnValue(row)
class RoomStore(RoomWorkerStore, SearchStore):
def __init__(self, db_conn, hs):
super(RoomStore, self).__init__(db_conn, hs)
self.config = hs.config
self.register_background_update_handler(
"insert_room_retention", self._background_insert_retention,
)
@defer.inlineCallbacks
def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
them into the room_retention table.
NULLs the property's columns if missing from the retention event in the room's
state (or NULLs all of them if there's no retention event in the room's state),
so that we fall back to the server's retention policy.
"""
last_room = progress.get("room_id", "")
def _background_insert_retention_txn(txn):
txn.execute(
"""
SELECT state.room_id, state.event_id, events.json
FROM current_state_events as state
LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
WHERE state.room_id > ? AND state.type = '%s'
ORDER BY state.room_id ASC
LIMIT ?;
"""
% EventTypes.Retention,
(last_room, batch_size),
)
rows = self.cursor_to_dict(txn)
if not rows:
return True
for row in rows:
if not row["json"]:
retention_policy = {}
else:
ev = json.loads(row["json"])
retention_policy = json.dumps(ev["content"])
self._simple_insert_txn(
txn=txn,
table="room_retention",
values={
"room_id": row["room_id"],
"event_id": row["event_id"],
"min_lifetime": retention_policy.get("min_lifetime"),
"max_lifetime": retention_policy.get("max_lifetime"),
},
)
logger.info("Inserted %d rows into room_retention", len(rows))
self._background_update_progress_txn(
txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
)
if batch_size > len(rows):
return True
else:
return False
end = yield self.runInteraction(
"insert_room_retention", _background_insert_retention_txn,
)
if end:
yield self._end_background_update("insert_room_retention")
defer.returnValue(batch_size)
@defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public):
"""Stores a room.
@@ -502,6 +638,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
txn, event, "content.body", event.content["body"]
)
def _store_retention_policy_for_room_txn(self, txn, event):
if hasattr(event, "content") and (
"min_lifetime" in event.content or "max_lifetime" in event.content
):
if (
"min_lifetime" in event.content
and not isinstance(event.content.get("min_lifetime"), integer_types)
) or (
"max_lifetime" in event.content
and not isinstance(event.content.get("max_lifetime"), integer_types)
):
# Ignore the event if one of the value isn't an integer.
return
self._simple_insert_txn(
txn=txn,
table="room_retention",
values={
"room_id": event.room_id,
"event_id": event.event_id,
"min_lifetime": event.content.get("min_lifetime"),
"max_lifetime": event.content.get("max_lifetime"),
},
)
self._invalidate_cache_and_stream(
txn, self.get_retention_policy_for_room, (event.room_id,)
)
def add_event_report(
self, room_id, event_id, user_id, reason, content, received_ts
):
@@ -683,3 +848,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
remote_media_mxcs.append((hostname, media_id))
return local_media_mxcs, remote_media_mxcs
@defer.inlineCallbacks
def get_rooms_for_retention_period_in_range(
self, min_ms, max_ms, include_null=False
):
"""Retrieves all of the rooms within the given retention range.
Optionally includes the rooms which don't have a retention policy.
Args:
min_ms (int|None): Duration in milliseconds that define the lower limit of
the range to handle (exclusive). If None, doesn't set a lower limit.
max_ms (int|None): Duration in milliseconds that define the upper limit of
the range to handle (inclusive). If None, doesn't set an upper limit.
include_null (bool): Whether to include rooms which retention policy is NULL
in the returned set.
Returns:
dict[str, dict]: The rooms within this range, along with their retention
policy. The key is "room_id", and maps to a dict describing the retention
policy associated with this room ID. The keys for this nested dict are
"min_lifetime" (int|None), and "max_lifetime" (int|None).
"""
def get_rooms_for_retention_period_in_range_txn(txn):
range_conditions = []
args = []
if min_ms is not None:
range_conditions.append("max_lifetime > ?")
args.append(min_ms)
if max_ms is not None:
range_conditions.append("max_lifetime <= ?")
args.append(max_ms)
# Do a first query which will retrieve the rooms that have a retention policy
# in their current state.
sql = """
SELECT room_id, min_lifetime, max_lifetime FROM room_retention
INNER JOIN current_state_events USING (event_id, room_id)
"""
if len(range_conditions):
sql += " WHERE (" + " AND ".join(range_conditions) + ")"
if include_null:
sql += " OR max_lifetime IS NULL"
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
rooms_dict = {}
for row in rows:
rooms_dict[row["room_id"]] = {
"min_lifetime": row["min_lifetime"],
"max_lifetime": row["max_lifetime"],
}
if include_null:
# If required, do a second query that retrieves all of the rooms we know
# of so we can handle rooms with no retention policy.
sql = "SELECT DISTINCT room_id FROM current_state_events"
txn.execute(sql)
rows = self.cursor_to_dict(txn)
# If a room isn't already in the dict (i.e. it doesn't have a retention
# policy in its state), add it with a null policy.
for row in rows:
if row["room_id"] not in rooms_dict:
rooms_dict[row["room_id"]] = {
"min_lifetime": None,
"max_lifetime": None,
}
return rooms_dict
rooms = yield self.runInteraction(
"get_rooms_for_retention_period_in_range",
get_rooms_for_retention_period_in_range_txn,
)
defer.returnValue(rooms)
@@ -0,0 +1,17 @@
/* Copyright 2019 Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- store the current etag of backup version
ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;
@@ -0,0 +1,33 @@
/* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Tracks the retention policy of a room.
-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
-- the room's retention policy state event.
-- If a room doesn't have a retention policy state event in its state, both max_lifetime
-- and min_lifetime are NULL.
CREATE TABLE IF NOT EXISTS room_retention(
room_id TEXT,
event_id TEXT,
min_lifetime BIGINT,
max_lifetime BIGINT,
PRIMARY KEY(room_id, event_id)
);
CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
INSERT INTO background_updates (update_name, progress_json) VALUES
('insert_room_retention', '{}');
+1 -1
View File
@@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def _get_max_topological_txn(self, txn, room_id):
txn.execute(
"SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
"SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
(room_id,),
)
+1 -3
View File
@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
)
def get_tag_content(txn, tag_ids):
sql = (
"SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
)
sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
results = []
for stream_id, user_id, room_id in tag_ids:
txn.execute(sql, (user_id, room_id))
+1 -1
View File
@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
# Mark as done.
cur.execute(
database_engine.convert_param_style(
"INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
"INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
),
(modname, name),
)
+6 -3
View File
@@ -88,9 +88,12 @@ class PaginationConfig(object):
raise SynapseError(400, "Invalid request.")
def __repr__(self):
return (
"PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
) % (self.from_token, self.to_token, self.direction, self.limit)
return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % (
self.from_token,
self.to_token,
self.direction,
self.limit,
)
def get_source_config(self, source_name):
keyname = "%s_key" % source_name
+31 -1
View File
@@ -44,7 +44,12 @@ MEMBERSHIP_PRIORITY = (
@defer.inlineCallbacks
def filter_events_for_client(
storage: Storage, user_id, events, is_peeking=False, always_include_ids=frozenset()
storage: Storage,
user_id,
events,
is_peeking=False,
always_include_ids=frozenset(),
apply_retention_policies=True,
):
"""
Check which events a user is allowed to see
@@ -59,6 +64,10 @@ def filter_events_for_client(
events
always_include_ids (set(event_id)): set of event ids to specifically
include (unless sender is ignored)
apply_retention_policies (bool): Whether to filter out events that's older than
allowed by the room's retention policy. Useful when this function is called
to e.g. check whether a user should be allowed to see the state at a given
event rather than to know if it should send an event to a user's client(s).
Returns:
Deferred[list[synapse.events.EventBase]]
@@ -86,6 +95,15 @@ def filter_events_for_client(
erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
if apply_retention_policies:
room_ids = set(e.room_id for e in events)
retention_policies = {}
for room_id in room_ids:
retention_policies[
room_id
] = yield storage.main.get_retention_policy_for_room(room_id)
def allowed(event):
"""
Args:
@@ -103,6 +121,18 @@ def filter_events_for_client(
if not event.is_state() and event.sender in ignore_list:
return None
# Don't try to apply the room's retention policy if the event is a state event, as
# MSC1763 states that retention is only considered for non-state events.
if apply_retention_policies and not event.is_state():
retention_policy = retention_policies[event.room_id]
max_lifetime = retention_policy.get("max_lifetime")
if max_lifetime is not None:
oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
if event.origin_server_ts < oldest_allowed_ts:
return None
if event.event_id in always_include_ids:
return event
+72
View File
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from twisted.internet import epollreactor
from twisted.internet.main import installReactor
from synapse.config.homeserver import HomeServerConfig
from synapse.util import Clock
from tests.utils import default_config, setup_test_homeserver
async def make_homeserver(reactor, config=None):
"""
Make a Homeserver suitable for running benchmarks against.
Args:
reactor: A Twisted reactor to run under.
config: A HomeServerConfig to use, or None.
"""
cleanup_tasks = []
clock = Clock(reactor)
if not config:
config = default_config("test")
config_obj = HomeServerConfig()
config_obj.parse_config_dict(config, "", "")
hs = await setup_test_homeserver(
cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock
)
stor = hs.get_datastore()
# Run the database background updates.
if hasattr(stor, "do_next_background_update"):
while not await stor.has_completed_background_updates():
await stor.do_next_background_update(1)
def cleanup():
for i in cleanup_tasks:
i()
return hs, clock.sleep, cleanup
def make_reactor():
"""
Instantiate and install a Twisted reactor suitable for testing (i.e. not the
default global one).
"""
reactor = epollreactor.EPollReactor()
if "twisted.internet.reactor" in sys.modules:
del sys.modules["twisted.internet.reactor"]
installReactor(reactor)
return reactor
+90
View File
@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from contextlib import redirect_stderr
from io import StringIO
import pyperf
from synmark import make_reactor
from synmark.suites import SUITES
from twisted.internet.defer import ensureDeferred
from twisted.logger import globalLogBeginner, textFileLogObserver
from twisted.python.failure import Failure
from tests.utils import setupdb
def make_test(main):
"""
Take a benchmark function and wrap it in a reactor start and stop.
"""
def _main(loops):
reactor = make_reactor()
file_out = StringIO()
with redirect_stderr(file_out):
d = ensureDeferred(main(reactor, loops))
def on_done(_):
if isinstance(_, Failure):
_.printTraceback()
print(file_out.getvalue())
reactor.stop()
return _
d.addBoth(on_done)
reactor.run()
return d.result
return _main
if __name__ == "__main__":
def add_cmdline_args(cmd, args):
if args.log:
cmd.extend(["--log"])
runner = pyperf.Runner(
processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args
)
runner.argparser.add_argument("--log", action="store_true")
runner.parse_args()
orig_loops = runner.args.loops
runner.args.inherit_environ = ["SYNAPSE_POSTGRES"]
if runner.args.worker:
if runner.args.log:
globalLogBeginner.beginLoggingTo(
[textFileLogObserver(sys.__stdout__)], redirectStandardIO=False
)
setupdb()
for suite, loops in SUITES:
if loops:
runner.args.loops = loops
else:
runner.args.loops = orig_loops
loops = "auto"
runner.bench_time_func(
suite.__name__ + "_" + str(loops), make_test(suite.main),
)
+3
View File
@@ -0,0 +1,3 @@
from . import logging
SUITES = [(logging, 1000), (logging, 10000), (logging, None)]
+118
View File
@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
from io import StringIO
from mock import Mock
from pyperf import perf_counter
from synmark import make_homeserver
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ServerFactory
from twisted.logger import LogBeginner, Logger, LogPublisher
from twisted.protocols.basic import LineOnlyReceiver
from synapse.logging._structured import setup_structured_logging
class LineCounter(LineOnlyReceiver):
delimiter = b"\n"
def __init__(self, *args, **kwargs):
self.count = 0
super().__init__(*args, **kwargs)
def lineReceived(self, line):
self.count += 1
if self.count >= self.factory.wait_for and self.factory.on_done:
on_done = self.factory.on_done
self.factory.on_done = None
on_done.callback(True)
async def main(reactor, loops):
"""
Benchmark how long it takes to send `loops` messages.
"""
servers = []
def protocol():
p = LineCounter()
servers.append(p)
return p
logger_factory = ServerFactory.forProtocol(protocol)
logger_factory.wait_for = loops
logger_factory.on_done = Deferred()
port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1")
hs, wait, cleanup = await make_homeserver(reactor)
errors = StringIO()
publisher = LogPublisher()
mock_sys = Mock()
beginner = LogBeginner(
publisher, errors, mock_sys, warnings, initialBufferSize=loops
)
log_config = {
"loggers": {"synapse": {"level": "DEBUG"}},
"drains": {
"tersejson": {
"type": "network_json_terse",
"host": "127.0.0.1",
"port": port.getHost().port,
"maximum_buffer": 100,
}
},
}
logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher)
logging_system = setup_structured_logging(
hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False
)
# Wait for it to connect...
await logging_system._observers[0]._service.whenConnected()
start = perf_counter()
# Send a bunch of useful messages
for i in range(0, loops):
logger.info("test message %s" % (i,))
if (
len(logging_system._observers[0]._buffer)
== logging_system._observers[0].maximum_buffer
):
while (
len(logging_system._observers[0]._buffer)
> logging_system._observers[0].maximum_buffer / 2
):
await wait(0.01)
await logger_factory.on_done
end = perf_counter() - start
logging_system.stop()
port.stopListening()
cleanup()
return end
+5 -1
View File
@@ -1,6 +1,6 @@
# This file serves as a blacklist for SyTest tests that we expect will fail in
# Synapse.
#
#
# Each line of this file is scanned by sytest during a run and if the line
# exactly matches the name of a test, it will be marked as "expected fail",
# meaning the test will still run, but failure will not mark the entire test
@@ -29,3 +29,7 @@ Enabling an unknown default rule fails with 404
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1663
New federated private chats get full presence information (SYN-115)
# Blacklisted due to https://github.com/matrix-org/matrix-doc/pull/2314 removing
# this requirement from the spec
Inbound federation of state requires event_id as a mandatory paramater
+3 -25
View File
@@ -18,17 +18,14 @@ from mock import Mock
from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.types import UserID
from synapse.util.ratelimitutils import FederationRateLimiter
from tests import unittest
class RoomComplexityTests(unittest.HomeserverTestCase):
class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
@@ -41,25 +38,6 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
config["limit_remote_rooms"] = {"enabled": True, "complexity": 0.05}
return config
def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return defer.succeed("otherserver.nottld")
ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)
def test_complexity_simple(self):
u1 = self.register_user("u1", "pass")
@@ -105,7 +83,7 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
"roomid",
UserID.from_string(u1),
{"membership": "join"},
@@ -146,7 +124,7 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
room_1,
UserID.from_string(u1),
{"membership": "join"},
+3 -1
View File
@@ -19,7 +19,7 @@ from twisted.internet import defer
from synapse.types import ReadReceipt
from tests.unittest import HomeserverTestCase
from tests.unittest import HomeserverTestCase, override_config
class FederationSenderTestCases(HomeserverTestCase):
@@ -29,6 +29,7 @@ class FederationSenderTestCases(HomeserverTestCase):
federation_transport_client=Mock(spec=["send_transaction"]),
)
@override_config({"send_federation": True})
def test_send_receipts(self):
mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
@@ -69,6 +70,7 @@ class FederationSenderTestCases(HomeserverTestCase):
],
)
@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,6 +17,8 @@ import logging
from synapse.events import FrozenEvent
from synapse.federation.federation_server import server_matches_acl_event
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
@@ -41,6 +44,66 @@ class ServerACLsTestCase(unittest.TestCase):
self.assertTrue(server_matches_acl_event("1:2:3:4", e))
class StateQueryTests(unittest.FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def test_without_event_id(self):
"""
Querying v1/state/<room_id> without an event ID will return the current
known state.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.inject_room_member(room_1, "@user:other.example.com", "join")
request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertEqual(
channel.json_body["room_version"],
self.hs.config.default_room_version.identifier,
)
members = set(
map(
lambda x: x["state_key"],
filter(
lambda x: x["type"] == "m.room.member", channel.json_body["pdus"]
),
)
)
self.assertEqual(members, set(["@user:other.example.com", u1]))
self.assertEqual(len(channel.json_body["pdus"]), 6)
def test_needs_to_be_in_room(self):
"""
Querying v1/state/<room_id> requires the server
be in the room to provide data.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(403, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
def _create_acl_event(content):
return FrozenEvent(
{
+31
View File
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -94,23 +95,29 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user)
version_etag = res["etag"]
del res["etag"]
self.assertDictEqual(
res,
{
"version": "1",
"algorithm": "m.megolm_backup.v1",
"auth_data": "first_version_auth_data",
"count": 0,
},
)
# check we can retrieve it as a specific version
res = yield self.handler.get_version_info(self.local_user, "1")
self.assertEqual(res["etag"], version_etag)
del res["etag"]
self.assertDictEqual(
res,
{
"version": "1",
"algorithm": "m.megolm_backup.v1",
"auth_data": "first_version_auth_data",
"count": 0,
},
)
@@ -126,12 +133,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user)
del res["etag"]
self.assertDictEqual(
res,
{
"version": "2",
"algorithm": "m.megolm_backup.v1",
"auth_data": "second_version_auth_data",
"count": 0,
},
)
@@ -158,12 +167,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user)
del res["etag"]
self.assertDictEqual(
res,
{
"algorithm": "m.megolm_backup.v1",
"auth_data": "revised_first_version_auth_data",
"version": version,
"count": 0,
},
)
@@ -207,12 +218,14 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
# check we can retrieve it as the current version
res = yield self.handler.get_version_info(self.local_user)
del res["etag"] # etag is opaque, so don't test its contents
self.assertDictEqual(
res,
{
"algorithm": "m.megolm_backup.v1",
"auth_data": "revised_first_version_auth_data",
"version": version,
"count": 0,
},
)
@@ -409,6 +422,11 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
yield self.handler.upload_room_keys(self.local_user, version, room_keys)
# get the etag to compare to future versions
res = yield self.handler.get_version_info(self.local_user)
backup_etag = res["etag"]
self.assertEqual(res["count"], 1)
new_room_keys = copy.deepcopy(room_keys)
new_room_key = new_room_keys["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]
@@ -423,6 +441,10 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
"SSBBTSBBIEZJU0gK",
)
# the etag should be the same since the session did not change
res = yield self.handler.get_version_info(self.local_user)
self.assertEqual(res["etag"], backup_etag)
# test that marking the session as verified however /does/ replace it
new_room_key["is_verified"] = True
yield self.handler.upload_room_keys(self.local_user, version, new_room_keys)
@@ -432,6 +454,11 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new"
)
# the etag should NOT be equal now, since the key changed
res = yield self.handler.get_version_info(self.local_user)
self.assertNotEqual(res["etag"], backup_etag)
backup_etag = res["etag"]
# test that a session with a higher forwarded_count doesn't replace one
# with a lower forwarding count
new_room_key["forwarded_count"] = 2
@@ -443,6 +470,10 @@ class E2eRoomKeysHandlerTestCase(unittest.TestCase):
res["rooms"]["!abc:matrix.org"]["sessions"]["c0ff33"]["session_data"], "new"
)
# the etag should be the same since the session did not change
res = yield self.handler.get_version_info(self.local_user)
self.assertEqual(res["etag"], backup_etag)
# TODO: check edge cases as well as the common variations here
@defer.inlineCallbacks
+3
View File
@@ -24,6 +24,7 @@ from synapse.api.errors import AuthError
from synapse.types import UserID
from tests import unittest
from tests.unittest import override_config
from tests.utils import register_federation_servlets
# Some local users to test with
@@ -174,6 +175,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
],
)
@override_config({"send_federation": True})
def test_started_typing_remote_send(self):
self.room_members = [U_APPLE, U_ONION]
@@ -237,6 +239,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
],
)
@override_config({"send_federation": True})
def test_stopped_typing(self):
self.room_members = [U_APPLE, U_BANANA, U_ONION]
+3
View File
@@ -48,7 +48,10 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = server_factory.streamer
handler_factory = Mock()
self.replication_handler = ReplicationClientHandler(self.slaved_store)
self.replication_handler.factory = handler_factory
client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)
+4
View File
@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from synapse.replication.tcp.commands import ReplicateCommand
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -30,7 +32,9 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
server = server_factory.buildProtocol(None)
# build a replication client, with a dummy handler
handler_factory = Mock()
self.test_handler = TestReplicationClientHandler()
self.test_handler.factory = handler_factory
self.client = ClientReplicationStreamProtocol(
"client", "test", clock, self.test_handler
)

Some files were not shown because too many files have changed in this diff Show More