1
0

Compare commits

..

2 Commits

Author SHA1 Message Date
Neil Johnson
4d1a8718b5 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/update_limits_error_codes 2018-08-15 16:28:30 +01:00
Neil Johnson
fdb612c1dd fix typo 2018-08-15 12:05:51 +01:00
188 changed files with 1330 additions and 3674 deletions

View File

@@ -35,6 +35,10 @@ matrix:
- python: 3.6
env: TOX_ENV=check-newsfragment
allow_failures:
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
install:
- pip install tox

View File

@@ -1,85 +1,3 @@
Synapse 0.33.3 (2018-08-22)
===========================
Bugfixes
--------
- Fix bug introduced in v0.33.3rc1 which made the ToS give a 500 error ([\#3732](https://github.com/matrix-org/synapse/issues/3732))
Synapse 0.33.3rc2 (2018-08-21)
==============================
Bugfixes
--------
- Fix bug in v0.33.3rc1 which caused infinite loops and OOMs ([\#3723](https://github.com/matrix-org/synapse/issues/3723))
Synapse 0.33.3rc1 (2018-08-21)
==============================
Features
--------
- Add support for the SNI extension to federation TLS connections. Thanks to @vojeroen! ([\#3439](https://github.com/matrix-org/synapse/issues/3439))
- Add /_media/r0/config ([\#3184](https://github.com/matrix-org/synapse/issues/3184))
- speed up /members API and add `at` and `membership` params as per MSC1227 ([\#3568](https://github.com/matrix-org/synapse/issues/3568))
- implement `summary` block in /sync response as per MSC688 ([\#3574](https://github.com/matrix-org/synapse/issues/3574))
- Add lazy-loading support to /messages as per MSC1227 ([\#3589](https://github.com/matrix-org/synapse/issues/3589))
- Add ability to limit number of monthly active users on the server ([\#3633](https://github.com/matrix-org/synapse/issues/3633))
- Support more federation endpoints on workers ([\#3653](https://github.com/matrix-org/synapse/issues/3653))
- Basic support for room versioning ([\#3654](https://github.com/matrix-org/synapse/issues/3654))
- Ability to disable client/server Synapse via conf toggle ([\#3655](https://github.com/matrix-org/synapse/issues/3655))
- Ability to whitelist specific threepids against monthly active user limiting ([\#3662](https://github.com/matrix-org/synapse/issues/3662))
- Add some metrics for the appservice and federation event sending loops ([\#3664](https://github.com/matrix-org/synapse/issues/3664))
- Where server is disabled, block ability for locked out users to read new messages ([\#3670](https://github.com/matrix-org/synapse/issues/3670))
- set admin uri via config, to be used in error messages where the user should contact the administrator ([\#3687](https://github.com/matrix-org/synapse/issues/3687))
- Synapse's presence functionality can now be disabled with the "use_presence" configuration option. ([\#3694](https://github.com/matrix-org/synapse/issues/3694))
- For resource limit blocked users, prevent writing into rooms ([\#3708](https://github.com/matrix-org/synapse/issues/3708))
Bugfixes
--------
- Fix occasional glitches in the synapse_event_persisted_position metric ([\#3658](https://github.com/matrix-org/synapse/issues/3658))
- Fix bug on deleting 3pid when using identity servers that don't support unbind API ([\#3661](https://github.com/matrix-org/synapse/issues/3661))
- Make the tests pass on Twisted < 18.7.0 ([\#3676](https://github.com/matrix-org/synapse/issues/3676))
- Dont ship recaptcha_ajax.js, use it directly from Google ([\#3677](https://github.com/matrix-org/synapse/issues/3677))
- Fixes test_reap_monthly_active_users so it passes under postgres ([\#3681](https://github.com/matrix-org/synapse/issues/3681))
- Fix mau blocking calulation bug on login ([\#3689](https://github.com/matrix-org/synapse/issues/3689))
- Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users ([\#3692](https://github.com/matrix-org/synapse/issues/3692))
- Improve HTTP request logging to include all requests ([\#3700](https://github.com/matrix-org/synapse/issues/3700))
- Avoid timing out requests while we are streaming back the response ([\#3701](https://github.com/matrix-org/synapse/issues/3701))
- Support more federation endpoints on workers ([\#3705](https://github.com/matrix-org/synapse/issues/3705), [\#3713](https://github.com/matrix-org/synapse/issues/3713))
- Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning ([\#3710](https://github.com/matrix-org/synapse/issues/3710))
- Fix bug where `state_cache` cache factor ignored environment variables ([\#3719](https://github.com/matrix-org/synapse/issues/3719))
Deprecations and Removals
-------------------------
- The Shared-Secret registration method of the legacy v1/register REST endpoint has been removed. For a replacement, please see [the admin/register API documentation](https://github.com/matrix-org/synapse/blob/master/docs/admin_api/register_api.rst). ([\#3703](https://github.com/matrix-org/synapse/issues/3703))
Internal Changes
----------------
- The test suite now can run under PostgreSQL. ([\#3423](https://github.com/matrix-org/synapse/issues/3423))
- Refactor HTTP replication endpoints to reduce code duplication ([\#3632](https://github.com/matrix-org/synapse/issues/3632))
- Tests now correctly execute on Python 3. ([\#3647](https://github.com/matrix-org/synapse/issues/3647))
- Sytests can now be run inside a Docker container. ([\#3660](https://github.com/matrix-org/synapse/issues/3660))
- Port over enough to Python 3 to allow the sytests to start. ([\#3668](https://github.com/matrix-org/synapse/issues/3668))
- Update docker base image from alpine 3.7 to 3.8. ([\#3669](https://github.com/matrix-org/synapse/issues/3669))
- Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7. ([\#3678](https://github.com/matrix-org/synapse/issues/3678))
- Synapse's tests are now formatted with the black autoformatter. ([\#3679](https://github.com/matrix-org/synapse/issues/3679))
- Implemented a new testing base class to reduce test boilerplate. ([\#3684](https://github.com/matrix-org/synapse/issues/3684))
- Rename MAU prometheus metrics ([\#3690](https://github.com/matrix-org/synapse/issues/3690))
- add new error type ResourceLimit ([\#3707](https://github.com/matrix-org/synapse/issues/3707))
- Logcontexts for replication command handlers ([\#3709](https://github.com/matrix-org/synapse/issues/3709))
- Update admin register API documentation to reference a real user ID. ([\#3712](https://github.com/matrix-org/synapse/issues/3712))
Synapse 0.33.2 (2018-08-09)
===========================
@@ -106,7 +24,7 @@ Features
Bugfixes
--------
- Make /directory/list API return 404 for room not found instead of 400. Thanks to @fuzzmz! ([\#3620](https://github.com/matrix-org/synapse/issues/3620))
- Make /directory/list API return 404 for room not found instead of 400 ([\#2952](https://github.com/matrix-org/synapse/issues/2952))
- Default inviter_display_name to mxid for email invites ([\#3391](https://github.com/matrix-org/synapse/issues/3391))
- Don't generate TURN credentials if no TURN config options are set ([\#3514](https://github.com/matrix-org/synapse/issues/3514))
- Correctly announce deleted devices over federation ([\#3520](https://github.com/matrix-org/synapse/issues/3520))

View File

@@ -30,11 +30,11 @@ use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
We use `Jenkins <http://matrix.org/jenkins>`_ and
We use `Jenkins <http://matrix.org/jenkins>`_ and
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an adminstrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an adminstrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
pull request for feedback.
Code style
@@ -56,18 +56,17 @@ entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``PRnumber.type``. The type can be
file named in the format of ``issuenumberOrPR.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain Markdown
formatting. Adding credits to the changelog is encouraged, we value
your contributions and would like to have you shouted out in the
release notes!
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
For example, a fix in PR #1234 would have its changelog entry in
``changelog.d/1234.bugfix``, and contain content like "The security levels of
Florbs are now validated when recieved over federation. Contributed by Jane
Matrix".
For example, a fix for a bug reported in #1234 would have its
changelog entry in ``changelog.d/1234.bugfix``, and contain content
like "The security levels of Florbs are now validated when
recieved over federation. Contributed by Jane Matrix".
Attribution
~~~~~~~~~~~
@@ -126,7 +125,7 @@ the contribution or otherwise have the right to contribute it to Matrix::
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment::

View File

@@ -167,6 +167,11 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
tested with VirtualBox/AWS/DigitalOcean - see
https://github.com/EMnify/matrix-synapse-auto-deploy
for details.
Configuring synapse
-------------------

1
changelog.d/1491.feature Normal file
View File

@@ -0,0 +1 @@
Add support for the SNI extension to federation TLS connections

View File

@@ -1 +0,0 @@
Removed the link to the unmaintained matrix-synapse-auto-deploy project from the readme.

1
changelog.d/3423.misc Normal file
View File

@@ -0,0 +1 @@
The test suite now can run under PostgreSQL.

1
changelog.d/3632.misc Normal file
View File

@@ -0,0 +1 @@
Refactor HTTP replication endpoints to reduce code duplication

1
changelog.d/3633.feature Normal file
View File

@@ -0,0 +1 @@
Add ability to limit number of monthly active users on the server

1
changelog.d/3647.misc Normal file
View File

@@ -0,0 +1 @@
Tests now correctly execute on Python 3.

1
changelog.d/3653.feature Normal file
View File

@@ -0,0 +1 @@
Support more federation endpoints on workers

1
changelog.d/3654.feature Normal file
View File

@@ -0,0 +1 @@
Basic support for room versioning

1
changelog.d/3655.feature Normal file
View File

@@ -0,0 +1 @@
Ability to disable client/server Synapse via conf toggle

1
changelog.d/3658.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix occasional glitches in the synapse_event_persisted_position metric

View File

@@ -1 +0,0 @@
Support profile API endpoints on workers

1
changelog.d/3660.misc Normal file
View File

@@ -0,0 +1 @@
Sytests can now be run inside a Docker container.

1
changelog.d/3661.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bug on deleting 3pid when using identity servers that don't support unbind API

1
changelog.d/3662.feature Normal file
View File

@@ -0,0 +1 @@
Ability to whitelist specific threepids against monthly active user limiting

1
changelog.d/3664.feature Normal file
View File

@@ -0,0 +1 @@
Add some metrics for the appservice and federation event sending loops

1
changelog.d/3669.misc Normal file
View File

@@ -0,0 +1 @@
Update docker base image from alpine 3.7 to 3.8.

1
changelog.d/3670.feature Normal file
View File

@@ -0,0 +1 @@
Where server is disabled, block ability for locked out users to read new messages

View File

@@ -1 +0,0 @@
Refactor state module to support multiple room versions

1
changelog.d/3676.bugfix Normal file
View File

@@ -0,0 +1 @@
Make the tests pass on Twisted < 18.7.0

1
changelog.d/3677.bugfix Normal file
View File

@@ -0,0 +1 @@
Dont ship recaptcha_ajax.js, use it directly from Google

1
changelog.d/3678.misc Normal file
View File

@@ -0,0 +1 @@
Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7.

1
changelog.d/3679.misc Normal file
View File

@@ -0,0 +1 @@
Synapse's tests are now formatted with the black autoformatter.

View File

@@ -1 +0,0 @@
Server notices for resource limit blocking

1
changelog.d/3681.bugfix Normal file
View File

@@ -0,0 +1 @@
Fixes test_reap_monthly_active_users so it passes under postgres

1
changelog.d/3684.misc Normal file
View File

@@ -0,0 +1 @@
Implemented a new testing base class to reduce test boilerplate.

1
changelog.d/3687.feature Normal file
View File

@@ -0,0 +1 @@
set admin uri via config, to be used in error messages where the user should contact the administrator

1
changelog.d/3690.misc Normal file
View File

@@ -0,0 +1 @@
Rename MAU prometheus metrics

1
changelog.d/3692.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users

View File

@@ -1 +0,0 @@
Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues

View File

@@ -1 +0,0 @@
Allow guests to use /rooms/:roomId/event/:eventId

View File

@@ -1 +0,0 @@
The synapse.storage module has been ported to Python 3.

View File

@@ -1 +0,0 @@
Split the state_group_cache into member and non-member state events (and so speed up LL /sync)

View File

@@ -1 +0,0 @@
Log failure to authenticate remote servers as warnings (without stack traces)

View File

@@ -1 +0,0 @@
The CONTRIBUTING guidelines have been updated to mention our use of Markdown and that .misc files have content.

View File

@@ -1 +0,0 @@
Reference the need for an HTTP replication port when using the federation_reader worker

View File

@@ -1 +0,0 @@
Fix minor spelling error in federation client documentation.

View File

@@ -1 +0,0 @@
Remove redundant state resolution function

View File

@@ -1 +0,0 @@
The test suite now passes on PostgreSQL.

View File

@@ -1 +0,0 @@
Fix MAU cache invalidation due to missing yield

View File

@@ -1 +0,0 @@
Fix bug where we resent "limit exceeded" server notices repeatedly

View File

@@ -1 +0,0 @@
Add mau_trial_days config param, so that users only get counted as MAU after N days.

View File

@@ -1 +0,0 @@
Require twisted 17.1 or later (fixes [#3741](https://github.com/matrix-org/synapse/issues/3741)).

View File

@@ -1 +0,0 @@
Fix bug where we broke sync when using limit_usage_by_mau but hadn't configured server notices

View File

@@ -1 +0,0 @@
Fix 'federation_domain_whitelist' such that an empty list correctly blocks all outbound federation traffic

View File

@@ -1 +0,0 @@
Fix tagging of server notice rooms

View File

@@ -1 +0,0 @@
Fix tagging of server notice rooms

View File

@@ -1 +0,0 @@
Fix 'admin_uri' config variable and error parameter to be 'admin_contact' to match the spec.

View File

@@ -1 +0,0 @@
Don't return non-LL-member state in incremental sync state blocks

View File

@@ -1 +0,0 @@
Make sure that we close db connections opened during init

View File

@@ -1 +0,0 @@
Fix bug in sending presence over federation

View File

@@ -1 +0,0 @@
Fix bug where preserved threepid user comes to sign up and server is mau blocked

View File

@@ -1 +0,0 @@
Improve human readable error messages for threepid registration/account update

View File

@@ -33,7 +33,7 @@ As an example::
< {
"access_token": "token_here",
"user_id": "@pepper_roni:localhost",
"user_id": "@pepper_roni@test",
"home_server": "test",
"device_id": "device_id_here"
}

View File

@@ -74,7 +74,7 @@ replication endpoints that it's talking to on the main synapse process.
``worker_replication_port`` should point to the TCP replication listener port and
``worker_replication_http_port`` should point to the HTTP replication port.
Currently, the ``event_creator`` and ``federation_reader`` workers require specifying
Currently, only the ``event_creator`` worker requires specifying
``worker_replication_http_port``.
For instance::
@@ -241,14 +241,6 @@ regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
If ``use_presence`` is False in the homeserver config, it can also handle REST
endpoints matching the following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status
This "stub" presence handler will pass through ``GET`` request but make the
``PUT`` effectively a no-op.
It will proxy any requests it cannot handle to the main synapse instance. It
must therefore be configured with the location of the main instance, via
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
@@ -265,7 +257,6 @@ Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.

View File

@@ -31,5 +31,5 @@ $TOX_BIN/pip install 'setuptools>=18.5'
$TOX_BIN/pip install 'pip>=10'
{ python synapse/python_dependencies.py
echo lxml
echo lxml psycopg2
} | xargs $TOX_BIN/pip install

View File

@@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.33.3"
__version__ = "0.33.2"

View File

@@ -25,8 +25,7 @@ from twisted.internet import defer
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
from synapse.api.errors import AuthError, Codes
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
@@ -212,7 +211,7 @@ class Auth(object):
user_agent = request.requestHeaders.getRawHeaders(
b"User-Agent",
default=[b""]
)[0].decode('ascii', 'surrogateescape')
)[0]
if user and access_token and ip_addr:
yield self.store.insert_client_ip(
user_id=user.to_string(),
@@ -683,7 +682,7 @@ class Auth(object):
Returns:
bool: False if no access_token was given, True otherwise.
"""
query_params = request.args.get(b"access_token")
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
return bool(query_params) or bool(auth_headers)
@@ -699,7 +698,7 @@ class Auth(object):
401 since some of the old clients depended on auth errors returning
403.
Returns:
unicode: The access_token
str: The access_token
Raises:
AuthError: If there isn't an access_token in the request.
"""
@@ -721,9 +720,9 @@ class Auth(object):
"Too many Authorization headers.",
errcode=Codes.MISSING_TOKEN,
)
parts = auth_headers[0].split(b" ")
if parts[0] == b"Bearer" and len(parts) == 2:
return parts[1].decode('ascii')
parts = auth_headers[0].split(" ")
if parts[0] == "Bearer" and len(parts) == 2:
return parts[1]
else:
raise AuthError(
token_not_found_http_status,
@@ -739,7 +738,7 @@ class Auth(object):
errcode=Codes.MISSING_TOKEN
)
return query_params[0].decode('ascii')
return query_params[0]
@defer.inlineCallbacks
def check_in_room_or_world_readable(self, room_id, user_id):
@@ -776,56 +775,31 @@ class Auth(object):
)
@defer.inlineCallbacks
def check_auth_blocking(self, user_id=None, threepid=None):
def check_auth_blocking(self, user_id=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
threepid(dict|None): If present, checks for presence against configured
reserved threepid. Used in cases where the user is trying register
with a MAU blocked server, normally they would be rejected but their
threepid is on the reserved list. user_id and
threepid should never be set at the same time.
"""
# Never fail an auth check for the server notices users
# This can be a problem where event creation is prohibited due to blocking
if user_id == self.hs.config.server_notices_mxid:
return
if self.hs.config.hs_disabled:
raise ResourceLimitError(
raise AuthError(
403, self.hs.config.hs_disabled_message,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_contact=self.hs.config.admin_contact,
limit_type=self.hs.config.hs_disabled_limit_type
admin_uri=self.hs.config.admin_uri,
)
if self.hs.config.limit_usage_by_mau is True:
assert not (user_id and threepid)
# If the user is already part of the MAU cohort or a trial user
# If the user is already part of the MAU cohort
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
is_trial = yield self.store.is_trial_user(user_id)
if is_trial:
return
elif threepid:
# If the user does not exist yet, but is signing up with a
# reserved threepid then pass auth check
if is_threepid_reserved(self.hs.config, threepid):
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise ResourceLimitError(
403, "Monthly Active User Limit Exceeded",
admin_contact=self.hs.config.admin_contact,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
limit_type="monthly_active_user"
raise AuthError(
403, "Monthly Active User Limits AU Limit Exceeded",
admin_uri=self.hs.config.admin_uri,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED
)

View File

@@ -78,7 +78,6 @@ class EventTypes(object):
Name = "m.room.name"
ServerACL = "m.room.server_acl"
Pinned = "m.room.pinned_events"
class RejectedReason(object):
@@ -98,17 +97,9 @@ class ThirdPartyEntityKind(object):
LOCATION = "location"
class RoomVersions(object):
V1 = "1"
VDH_TEST = "vdh-test-version"
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = RoomVersions.V1
DEFAULT_ROOM_VERSION = "1"
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"}

View File

@@ -224,34 +224,15 @@ class NotFoundError(SynapseError):
class AuthError(SynapseError):
"""An error raised when there was a problem authorising an event."""
def __init__(self, *args, **kwargs):
if "errcode" not in kwargs:
kwargs["errcode"] = Codes.FORBIDDEN
super(AuthError, self).__init__(*args, **kwargs)
class ResourceLimitError(SynapseError):
"""
Any error raised when there is a problem with resource usage.
For instance, the monthly active user limit for the server has been exceeded
"""
def __init__(
self, code, msg,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_contact=None,
limit_type=None,
):
self.admin_contact = admin_contact
self.limit_type = limit_type
super(ResourceLimitError, self).__init__(code, msg, errcode=errcode)
def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None):
self.admin_uri = admin_uri
super(AuthError, self).__init__(code, msg, errcode=errcode)
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
admin_contact=self.admin_contact,
limit_type=self.limit_type
admin_uri=self.admin_uri,
)

View File

@@ -72,7 +72,7 @@ class Ratelimiter(object):
return allowed, time_allowed
def prune_message_counts(self, time_now_s):
for user_id in list(self.message_counts.keys()):
for user_id in self.message_counts.keys():
message_count, time_start, msg_rate_hz = (
self.message_counts[user_id]
)

View File

@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
logger.info("Metrics now reporting on %s:%d", host, port)
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
def listen_tcp(bind_addresses, port, factory, backlog=50):
"""
Create a TCP socket for a port and several addresses
"""
@@ -156,9 +156,7 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
check_bind_error(e, address, bind_addresses)
def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
"""
Create an SSL socket for a port and several addresses
"""

View File

@@ -51,7 +51,10 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
DATASTORE_CLASS = AppserviceSlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -114,9 +117,8 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()

View File

@@ -74,7 +74,10 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
DATASTORE_CLASS = ClientReaderSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -45,11 +45,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
RoomMembershipRestServlet,
@@ -58,7 +53,6 @@ from synapse.rest.client.v1.room import (
)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
@@ -68,9 +62,6 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
DirectoryStore,
SlavedTransactionStore,
SlavedProfileStore,
@@ -90,7 +81,10 @@ class EventCreatorSlavedStore(
class EventCreatorServer(HomeServer):
DATASTORE_CLASS = EventCreatorSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -107,9 +101,6 @@ class EventCreatorServer(HomeServer):
RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource)
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,

View File

@@ -32,7 +32,6 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
@@ -55,7 +54,6 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
SlavedAccountDataStore,
SlavedProfileStore,
SlavedApplicationServiceStore,
SlavedPusherStore,
@@ -72,7 +70,10 @@ class FederationReaderSlavedStore(
class FederationReaderServer(HomeServer):
DATASTORE_CLASS = FederationReaderSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -78,7 +78,10 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
DATASTORE_CLASS = FederationSenderSlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -141,9 +144,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(FederationSenderReplicationHandler, self).on_rdata(
super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)

View File

@@ -38,7 +38,6 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -50,35 +49,6 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -148,7 +118,10 @@ class FrontendProxySlavedStore(
class FrontendProxyServer(HomeServer):
DATASTORE_CLASS = FrontendProxySlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -162,12 +135,6 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -186,8 +153,7 @@ class FrontendProxyServer(HomeServer):
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor()
)
)
logger.info("Synapse client reader now listening on port %d", port)

View File

@@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage import are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -111,8 +111,6 @@ def build_resource_for_web_client(hs):
class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
def _listener_http(self, config, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
@@ -358,13 +356,13 @@ def setup(config_options):
logger.info("Preparing database: %s...", config.database_config['name'])
try:
with hs.get_db_conn(run_new_connection=False) as db_conn:
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
db_conn = hs.get_db_conn(run_new_connection=False)
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
hs.run_startup_checks(db_conn, database_engine)
hs.run_startup_checks(db_conn, database_engine)
db_conn.commit()
db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
@@ -527,7 +525,6 @@ def run(hs):
clock.looping_call(
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
)
hs.get_datastore().reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():

View File

@@ -60,7 +60,10 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
DATASTORE_CLASS = MediaRepositorySlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]

View File

@@ -78,7 +78,10 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
DATASTORE_CLASS = PusherSlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -145,9 +148,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
@@ -160,11 +162,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
self.pusher_pool.on_new_notifications(
yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
self.pusher_pool.on_new_receipts(
yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:

View File

@@ -114,10 +114,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
)
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
@@ -214,13 +211,10 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
else:
return set()
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
class SynchrotronTyping(object):
@@ -249,7 +243,10 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
DATASTORE_CLASS = SynchrotronSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -335,9 +332,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):

View File

@@ -94,7 +94,10 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
DATASTORE_CLASS = UserDirectorySlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -166,9 +169,8 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":

View File

@@ -168,8 +168,7 @@ def setup_logging(config, use_worker_options=False):
if log_file:
# TODO: Customisable file size / backup count
handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3,
encoding='utf8'
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
)
def sighup(signum, stack):

View File

@@ -49,9 +49,6 @@ class ServerConfig(Config):
# "disable" federation
self.send_federation = config.get("send_federation", True)
# Whether to enable user presence.
self.use_presence = config.get("use_presence", True)
# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
@@ -77,23 +74,17 @@ class ServerConfig(Config):
self.max_mau_value = config.get(
"max_mau_value", 0,
)
self.mau_limits_reserved_threepids = config.get(
"mau_limit_reserved_threepids", []
)
self.mau_trial_days = config.get(
"mau_trial_days", 0,
)
# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
self.hs_disabled_limit_type = config.get("hs_disabled_limit_type", "")
# Admin uri to direct users at should their instance become blocked
# due to resource constraints
self.admin_contact = config.get("admin_contact", None)
self.admin_uri = config.get("admin_uri", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
@@ -258,9 +249,6 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
use_presence: true
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]
@@ -352,33 +340,6 @@ class ServerConfig(Config):
# - port: 9000
# bind_addresses: ['::1', '127.0.0.1']
# type: manhole
# Homeserver blocking
#
# How to reach the server admin, used in ResourceLimitError
# admin_contact: 'mailto:admin@server.com'
#
# Global block config
#
# hs_disabled: False
# hs_disabled_message: 'Human readable reason for why the HS is blocked'
# hs_disabled_limit_type: 'error code(str), to help clients decode reason'
#
# Monthly Active User Blocking
#
# Enables monthly active user checking
# limit_usage_by_mau: False
# max_mau_value: 50
# mau_trial_days: 2
#
# Sometimes the server admin will want to ensure certain accounts are
# never blocked by mau checking. These accounts are specified here.
#
# mau_limit_reserved_threepids:
# - medium: 'email'
# address: 'reserved_user@example.com'
""" % locals()
def read_arguments(self, args):
@@ -404,23 +365,6 @@ class ServerConfig(Config):
" service on the given port.")
def is_threepid_reserved(config, threepid):
"""Check the threepid against the reserved threepid config
Args:
config(ServerConfig) - to access server config attributes
threepid(dict) - The threepid to test for
Returns:
boolean Is the threepid undertest reserved_user
"""
for tp in config.mau_limits_reserved_threepids:
if (threepid['medium'] == tp['medium']
and threepid['address'] == tp['address']):
return True
return False
def read_gc_thresholds(thresholds):
"""Reads the three integer thresholds for garbage collection. Ensures that
the thresholds are integers if thresholds are supplied.

View File

@@ -18,9 +18,7 @@ import logging
from canonicaljson import json
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
from twisted.internet.protocol import Factory
from twisted.names.error import DomainError
from twisted.web.http import HTTPClient
from synapse.http.endpoint import matrix_federation_endpoint
@@ -49,14 +47,12 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
server_response, server_certificate = yield protocol.remote_key
defer.returnValue((server_response, server_certificate))
except SynapseKeyClientError as e:
logger.warn("Error getting key for %r: %s", server_name, e)
logger.exception("Error getting key for %r" % (server_name,))
if e.status.startswith("4"):
# Don't retry for 4xx responses.
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
except Exception as e:
logger.exception("Error getting key for %r", server_name)
logger.exception(e)
raise IOError("Cannot get key for %r" % server_name)

View File

@@ -32,7 +32,7 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
from six import iteritems
from six import iteritems, itervalues
from sortedcontainers import SortedDict
@@ -117,7 +117,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
for uids in self.presence_changed.values()
for uids in itervalues(self.presence_changed)
for user_id in uids
)

View File

@@ -58,7 +58,6 @@ class TransactionQueue(object):
"""
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
@@ -309,9 +308,6 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled

View File

@@ -106,7 +106,7 @@ class TransportLayerClient(object):
dest (str)
room_id (str)
event_tuples (list)
limit (int)
limt (int)
Returns:
Deferred: Results in a dict received from the remote homeserver.

View File

@@ -261,10 +261,10 @@ class BaseFederationServlet(object):
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
logger.exception("authenticate_request failed")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
except Exception:
logger.exception("authenticate_request failed")
raise
if origin:

View File

@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
yield self.auth.check_auth_blocking(user_id)
yield self.auth.check_auth_blocking()
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -734,6 +734,7 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
yield self.auth.check_auth_blocking()
auth_api = self.hs.get_auth()
user_id = None
try:
@@ -742,7 +743,6 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@defer.inlineCallbacks

View File

@@ -291,9 +291,8 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
room_version = yield self.store.get_room_version(pdu.room_id)
state_map = yield resolve_events_with_factory(
room_version, state_groups, {pdu.event_id: pdu}, fetch
state_groups, {pdu.event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
@@ -1829,10 +1828,7 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
room_version = yield self.store.get_room_version(event.room_id)
new_state = yield self.state_handler.resolve_events(
room_version,
new_state = self.state_handler.resolve_events(
[list(local_view.values()), list(remote_view.values())],
event
)
@@ -2390,7 +2386,8 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
self.pusher_pool.on_new_notifications(
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)

View File

@@ -372,10 +372,6 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,

View File

@@ -25,13 +25,7 @@ from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
NotFoundError,
SynapseError,
)
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
@@ -42,7 +36,6 @@ from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -89,85 +82,28 @@ class MessageHandler(object):
defer.returnValue(data)
@defer.inlineCallbacks
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
at_token=None, is_guest=False,
):
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left. If an explicit
'at' parameter is passed, return the state events as of that event, if
visible.
left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
NotFoundError (404) if the at token does not yield an event
AuthError (403) if the user doesn't have permission to view
members of this room.
"""
if at_token:
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1,
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
)
if not last_events:
raise NotFoundError("Can't find event for token %s" % (at_token, ))
visible_events = yield filter_events_for_client(
self.store, user_id, last_events,
)
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
[event.event_id], types, filtered_types=filtered_types,
)
room_state = room_state[event.event_id]
else:
raise AuthError(
403,
"User %s not allowed to view events in room %s at token %s" % (
user_id, room_id, at_token,
)
)
else:
membership, membership_event_id = (
yield self.auth.check_in_room_or_world_readable(
room_id, user_id,
)
)
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], types, filtered_types=filtered_types,
)
room_state = room_state[membership_event_id]
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
@@ -276,14 +212,10 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
Returns:
Tuple of created event (FrozenEvent), Context
"""
yield self.auth.check_auth_blocking(requester.user.to_string())
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
@@ -778,8 +710,11 @@ class EventCreationHandler(object):
event, context=context
)
self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)
def _notify():

View File

@@ -18,7 +18,7 @@ import logging
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
@@ -251,33 +251,6 @@ class PaginationHandler(object):
is_peeking=(member_event_id is None),
)
state = None
if event_filter and event_filter.lazy_load_members():
# TODO: remove redundant members
types = [
(EventTypes.Member, state_key)
for state_key in set(
event.sender # FIXME: we also care about invite targets etc.
for event in events
)
]
state_ids = yield self.store.get_state_ids_for_event(
events[0].event_id, types=types,
)
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
if state:
state = yield filter_events_for_client(
self.store,
user_id,
state.values(),
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
@@ -289,10 +262,4 @@ class PaginationHandler(object):
"end": next_token.to_string(),
}
if state:
chunk["state"] = [
serialize_event(e, time_now, as_client_event)
for e in state
]
defer.returnValue(chunk)

View File

@@ -395,10 +395,6 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.use_presence:
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -428,11 +424,6 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
# Override if it should affect the user's presence, if presence is
# disabled.
if not self.hs.config.use_presence:
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -478,16 +469,13 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
if self.hs.config.use_presence:
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):

View File

@@ -32,16 +32,12 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
class BaseProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.
BaseProfileHandler can be instantiated directly on workers and will
delegate to master when necessary. The master process should use the
subclass MasterProfileHandler
"""
class ProfileHandler(BaseHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(BaseProfileHandler, self).__init__(hs)
super(ProfileHandler, self).__init__(hs)
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -50,6 +46,11 @@ class BaseProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
if hs.config.worker_app is None:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
@@ -281,20 +282,6 @@ class BaseProfileHandler(BaseHandler):
room_id, str(e.message)
)
class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(MasterProfileHandler, self).__init__(hs)
assert hs.config.worker_app is None
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache,

View File

@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.types import get_domain_from_id
from synapse.util import logcontext
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@@ -115,15 +116,16 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)
defer.returnValue(True)
defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks

View File

@@ -125,7 +125,6 @@ class RegistrationHandler(BaseHandler):
guest_access_token=None,
make_guest=False,
admin=False,
threepid=None,
):
"""Registers a new client on the server.
@@ -146,7 +145,7 @@ class RegistrationHandler(BaseHandler):
RegistrationError if there was a problem registering.
"""
yield self.auth.check_auth_blocking(threepid=threepid)
yield self.auth.check_auth_blocking()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)

View File

@@ -98,13 +98,9 @@ class RoomCreationHandler(BaseHandler):
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
ResourceLimitError if server is blocked to some resource being
exceeded
"""
user_id = requester.user.to_string()
self.auth.check_auth_blocking(user_id)
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")

View File

@@ -344,7 +344,6 @@ class RoomMemberHandler(object):
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)

View File

@@ -75,7 +75,6 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"ephemeral",
"account_data",
"unread_notifications",
"summary",
])):
__slots__ = []
@@ -185,7 +184,6 @@ class SyncResult(collections.namedtuple("SyncResult", [
class SyncHandler(object):
def __init__(self, hs):
self.hs_config = hs.config
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
@@ -505,142 +503,10 @@ class SyncHandler(object):
state = {}
defer.returnValue(state)
@defer.inlineCallbacks
def compute_summary(self, room_id, sync_config, batch, state, now_token):
""" Works out a room summary block for this room, summarising the number
of joined members in the room, and providing the 'hero' members if the
room has no name so clients can consistently name rooms. Also adds
state events to 'state' if needed to describe the heroes.
Args:
room_id(str):
sync_config(synapse.handlers.sync.SyncConfig):
batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
the room that will be sent to the user.
state(dict): dict of (type, state_key) -> Event as returned by
compute_state_delta
now_token(str): Token of the end of the current batch.
Returns:
A deferred dict describing the room summary
"""
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = yield self.store.get_recent_event_ids_for_room(
room_id, end_token=now_token.room_key, limit=1,
)
if not last_events:
defer.returnValue(None)
return
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
last_event.event_id, [
(EventTypes.Member, None),
(EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''),
]
)
member_ids = {
state_key: event_id
for (t, state_key), event_id in state_ids.iteritems()
if t == EventTypes.Member
}
name_id = state_ids.get((EventTypes.Name, ''))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
summary = {}
# FIXME: it feels very heavy to load up every single membership event
# just to calculate the counts.
member_events = yield self.store.get_events(member_ids.values())
joined_user_ids = []
invited_user_ids = []
for ev in member_events.values():
if ev.content.get("membership") == Membership.JOIN:
joined_user_ids.append(ev.state_key)
elif ev.content.get("membership") == Membership.INVITE:
invited_user_ids.append(ev.state_key)
# TODO: only send these when they change.
summary["m.joined_member_count"] = len(joined_user_ids)
summary["m.invited_member_count"] = len(invited_user_ids)
if name_id or canonical_alias_id:
defer.returnValue(summary)
# FIXME: order by stream ordering, not alphabetic
me = sync_config.user.to_string()
if (joined_user_ids or invited_user_ids):
summary['m.heroes'] = sorted(
[
user_id
for user_id in (joined_user_ids + invited_user_ids)
if user_id != me
]
)[0:5]
else:
summary['m.heroes'] = sorted(
[user_id for user_id in member_ids.keys() if user_id != me]
)[0:5]
if not sync_config.filter_collection.lazy_load_members():
defer.returnValue(summary)
# ensure we send membership events for heroes if needed
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
# track which members the client should already know about via LL:
# Ones which are already in state...
existing_members = set(
user_id for (typ, user_id) in state.keys()
if typ == EventTypes.Member
)
# ...or ones which are in the timeline...
for ev in batch.events:
if ev.type == EventTypes.Member:
existing_members.add(ev.state_key)
# ...and then ensure any missing ones get included in state.
missing_hero_event_ids = [
member_ids[hero_id]
for hero_id in summary['m.heroes']
if (
cache.get(hero_id) != member_ids[hero_id] and
hero_id not in existing_members
)
]
missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
missing_hero_state = missing_hero_state.values()
for s in missing_hero_state:
cache.set(s.state_key, s.event_id)
state[(EventTypes.Member, s.state_key)] = s
defer.returnValue(summary)
def get_lazy_loaded_members_cache(self, cache_key):
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)
return cache
@defer.inlineCallbacks
def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
full_state):
""" Works out the difference in state between the start of the timeline
""" Works out the differnce in state between the start of the timeline
and the previous sync.
Args:
@@ -654,7 +520,7 @@ class SyncHandler(object):
full_state(bool): Whether to force returning the full state.
Returns:
A deferred dict of (type, state_key) -> Event
A deferred new event dictionary
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -745,21 +611,20 @@ class SyncHandler(object):
state_ids = {}
if lazy_load_members:
if types:
# We're returning an incremental sync, with no "gap" since
# the previous sync, so normally there would be no state to return
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here, and then dedupe any redundant ones below.
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=None, # we only want members!
filtered_types=filtered_types,
)
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
@@ -868,7 +733,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if self.hs_config.use_presence and not block_all_presence_data:
if not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
@@ -1560,6 +1425,7 @@ class SyncHandler(object):
if events == [] and tags is None:
return
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -1602,18 +1468,6 @@ class SyncHandler(object):
full_state=full_state
)
summary = {}
if (
sync_config.filter_collection.lazy_load_members() and
(
any(ev.type == EventTypes.Member for ev in batch.events) or
since_token is None
)
):
summary = yield self.compute_summary(
room_id, sync_config, batch, state, now_token
)
if room_builder.rtype == "joined":
unread_notifications = {}
room_sync = JoinedSyncResult(
@@ -1623,7 +1477,6 @@ class SyncHandler(object):
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
)
if room_sync or always_include:

View File

@@ -119,8 +119,6 @@ class UserDirectoryHandler(object):
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None,
)
@@ -129,8 +127,6 @@ class UserDirectoryHandler(object):
def handle_user_deactivated(self, user_id):
"""Called when a user ID is deactivated
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id)

View File

@@ -133,7 +133,7 @@ class MatrixFederationHttpClient(object):
failures, connection failures, SSL failures.)
"""
if (
self.hs.config.federation_domain_whitelist is not None and
self.hs.config.federation_domain_whitelist and
destination not in self.hs.config.federation_domain_whitelist
):
raise FederationDeniedError(destination)

View File

@@ -15,7 +15,6 @@
# limitations under the License.
import logging
import threading
from prometheus_client.core import Counter, Histogram
@@ -112,9 +111,6 @@ in_flight_requests_db_sched_duration = Counter(
# The set of all in flight requests, set[RequestMetrics]
_in_flight_requests = set()
# Protects the _in_flight_requests set from concurrent accesss
_in_flight_requests_lock = threading.Lock()
def _get_in_flight_counts():
"""Returns a count of all in flight requests by (method, server_name)
@@ -124,8 +120,7 @@ def _get_in_flight_counts():
"""
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
with _in_flight_requests_lock:
reqs = list(_in_flight_requests)
reqs = list(_in_flight_requests)
for rm in reqs:
rm.update_metrics()
@@ -159,12 +154,10 @@ class RequestMetrics(object):
# to the "in flight" metrics.
self._request_stats = self.start_context.get_resource_usage()
with _in_flight_requests_lock:
_in_flight_requests.add(self)
_in_flight_requests.add(self)
def stop(self, time_sec, request):
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
_in_flight_requests.discard(self)
context = LoggingContext.current_context()

View File

@@ -25,9 +25,8 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso
from twisted.internet import defer
from twisted.python import failure
from twisted.web import resource
from twisted.web import resource, server
from twisted.web.server import NOT_DONE_YET
from twisted.web.static import NoRangeStaticProducer
from twisted.web.util import redirectTo
import synapse.events
@@ -38,13 +37,10 @@ from synapse.api.errors import (
SynapseError,
UnrecognizedRequestError,
)
from synapse.http.request_metrics import requests_counter
from synapse.util.caches import intern_dict
from synapse.util.logcontext import preserve_fn
if PY3:
from io import BytesIO
else:
from cStringIO import StringIO as BytesIO
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -64,10 +60,11 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
def wrap_json_request_handler(h):
"""Wraps a request handler method with exception handling.
Also does the wrapping with request.processing as per wrap_async_request_handler.
Also adds logging as per wrap_request_handler_with_logging.
The handler method must have a signature of "handle_foo(self, request)",
where "request" must be a SynapseRequest.
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
The handler must return a deferred. If the deferred succeeds we assume that
a response has been sent. If the deferred fails with a SynapseError we use
@@ -111,23 +108,24 @@ def wrap_json_request_handler(h):
pretty_print=_request_user_agent_is_curl(request),
)
return wrap_async_request_handler(wrapped_request_handler)
return wrap_request_handler_with_logging(wrapped_request_handler)
def wrap_html_request_handler(h):
"""Wraps a request handler method with exception handling.
Also does the wrapping with request.processing as per wrap_async_request_handler.
Also adds logging as per wrap_request_handler_with_logging.
The handler method must have a signature of "handle_foo(self, request)",
where "request" must be a SynapseRequest.
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
"""
def wrapped_request_handler(self, request):
d = defer.maybeDeferred(h, self, request)
d.addErrback(_return_html_error, request)
return d
return wrap_async_request_handler(wrapped_request_handler)
return wrap_request_handler_with_logging(wrapped_request_handler)
def _return_html_error(f, request):
@@ -172,26 +170,46 @@ def _return_html_error(f, request):
finish_request(request)
def wrap_async_request_handler(h):
"""Wraps an async request handler so that it calls request.processing.
This helps ensure that work done by the request handler after the request is completed
is correctly recorded against the request metrics/logs.
def wrap_request_handler_with_logging(h):
"""Wraps a request handler to provide logging and metrics
The handler method must have a signature of "handle_foo(self, request)",
where "request" must be a SynapseRequest.
where "self" must have a "clock" attribute (and "request" must be a
SynapseRequest).
The handler may return a deferred, in which case the completion of the request isn't
logged until the deferred completes.
As well as calling `request.processing` (which will log the response and
duration for this request), the wrapped request handler will insert the
request id into the logging context.
"""
@defer.inlineCallbacks
def wrapped_async_request_handler(self, request):
with request.processing():
yield h(self, request)
def wrapped_request_handler(self, request):
"""
Args:
self:
request (synapse.http.site.SynapseRequest):
"""
# we need to preserve_fn here, because the synchronous render method won't yield for
# us (obviously)
return preserve_fn(wrapped_async_request_handler)
request_id = request.get_request_id()
with LoggingContext(request_id) as request_context:
request_context.request = request_id
with Measure(self.clock, "wrapped_request_handler"):
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = self.__class__.__name__
with request.processing(servlet_name):
with PreserveLoggingContext(request_context):
d = defer.maybeDeferred(h, self, request)
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(request.method,
request.request_metrics.name).inc()
yield d
return wrapped_request_handler
class HttpServer(object):
@@ -254,7 +272,7 @@ class JsonResource(HttpServer, resource.Resource):
""" This gets called by twisted every time someone sends us a request.
"""
self._async_render(request)
return NOT_DONE_YET
return server.NOT_DONE_YET
@wrap_json_request_handler
@defer.inlineCallbacks
@@ -395,7 +413,8 @@ def respond_with_json(request, code, json_object, send_cors=False,
return
if pretty_print:
json_bytes = encode_pretty_printed_json(json_object) + b"\n"
json_bytes = (encode_pretty_printed_json(json_object) + "\n"
).encode("utf-8")
else:
if canonical_json or synapse.events.USE_FROZEN_DICTS:
# canonicaljson already encodes to bytes
@@ -431,12 +450,8 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
if send_cors:
set_cors_headers(request)
# todo: we can almost certainly avoid this copy and encode the json straight into
# the bytesIO, but it would involve faffing around with string->bytes wrappers.
bytes_io = BytesIO(json_bytes)
producer = NoRangeStaticProducer(request, bytes_io)
producer.start()
request.write(json_bytes)
finish_request(request)
return NOT_DONE_YET

View File

@@ -29,7 +29,7 @@ def parse_integer(request, name, default=None, required=False):
Args:
request: the twisted HTTP request.
name (bytes/unicode): the name of the query parameter.
name (str): the name of the query parameter.
default (int|None): value to use if the parameter is absent, defaults
to None.
required (bool): whether to raise a 400 SynapseError if the
@@ -46,10 +46,6 @@ def parse_integer(request, name, default=None, required=False):
def parse_integer_from_args(args, name, default=None, required=False):
if not isinstance(name, bytes):
name = name.encode('ascii')
if name in args:
try:
return int(args[name][0])
@@ -69,7 +65,7 @@ def parse_boolean(request, name, default=None, required=False):
Args:
request: the twisted HTTP request.
name (bytes/unicode): the name of the query parameter.
name (str): the name of the query parameter.
default (bool|None): value to use if the parameter is absent, defaults
to None.
required (bool): whether to raise a 400 SynapseError if the
@@ -87,15 +83,11 @@ def parse_boolean(request, name, default=None, required=False):
def parse_boolean_from_args(args, name, default=None, required=False):
if not isinstance(name, bytes):
name = name.encode('ascii')
if name in args:
try:
return {
b"true": True,
b"false": False,
"true": True,
"false": False,
}[args[name][0]]
except Exception:
message = (
@@ -112,29 +104,21 @@ def parse_boolean_from_args(args, name, default=None, required=False):
def parse_string(request, name, default=None, required=False,
allowed_values=None, param_type="string", encoding='ascii'):
"""
Parse a string parameter from the request query string.
If encoding is not None, the content of the query param will be
decoded to Unicode using the encoding, otherwise it will be encoded
allowed_values=None, param_type="string"):
"""Parse a string parameter from the request query string.
Args:
request: the twisted HTTP request.
name (bytes/unicode): the name of the query parameter.
default (bytes/unicode|None): value to use if the parameter is absent,
defaults to None. Must be bytes if encoding is None.
name (str): the name of the query parameter.
default (str|None): value to use if the parameter is absent, defaults
to None.
required (bool): whether to raise a 400 SynapseError if the
parameter is absent, defaults to False.
allowed_values (list[bytes/unicode]): List of allowed values for the
string, or None if any value is allowed, defaults to None. Must be
the same type as name, if given.
encoding: The encoding to decode the name to, and decode the string
content with.
allowed_values (list[str]): List of allowed values for the string,
or None if any value is allowed, defaults to None
Returns:
bytes/unicode|None: A string value or the default. Unicode if encoding
was given, bytes otherwise.
str|None: A string value or the default.
Raises:
SynapseError if the parameter is absent and required, or if the
@@ -142,22 +126,14 @@ def parse_string(request, name, default=None, required=False,
is not one of those allowed values.
"""
return parse_string_from_args(
request.args, name, default, required, allowed_values, param_type, encoding
request.args, name, default, required, allowed_values, param_type,
)
def parse_string_from_args(args, name, default=None, required=False,
allowed_values=None, param_type="string", encoding='ascii'):
if not isinstance(name, bytes):
name = name.encode('ascii')
allowed_values=None, param_type="string"):
if name in args:
value = args[name][0]
if encoding:
value = value.decode(encoding)
if allowed_values is not None and value not in allowed_values:
message = "Query parameter %r must be one of [%s]" % (
name, ", ".join(repr(v) for v in allowed_values)
@@ -170,10 +146,6 @@ def parse_string_from_args(args, name, default=None, required=False,
message = "Missing %s query parameter %r" % (param_type, name)
raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
else:
if encoding and isinstance(default, bytes):
return default.decode(encoding)
return default

Some files were not shown because too many files have changed in this diff Show More