1
0

Compare commits

..

6 Commits

Author SHA1 Message Date
Erik Johnston
68b9eb694f Tidy up hs.get_federation_sender() calls 2021-04-15 14:13:45 +01:00
Erik Johnston
9f1a20f0c2 Remove the federation replication stream and associated commands 2021-04-15 14:13:18 +01:00
Erik Johnston
7575a686fd Fix module api 2021-04-15 13:49:21 +01:00
Erik Johnston
40bc8774c8 Newsfile 2021-04-15 12:04:13 +01:00
Erik Johnston
5c63b653c8 Add a presence federation replication stream 2021-04-15 12:03:23 +01:00
Erik Johnston
8f566077fb Always use send_presence_to_destinations rather than send_presence
This a) reduces the API surface and b) means that we calculate where to
send presence on the presence writer, rather than federation senders.
2021-04-15 11:33:54 +01:00
162 changed files with 1292 additions and 3097 deletions

View File

@@ -1,66 +1,11 @@
Synapse 1.32.2 (2021-04-22)
===========================
This release includes a fix for a regression introduced in 1.32.0.
Bugfixes
--------
- Fix a regression in Synapse 1.32.0 and 1.32.1 which caused `LoggingContext` errors in plugins. ([\#9857](https://github.com/matrix-org/synapse/issues/9857))
Synapse 1.32.1 (2021-04-21)
===========================
This release fixes [a regression](https://github.com/matrix-org/synapse/issues/9853)
in Synapse 1.32.0 that caused connected Prometheus instances to become unstable.
However, as this release is still subject to the `LoggingContext` change in 1.32.0,
it is recommended to remain on or downgrade to 1.31.0.
Bugfixes
--------
- Fix a regression in Synapse 1.32.0 which caused Synapse to report large numbers of Prometheus time series, potentially overwhelming Prometheus instances. ([\#9854](https://github.com/matrix-org/synapse/issues/9854))
Synapse 1.32.0 (2021-04-20)
===========================
**Note:** This release introduces [a regression](https://github.com/matrix-org/synapse/issues/9853)
that can overwhelm connected Prometheus instances. This issue was not present in
1.32.0rc1. If affected, it is recommended to downgrade to 1.31.0 in the meantime, and
follow [these instructions](https://github.com/matrix-org/synapse/pull/9854#issuecomment-823472183)
to clean up any excess writeahead logs.
**Note:** This release also mistakenly included a change that may affected Synapse
modules that import `synapse.logging.context.LoggingContext`, such as
[synapse-s3-storage-provider](https://github.com/matrix-org/synapse-s3-storage-provider).
This will be fixed in a later Synapse version.
Synapse 1.32.0rc1 (2021-04-13)
==============================
**Note:** This release requires Python 3.6+ and Postgres 9.6+ or SQLite 3.22+.
This release removes the deprecated `GET /_synapse/admin/v1/users/<user_id>` admin API. Please use the [v2 API](https://github.com/matrix-org/synapse/blob/develop/docs/admin_api/user_admin_api.rst#query-user-account) instead, which has improved capabilities.
This release requires Application Services to use type `m.login.application_service` when registering users via the `/_matrix/client/r0/register` endpoint to comply with the spec. Please ensure your Application Services are up to date.
If you are using the `packages.matrix.org` Debian repository for Synapse packages,
note that we have recently updated the expiry date on the gpg signing key. If you see an
error similar to `The following signatures were invalid: EXPKEYSIG F473DD4473365DE1`, you
will need to get a fresh copy of the keys. You can do so with:
```sh
sudo wget -O /usr/share/keyrings/matrix-org-archive-keyring.gpg https://packages.matrix.org/debian/matrix-org-archive-keyring.gpg
```
Bugfixes
--------
- Fix the log lines of nested logging contexts. Broke in 1.32.0rc1. ([\#9829](https://github.com/matrix-org/synapse/issues/9829))
Synapse 1.32.0rc1 (2021-04-13)
==============================
This release requires Application Services to use type `m.login.application_services` when registering users via the `/_matrix/client/r0/register` endpoint to comply with the spec. Please ensure your Application Services are up to date.
Features
--------

View File

@@ -85,52 +85,9 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.33.0
====================
Account Validity HTML templates can now display a user's expiration date
------------------------------------------------------------------------
This may affect you if you have enabled the account validity feature, and have made use of a
custom HTML template specified by the ``account_validity.template_dir`` or ``account_validity.account_renewed_html_path``
Synapse config options.
The template can now accept an ``expiration_ts`` variable, which represents the unix timestamp in milliseconds for the
future date of which their account has been renewed until. See the
`default template <https://github.com/matrix-org/synapse/blob/release-v1.33.0/synapse/res/templates/account_renewed.html>`_
for an example of usage.
ALso note that a new HTML template, ``account_previously_renewed.html``, has been added. This is is shown to users
when they attempt to renew their account with a valid renewal token that has already been used before. The default
template contents can been found
`here <https://github.com/matrix-org/synapse/blob/release-v1.33.0/synapse/res/templates/account_previously_renewed.html>`_,
and can also accept an ``expiration_ts`` variable. This template replaces the error message users would previously see
upon attempting to use a valid renewal token more than once.
Upgrading to v1.32.0
====================
Regression causing connected Prometheus instances to become overwhelmed
-----------------------------------------------------------------------
This release introduces `a regression <https://github.com/matrix-org/synapse/issues/9853>`_
that can overwhelm connected Prometheus instances. This issue is not present in
Synapse v1.32.0rc1.
If you have been affected, please downgrade to 1.31.0. You then may need to
remove excess writeahead logs in order for Prometheus to recover. Instructions
for doing so are provided
`here <https://github.com/matrix-org/synapse/pull/9854#issuecomment-823472183>`_.
Dropping support for old Python, Postgres and SQLite versions
-------------------------------------------------------------
In line with our `deprecation policy <https://github.com/matrix-org/synapse/blob/release-v1.32.0/docs/deprecation_policy.md>`_,
we've dropped support for Python 3.5 and PostgreSQL 9.5, as they are no longer supported upstream.
This release of Synapse requires Python 3.6+ and PostgresSQL 9.6+ or SQLite 3.22+.
Removal of old List Accounts Admin API
--------------------------------------
@@ -141,16 +98,6 @@ has been available since Synapse 1.7.0 (2019-12-13), and is accessible under ``G
The deprecation of the old endpoint was announced with Synapse 1.28.0 (released on 2021-02-25).
Application Services must use type ``m.login.application_service`` when registering users
-----------------------------------------------------------------------------------------
In compliance with the
`Application Service spec <https://matrix.org/docs/spec/application_service/r0.1.2#server-admin-style-permissions>`_,
Application Services are now required to use the ``m.login.application_service`` type when registering users via the
``/_matrix/client/r0/register`` endpoint. This behaviour was deprecated in Synapse v1.30.0.
Please ensure your Application Services are up to date.
Upgrading to v1.29.0
====================

View File

@@ -1 +0,0 @@
Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path.

View File

@@ -1 +0,0 @@
Add some sanity checks to identity server passed to 3PID bind/unbind endpoints.

View File

@@ -1 +0,0 @@
Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.

View File

@@ -1 +0,0 @@
Rename some handlers and config modules to not duplicate the top-level module.

View File

@@ -1 +0,0 @@
Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Reduce CPU usage of the user directory by reusing existing calculated room membership.

View File

@@ -1 +0,0 @@
Small speed up for joining large remote rooms.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Don't return an error when a user attempts to renew their account multiple times with the same token. Instead, state when their account is set to expire. This change concerns the optional account validity feature.

View File

@@ -1 +0,0 @@
Limit the size of HTTP responses read over federation.

View File

@@ -1 +0,0 @@
Introduce flake8-bugbear to the test suite and fix some of its lint violations.

View File

@@ -1 +0,0 @@
Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Limit length of accepted email addresses.

View File

@@ -1 +0,0 @@
Remove redundant `synapse.types.Collection` type definition.

View File

@@ -1 +0,0 @@
Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts.

View File

@@ -1 +0,0 @@
Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists.

View File

@@ -1 +0,0 @@
Disable invite rate-limiting by default when running the unit tests.

View File

@@ -1 +0,0 @@
Pass a reactor into `SynapseSite` to make testing easier.

View File

@@ -1 +0,0 @@
Make `DomainSpecificString` an `attrs` class.

View File

@@ -1 +0,0 @@
Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules.

View File

@@ -1 +0,0 @@
Remove redundant `_PushHTTPChannel` test class.

View File

@@ -1 +0,0 @@
Small performance improvement around handling new local presence updates.

20
debian/changelog vendored
View File

@@ -1,24 +1,8 @@
matrix-synapse-py3 (1.32.2) stable; urgency=medium
matrix-synapse-py3 (1.31.0+nmu1) UNRELEASED; urgency=medium
* New synapse release 1.32.2.
-- Synapse Packaging team <packages@matrix.org> Wed, 22 Apr 2021 12:43:52 +0100
matrix-synapse-py3 (1.32.1) stable; urgency=medium
* New synapse release 1.32.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 21 Apr 2021 14:00:55 +0100
matrix-synapse-py3 (1.32.0) stable; urgency=medium
[ Dan Callahan ]
* Skip tests when DEB_BUILD_OPTIONS contains "nocheck".
[ Synapse Packaging team ]
* New synapse release 1.32.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 20 Apr 2021 14:28:39 +0100
-- Dan Callahan <danc@element.io> Mon, 12 Apr 2021 13:07:36 +0000
matrix-synapse-py3 (1.31.0) stable; urgency=medium

View File

@@ -96,48 +96,18 @@ for port in 8080 8081 8082; do
# Check script parameters
if [ $# -eq 1 ]; then
if [ $1 = "--no-rate-limit" ]; then
# messages rate limit
echo 'rc_messages_per_second: 1000' >> $DIR/etc/$port.config
echo 'rc_message_burst_count: 1000' >> $DIR/etc/$port.config
# Disable any rate limiting
ratelimiting=$(cat <<-RC
rc_message:
per_second: 1000
burst_count: 1000
rc_registration:
per_second: 1000
burst_count: 1000
rc_login:
address:
per_second: 1000
burst_count: 1000
account:
per_second: 1000
burst_count: 1000
failed_attempts:
per_second: 1000
burst_count: 1000
rc_admin_redaction:
per_second: 1000
burst_count: 1000
rc_joins:
local:
per_second: 1000
burst_count: 1000
remote:
per_second: 1000
burst_count: 1000
rc_3pid_validation:
per_second: 1000
burst_count: 1000
rc_invites:
per_room:
per_second: 1000
burst_count: 1000
per_user:
per_second: 1000
burst_count: 1000
RC
)
echo "${ratelimiting}" >> $DIR/etc/$port.config
# registration rate limit
printf 'rc_registration:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
# login rate limit
echo 'rc_login:' >> $DIR/etc/$port.config
printf ' address:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
printf ' account:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
printf ' failed_attempts:\n per_second: 1000\n burst_count: 1000\n' >> $DIR/etc/$port.config
fi
fi

View File

@@ -184,18 +184,18 @@ stderr_logfile_maxbytes=0
"""
NGINX_LOCATION_CONFIG_BLOCK = """
location ~* {endpoint} {{
location ~* {endpoint} {
proxy_pass {upstream};
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host;
}}
}
"""
NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
upstream {upstream_worker_type} {
{body}
}}
}
"""

View File

@@ -1175,6 +1175,69 @@ url_preview_accept_language:
#
#enable_registration: false
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#
# Note that this is not currently compatible with guest logins.
@@ -1369,91 +1432,6 @@ account_threepid_delegates:
#auto_join_rooms_for_guests: false
## Account Validity ##
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
# The currently available templates are:
#
# * account_renewed.html: Displayed to the user after they have successfully
# renewed their account.
#
# * account_previously_renewed.html: Displayed to the user if they attempt to
# renew their account with a token that is valid, but that has already
# been used. In this case the account is not renewed again.
#
# * invalid_token.html: Displayed to the user when they try to renew an account
# with an unknown or invalid renewal token.
#
# See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for
# default template contents.
#
# The file name of some of these templates can be configured below for legacy
# reasons.
#
#template_dir: "res/templates"
# A custom file name for the 'account_renewed.html' template.
#
# If not set, the file is assumed to be named "account_renewed.html".
#
#account_renewed_html_path: "account_renewed.html"
# A custom file name for the 'invalid_token.html' template.
#
# If not set, the file is assumed to be named "invalid_token.html".
#
#invalid_token_html_path: "invalid_token.html"
## Metrics ###
# Enable collection and rendering of performance metrics
@@ -1900,7 +1878,7 @@ saml2_config:
# sub-properties:
#
# module: The class name of a custom mapping module. Default is
# 'synapse.handlers.oidc.JinjaOidcMappingProvider'.
# 'synapse.handlers.oidc_handler.JinjaOidcMappingProvider'.
# See https://github.com/matrix-org/synapse/blob/master/docs/sso_mapping_providers.md#openid-mapping-providers
# for information on implementing a custom mapping provider.
#

View File

@@ -106,7 +106,7 @@ A custom mapping provider must specify the following methods:
Synapse has a built-in OpenID mapping provider if a custom provider isn't
specified in the config. It is located at
[`synapse.handlers.oidc.JinjaOidcMappingProvider`](../synapse/handlers/oidc.py).
[`synapse.handlers.oidc_handler.JinjaOidcMappingProvider`](../synapse/handlers/oidc_handler.py).
## SAML Mapping Providers
@@ -190,4 +190,4 @@ A custom mapping provider must specify the following methods:
Synapse has a built-in SAML mapping provider if a custom provider isn't
specified in the config. It is located at
[`synapse.handlers.saml.DefaultSamlMappingProvider`](../synapse/handlers/saml.py).
[`synapse.handlers.saml_handler.DefaultSamlMappingProvider`](../synapse/handlers/saml_handler.py).

View File

@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
This is used when a worker is shutting down.
#### FEDERATION_ACK (C)
Acknowledge receipt of some federation data
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.

View File

@@ -140,7 +140,7 @@ if __name__ == "__main__":
definitions = {}
for directory in args.directories:
for root, _, files in os.walk(directory):
for root, dirs, files in os.walk(directory):
for filename in files:
if filename.endswith(".py"):
filepath = os.path.join(root, filename)

View File

@@ -48,7 +48,7 @@ args = parser.parse_args()
for directory in args.directories:
for root, _, files in os.walk(directory):
for root, dirs, files in os.walk(directory):
for filename in files:
if filename.endswith(".py"):
filepath = os.path.join(root, filename)

View File

@@ -634,11 +634,8 @@ class Porter(object):
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
await self._setup_sequence(
"account_data_sequence",
("room_account_data", "room_tags_revisions", "account_data"),
)
await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
await self._setup_auth_chain_sequence()
# Step 3. Get tables.

View File

@@ -18,7 +18,8 @@ ignore =
# E203: whitespace before ':' (which is contrary to pep8?)
# E731: do not assign a lambda expression, use a def
# E501: Line too long (black enforces this for us)
ignore=W503,W504,E203,E731,E501
# B007: Subsection of the bugbear suite (TODO: add in remaining fixes)
ignore=W503,W504,E203,E731,E501,B007
[isort]
line_length = 88

View File

@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.32.2"
__version__ = "1.32.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import List, Optional, Tuple
import pymacaroons
from netaddr import IPAddress
from twisted.web.server import Request
import synapse.types
from synapse import event_auth
from synapse.api.auth_blocking import AuthBlocking
from synapse.api.constants import EventTypes, HistoryVisibility, Membership
@@ -35,14 +36,11 @@ from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing as opentracing
from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import Requester, StateMap, UserID, create_requester
from synapse.types import StateMap, UserID
from synapse.util.caches.lrucache import LruCache
from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -67,10 +65,9 @@ class Auth:
"""
FIXME: This class contains a mix of functions for authenticating users
of our client-server API and authenticating events added to room graphs.
The latter should be moved to synapse.handlers.event_auth.EventAuthHandler.
"""
def __init__(self, hs: "HomeServer"):
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -82,21 +79,19 @@ class Auth:
self._auth_blocking = AuthBlocking(self.hs)
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
self._account_validity = hs.config.account_validity
self._track_appservice_user_ips = hs.config.track_appservice_user_ips
self._macaroon_secret_key = hs.config.macaroon_secret_key
async def check_from_context(
self, room_version: str, event, context, do_sig_check=True
) -> None:
):
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events_by_id = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
auth_events = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
event_auth.check(
@@ -153,11 +148,17 @@ class Auth:
raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
async def check_host_in_room(self, room_id: str, host: str) -> bool:
async def check_host_in_room(self, room_id, host):
with Measure(self.clock, "check_host_in_room"):
return await self.store.is_host_joined(room_id, host)
latest_event_ids = await self.store.is_host_joined(room_id, host)
return latest_event_ids
def get_public_keys(self, invite_event: EventBase) -> List[Dict[str, Any]]:
def can_federate(self, event, auth_events):
creation_event = auth_events.get((EventTypes.Create, ""))
return creation_event.content.get("m.federate", True) is True
def get_public_keys(self, invite_event):
return event_auth.get_public_keys(invite_event)
async def get_user_by_req(
@@ -166,7 +167,7 @@ class Auth:
allow_guest: bool = False,
rights: str = "access",
allow_expired: bool = False,
) -> Requester:
) -> synapse.types.Requester:
"""Get a registered user's ID.
Args:
@@ -192,7 +193,7 @@ class Auth:
access_token = self.get_access_token_from_request(request)
user_id, app_service = await self._get_appservice_user_id(request)
if user_id and app_service:
if user_id:
if ip_addr and self._track_appservice_user_ips:
await self.store.insert_client_ip(
user_id=user_id,
@@ -202,7 +203,9 @@ class Auth:
device_id="dummy-device", # stubbed
)
requester = create_requester(user_id, app_service=app_service)
requester = synapse.types.create_requester(
user_id, app_service=app_service
)
request.requester = user_id
opentracing.set_tag("authenticated_entity", user_id)
@@ -219,7 +222,7 @@ class Auth:
shadow_banned = user_info.shadow_banned
# Deny the request if the user account has expired.
if self._account_validity_enabled and not allow_expired:
if self._account_validity.enabled and not allow_expired:
if await self.store.is_account_expired(
user_info.user_id, self.clock.time_msec()
):
@@ -245,7 +248,7 @@ class Auth:
errcode=Codes.GUEST_ACCESS_FORBIDDEN,
)
requester = create_requester(
requester = synapse.types.create_requester(
user_info.user_id,
token_id,
is_guest,
@@ -265,9 +268,7 @@ class Auth:
except KeyError:
raise MissingClientTokenError()
async def _get_appservice_user_id(
self, request: Request
) -> Tuple[Optional[str], Optional[ApplicationService]]:
async def _get_appservice_user_id(self, request):
app_service = self.store.get_app_service_by_token(
self.get_access_token_from_request(request)
)
@@ -279,9 +280,6 @@ class Auth:
if ip_address not in app_service.ip_range_whitelist:
return None, None
# This will always be set by the time Twisted calls us.
assert request.args is not None
if b"user_id" not in request.args:
return app_service.sender, app_service
@@ -386,9 +384,7 @@ class Auth:
logger.warning("Invalid macaroon in auth: %s %s", type(e), e)
raise InvalidClientTokenError("Invalid macaroon passed.")
def _parse_and_validate_macaroon(
self, token: str, rights: str = "access"
) -> Tuple[str, bool]:
def _parse_and_validate_macaroon(self, token, rights="access"):
"""Takes a macaroon and tries to parse and validate it. This is cached
if and only if rights == access and there isn't an expiry.
@@ -433,16 +429,15 @@ class Auth:
return user_id, guest
def validate_macaroon(
self, macaroon: pymacaroons.Macaroon, type_string: str, user_id: str
) -> None:
def validate_macaroon(self, macaroon, type_string, user_id):
"""
validate that a Macaroon is understood by and was signed by this server.
Args:
macaroon: The macaroon to validate
type_string: The kind of token required (e.g. "access", "delete_pusher")
user_id: The user_id required
macaroon(pymacaroons.Macaroon): The macaroon to validate
type_string(str): The kind of token required (e.g. "access",
"delete_pusher")
user_id (str): The user_id required
"""
v = pymacaroons.Verifier()
@@ -467,7 +462,9 @@ class Auth:
if not service:
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
request.requester = create_requester(service.sender, app_service=service)
request.requester = synapse.types.create_requester(
service.sender, app_service=service
)
return service
async def is_server_admin(self, user: UserID) -> bool:
@@ -519,7 +516,7 @@ class Auth:
return auth_ids
async def check_can_change_room_list(self, room_id: str, user: UserID) -> bool:
async def check_can_change_room_list(self, room_id: str, user: UserID):
"""Determine whether the user is allowed to edit the room's entry in the
published room list.
@@ -554,11 +551,11 @@ class Auth:
return user_level >= send_level
@staticmethod
def has_access_token(request: Request) -> bool:
def has_access_token(request: Request):
"""Checks if the request has an access_token.
Returns:
False if no access_token was given, True otherwise.
bool: False if no access_token was given, True otherwise.
"""
# This will always be set by the time Twisted calls us.
assert request.args is not None
@@ -568,13 +565,13 @@ class Auth:
return bool(query_params) or bool(auth_headers)
@staticmethod
def get_access_token_from_request(request: Request) -> str:
def get_access_token_from_request(request: Request):
"""Extracts the access_token from the request.
Args:
request: The http request.
Returns:
The access_token
unicode: The access_token
Raises:
MissingClientTokenError: If there isn't a single access_token in the
request
@@ -649,5 +646,5 @@ class Auth:
% (user_id, room_id),
)
async def check_auth_blocking(self, *args, **kwargs) -> None:
await self._auth_blocking.check_auth_blocking(*args, **kwargs)
def check_auth_blocking(self, *args, **kwargs):
return self._auth_blocking.check_auth_blocking(*args, **kwargs)

View File

@@ -13,21 +13,18 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Optional
from typing import Optional
from synapse.api.constants import LimitBlockingTypes, UserTypes
from synapse.api.errors import Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
from synapse.types import Requester
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class AuthBlocking:
def __init__(self, hs: "HomeServer"):
def __init__(self, hs):
self.store = hs.get_datastore()
self._server_notices_mxid = hs.config.server_notices_mxid
@@ -46,7 +43,7 @@ class AuthBlocking:
threepid: Optional[dict] = None,
user_type: Optional[str] = None,
requester: Optional[Requester] = None,
) -> None:
):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag

View File

@@ -17,9 +17,6 @@
"""Contains constants from the specification."""
# the max size of a (canonical-json-encoded) event
MAX_PDU_SIZE = 65536
# the "depth" field on events is limited to 2**63 - 1
MAX_DEPTH = 2 ** 63 - 1

View File

@@ -30,10 +30,9 @@ from twisted.internet import defer, error, reactor
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
from synapse.api.constants import MAX_PDU_SIZE
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import ListenerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -289,7 +288,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")
async def start(hs: "synapse.server.HomeServer"):
async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
"""
Start a Synapse server or worker.
@@ -301,6 +300,7 @@ async def start(hs: "synapse.server.HomeServer"):
Args:
hs: homeserver instance
listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
@@ -336,7 +336,7 @@ async def start(hs: "synapse.server.HomeServer"):
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa
# It is now safe to start your Synapse.
hs.start_listening()
hs.start_listening(listeners)
hs.get_datastore().db_pool.start_profiling()
hs.get_pusherpool().start()
@@ -530,25 +530,3 @@ def sdnotify(state):
# this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
# unless systemd is expecting us to notify it.
logger.warning("Unable to send notification to systemd: %s", e)
def max_request_body_size(config: HomeServerConfig) -> int:
"""Get a suitable maximum size for incoming HTTP requests"""
# Other than media uploads, the biggest request we expect to see is a fully-loaded
# /federation/v1/send request.
#
# The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
# limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
# json encoding); there is no specced limit to EDUs (see
# https://github.com/matrix-org/matrix-doc/issues/3121).
#
# in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
#
max_request_size = 200 * MAX_PDU_SIZE
# if we have a media repo enabled, we may need to allow larger uploads than that
if config.media.can_load_media_repo:
max_request_size = max(max_request_size, config.media.max_upload_size)
return max_request_size

View File

@@ -70,6 +70,12 @@ class AdminCmdSlavedStore(
class AdminCmdServer(HomeServer):
DATASTORE_CLASS = AdminCmdSlavedStore
def _listen_http(self, listener_config):
pass
def start_listening(self, listeners):
pass
async def export_data_command(hs, args):
"""Export data for a user.
@@ -226,7 +232,7 @@ def start(config_options):
async def run():
with LoggingContext("command"):
_base.start(ss)
_base.start(ss, [])
await args.func(ss, args)
_base.start_worker_reactor(

View File

@@ -15,7 +15,7 @@
# limitations under the License.
import logging
import sys
from typing import Dict, Optional
from typing import Dict, Iterable, Optional
from twisted.internet import address
from twisted.web.resource import IResource
@@ -32,7 +32,7 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
from synapse.app._base import max_request_body_size, register_start
from synapse.app._base import register_start
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -55,6 +55,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -63,7 +64,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events, login, presence, room
from synapse.rest.client.v1 import events, login, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
@@ -109,7 +110,6 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
@@ -121,6 +121,26 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.generic_worker")
class PresenceStatusStubServlet(RestServlet):
"""If presence is disabled this servlet can be used to stub out setting
presence status.
"""
PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
async def on_GET(self, request, user_id):
await self.auth.get_user_by_req(request)
return 200, {"presence": "offline"}
async def on_PUT(self, request, user_id):
await self.auth.get_user_by_req(request)
return 200, {}
class KeyUploadServlet(RestServlet):
"""An implementation of the `KeyUploadServlet` that responds to read only
requests, but otherwise proxies through to the master instance.
@@ -221,7 +241,6 @@ class GenericWorkerSlavedStore(
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
@@ -240,6 +259,7 @@ class GenericWorkerSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedPresenceStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
@@ -307,7 +327,10 @@ class GenericWorkerServer(HomeServer):
user_directory.register_servlets(self, resource)
presence.register_servlets(self, resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
groups.register_servlets(self, resource)
@@ -367,16 +390,14 @@ class GenericWorkerServer(HomeServer):
listener_config,
root_resource,
self.version_string,
max_request_body_size=max_request_body_size(self.config),
reactor=self.get_reactor(),
),
reactor=self.get_reactor(),
)
logger.info("Synapse worker now listening on port %d", port)
def start_listening(self):
for listener in self.config.worker_listeners:
def start_listening(self, listeners: Iterable[ListenerConfig]):
for listener in listeners:
if listener.type == "http":
self._listen_http(listener)
elif listener.type == "manhole":
@@ -469,7 +490,7 @@ def start(config_options):
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
register_start(_base.start, hs)
register_start(_base.start, hs, config.worker_listeners)
_base.start_worker_reactor("synapse-generic-worker", config)

View File

@@ -17,7 +17,7 @@
import logging
import os
import sys
from typing import Iterator
from typing import Iterable, Iterator
from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
@@ -36,13 +36,7 @@ from synapse.api.urls import (
WEB_CLIENT_PREFIX,
)
from synapse.app import _base
from synapse.app._base import (
listen_ssl,
listen_tcp,
max_request_body_size,
quit_with_error,
register_start,
)
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
from synapse.config._base import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
@@ -132,21 +126,19 @@ class SynapseHomeServer(HomeServer):
else:
root_resource = OptionsResource()
site = SynapseSite(
"synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
site_tag,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
max_request_body_size=max_request_body_size(self.config),
reactor=self.get_reactor(),
)
root_resource = create_resource_tree(resources, root_resource)
if tls:
ports = listen_ssl(
bind_addresses,
port,
site,
SynapseSite(
"synapse.access.https.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
self.tls_server_context_factory,
reactor=self.get_reactor(),
)
@@ -156,7 +148,13 @@ class SynapseHomeServer(HomeServer):
ports = listen_tcp(
bind_addresses,
port,
site,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d", port)
@@ -275,14 +273,14 @@ class SynapseHomeServer(HomeServer):
return resources
def start_listening(self):
def start_listening(self, listeners: Iterable[ListenerConfig]):
if self.config.redis_enabled:
# If redis is enabled we connect via the replication command handler
# in the same way as the workers (since we're effectively a client
# rather than a server).
self.get_tcp_replication().start_replication(self)
for listener in self.config.server.listeners:
for listener in listeners:
if listener.type == "http":
self._listening_services.extend(
self._listener_http(self.config, listener)
@@ -414,7 +412,7 @@ def setup(config_options):
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
await _base.start(hs)
await _base.start(hs, config.listeners)
hs.get_datastore().db_pool.updates.start_doing_background_updates()

View File

@@ -1,22 +1,21 @@
from typing import Any, Iterable, List, Optional
from synapse.config import (
account_validity,
api,
appservice,
auth,
captcha,
cas,
consent,
consent_config,
database,
emailconfig,
experimental,
groups,
jwt,
jwt_config,
key,
logger,
metrics,
oidc,
oidc_config,
password_auth_providers,
push,
ratelimiting,
@@ -24,9 +23,9 @@ from synapse.config import (
registration,
repository,
room_directory,
saml2,
saml2_config,
server,
server_notices,
server_notices_config,
spam_checker,
sso,
stats,
@@ -60,16 +59,15 @@ class RootConfig:
captcha: captcha.CaptchaConfig
voip: voip.VoipConfig
registration: registration.RegistrationConfig
account_validity: account_validity.AccountValidityConfig
metrics: metrics.MetricsConfig
api: api.ApiConfig
appservice: appservice.AppServiceConfig
key: key.KeyConfig
saml2: saml2.SAML2Config
saml2: saml2_config.SAML2Config
cas: cas.CasConfig
sso: sso.SSOConfig
oidc: oidc.OIDCConfig
jwt: jwt.JWTConfig
oidc: oidc_config.OIDCConfig
jwt: jwt_config.JWTConfig
auth: auth.AuthConfig
email: emailconfig.EmailConfig
worker: workers.WorkerConfig
@@ -78,9 +76,9 @@ class RootConfig:
spamchecker: spam_checker.SpamCheckerConfig
groups: groups.GroupsConfig
userdirectory: user_directory.UserDirectoryConfig
consent: consent.ConsentConfig
consent: consent_config.ConsentConfig
stats: stats.StatsConfig
servernotices: server_notices.ServerNoticesConfig
servernotices: server_notices_config.ServerNoticesConfig
roomdirectory: room_directory.RoomDirectoryConfig
thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig
tracer: tracer.TracerConfig

View File

@@ -1,165 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.config._base import Config, ConfigError
class AccountValidityConfig(Config):
section = "account_validity"
def read_config(self, config, **kwargs):
account_validity_config = config.get("account_validity") or {}
self.account_validity_enabled = account_validity_config.get("enabled", False)
self.account_validity_renew_by_email_enabled = (
"renew_at" in account_validity_config
)
if self.account_validity_enabled:
if "period" in account_validity_config:
self.account_validity_period = self.parse_duration(
account_validity_config["period"]
)
else:
raise ConfigError("'period' is required when using account validity")
if "renew_at" in account_validity_config:
self.account_validity_renew_at = self.parse_duration(
account_validity_config["renew_at"]
)
if "renew_email_subject" in account_validity_config:
self.account_validity_renew_email_subject = account_validity_config[
"renew_email_subject"
]
else:
self.account_validity_renew_email_subject = "Renew your %(app)s account"
self.account_validity_startup_job_max_delta = (
self.account_validity_period * 10.0 / 100.0
)
if self.account_validity_renew_by_email_enabled:
if not self.public_baseurl:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
# Load account validity templates.
account_validity_template_dir = account_validity_config.get("template_dir")
account_renewed_template_filename = account_validity_config.get(
"account_renewed_html_path", "account_renewed.html"
)
invalid_token_template_filename = account_validity_config.get(
"invalid_token_html_path", "invalid_token.html"
)
# Read and store template content
(
self.account_validity_account_renewed_template,
self.account_validity_account_previously_renewed_template,
self.account_validity_invalid_token_template,
) = self.read_templates(
[
account_renewed_template_filename,
"account_previously_renewed.html",
invalid_token_template_filename,
],
account_validity_template_dir,
)
def generate_config_section(self, **kwargs):
return """\
## Account Validity ##
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
# The currently available templates are:
#
# * account_renewed.html: Displayed to the user after they have successfully
# renewed their account.
#
# * account_previously_renewed.html: Displayed to the user if they attempt to
# renew their account with a token that is valid, but that has already
# been used. In this case the account is not renewed again.
#
# * invalid_token.html: Displayed to the user when they try to renew an account
# with an unknown or invalid renewal token.
#
# See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for
# default template contents.
#
# The file name of some of these templates can be configured below for legacy
# reasons.
#
#template_dir: "res/templates"
# A custom file name for the 'account_renewed.html' template.
#
# If not set, the file is assumed to be named "account_renewed.html".
#
#account_renewed_html_path: "account_renewed.html"
# A custom file name for the 'invalid_token.html' template.
#
# If not set, the file is assumed to be named "invalid_token.html".
#
#invalid_token_html_path: "invalid_token.html"
"""

View File

@@ -299,7 +299,7 @@ class EmailConfig(Config):
"client_base_url", email_config.get("riot_base_url", None)
)
if self.account_validity_renew_by_email_enabled:
if self.account_validity.renew_by_email_enabled:
expiry_template_html = email_config.get(
"expiry_template_html", "notice_expiry.html"
)

View File

@@ -12,25 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import RootConfig
from .account_validity import AccountValidityConfig
from .api import ApiConfig
from .appservice import AppServiceConfig
from .auth import AuthConfig
from .cache import CacheConfig
from .captcha import CaptchaConfig
from .cas import CasConfig
from .consent import ConsentConfig
from .consent_config import ConsentConfig
from .database import DatabaseConfig
from .emailconfig import EmailConfig
from .experimental import ExperimentalConfig
from .federation import FederationConfig
from .groups import GroupsConfig
from .jwt import JWTConfig
from .jwt_config import JWTConfig
from .key import KeyConfig
from .logger import LoggingConfig
from .metrics import MetricsConfig
from .oidc import OIDCConfig
from .oidc_config import OIDCConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .push import PushConfig
from .ratelimiting import RatelimitConfig
@@ -39,9 +39,9 @@ from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
from .room import RoomConfig
from .room_directory import RoomDirectoryConfig
from .saml2 import SAML2Config
from .saml2_config import SAML2Config
from .server import ServerConfig
from .server_notices import ServerNoticesConfig
from .server_notices_config import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
from .sso import SSOConfig
from .stats import StatsConfig
@@ -68,7 +68,6 @@ class HomeServerConfig(RootConfig):
CaptchaConfig,
VoipConfig,
RegistrationConfig,
AccountValidityConfig,
MetricsConfig,
ApiConfig,
AppServiceConfig,

View File

@@ -31,6 +31,7 @@ from twisted.logger import (
)
import synapse
from synapse.app import _base as appbase
from synapse.logging._structured import setup_structured_logging
from synapse.logging.context import LoggingContextFilter
from synapse.logging.filter import MetadataFilter
@@ -317,8 +318,6 @@ def setup_logging(
# Perform one-time logging configuration.
_setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
# Add a SIGHUP handler to reload the logging configuration, if one is available.
from synapse.app import _base as appbase
appbase.register_sighup(_reload_logging_config, log_config_path)
# Log immediately so we can grep backwards.

View File

@@ -14,23 +14,20 @@
# limitations under the License.
from collections import Counter
from typing import Collection, Iterable, List, Mapping, Optional, Tuple, Type
from typing import Iterable, List, Mapping, Optional, Tuple, Type
import attr
from synapse.config._util import validate_config
from synapse.config.sso import SsoAttributeRequirement
from synapse.python_dependencies import DependencyException, check_requirements
from synapse.types import JsonDict
from synapse.types import Collection, JsonDict
from synapse.util.module_loader import load_module
from synapse.util.stringutils import parse_and_validate_mxc_uri
from ._base import Config, ConfigError, read_file
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc.JinjaOidcMappingProvider"
# The module that JinjaOidcMappingProvider is in was renamed, we want to
# transparently handle both the same.
LEGACY_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider"
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider"
class OIDCConfig(Config):
@@ -406,8 +403,6 @@ def _parse_oidc_config_dict(
"""
ump_config = oidc_config.get("user_mapping_provider", {})
ump_config.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
if ump_config.get("module") == LEGACY_USER_MAPPING_PROVIDER:
ump_config["module"] = DEFAULT_USER_MAPPING_PROVIDER
ump_config.setdefault("config", {})
(

View File

@@ -12,12 +12,74 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pkg_resources
from synapse.api.constants import RoomCreationPreset
from synapse.config._base import Config, ConfigError
from synapse.types import RoomAlias, UserID
from synapse.util.stringutils import random_string_with_symbols, strtobool
class AccountValidityConfig(Config):
section = "accountvalidity"
def __init__(self, config, synapse_config):
if config is None:
return
super().__init__()
self.enabled = config.get("enabled", False)
self.renew_by_email_enabled = "renew_at" in config
if self.enabled:
if "period" in config:
self.period = self.parse_duration(config["period"])
else:
raise ConfigError("'period' is required when using account validity")
if "renew_at" in config:
self.renew_at = self.parse_duration(config["renew_at"])
if "renew_email_subject" in config:
self.renew_email_subject = config["renew_email_subject"]
else:
self.renew_email_subject = "Renew your %(app)s account"
self.startup_job_max_delta = self.period * 10.0 / 100.0
if self.renew_by_email_enabled:
if "public_baseurl" not in synapse_config:
raise ConfigError("Can't send renewal emails without 'public_baseurl'")
template_dir = config.get("template_dir")
if not template_dir:
template_dir = pkg_resources.resource_filename("synapse", "res/templates")
if "account_renewed_html_path" in config:
file_path = os.path.join(template_dir, config["account_renewed_html_path"])
self.account_renewed_html_content = self.read_file(
file_path, "account_validity.account_renewed_html_path"
)
else:
self.account_renewed_html_content = (
"<html><body>Your account has been successfully renewed.</body><html>"
)
if "invalid_token_html_path" in config:
file_path = os.path.join(template_dir, config["invalid_token_html_path"])
self.invalid_token_html_content = self.read_file(
file_path, "account_validity.invalid_token_html_path"
)
else:
self.invalid_token_html_content = (
"<html><body>Invalid renewal token.</body><html>"
)
class RegistrationConfig(Config):
section = "registration"
@@ -30,6 +92,10 @@ class RegistrationConfig(Config):
str(config["disable_registration"])
)
self.account_validity = AccountValidityConfig(
config.get("account_validity") or {}, config
)
self.registrations_require_3pid = config.get("registrations_require_3pid", [])
self.allowed_local_3pids = config.get("allowed_local_3pids", [])
self.enable_3pid_lookup = config.get("enable_3pid_lookup", True)
@@ -141,6 +207,69 @@ class RegistrationConfig(Config):
#
#enable_registration: false
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
# This means that, if a validity period is set, and Synapse is restarted (it will
# then derive an expiration date from the current validity period), and some time
# after that the validity period changes and Synapse is restarted, the users'
# expiration dates won't be updated unless their account is manually renewed. This
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10%% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %%(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#
# Note that this is not currently compatible with guest logins.

View File

@@ -25,10 +25,7 @@ from ._util import validate_config
logger = logging.getLogger(__name__)
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.saml.DefaultSamlMappingProvider"
# The module that DefaultSamlMappingProvider is in was renamed, we want to
# transparently handle both the same.
LEGACY_USER_MAPPING_PROVIDER = (
DEFAULT_USER_MAPPING_PROVIDER = (
"synapse.handlers.saml_handler.DefaultSamlMappingProvider"
)
@@ -100,8 +97,6 @@ class SAML2Config(Config):
# Use the default user mapping provider if not set
ump_dict.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
if ump_dict.get("module") == LEGACY_USER_MAPPING_PROVIDER:
ump_dict["module"] = DEFAULT_USER_MAPPING_PROVIDER
# Ensure a config is present
ump_dict["config"] = ump_dict.get("config") or {}

View File

@@ -235,11 +235,7 @@ class ServerConfig(Config):
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl")
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
# Whether to enable user presence.
presence_config = config.get("presence") or {}
@@ -411,6 +407,10 @@ class ServerConfig(Config):
config_path=("federation_ip_range_blacklist",),
)
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
# sending out any replication updates.

View File

@@ -64,14 +64,6 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
can only be a single instance.
receipts: The instances that write to the receipts stream. Currently
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
"""
events = attr.ib(
@@ -93,11 +85,6 @@ class WriterLocations:
type=List[str],
converter=_instance_to_list_converter,
)
presence = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
class WorkerConfig(Config):
@@ -201,14 +188,7 @@ class WorkerConfig(Config):
# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in (
"events",
"typing",
"to_device",
"account_data",
"receipts",
"presence",
):
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
@@ -235,11 +215,6 @@ class WorkerConfig(Config):
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")
if len(self.writers.presence) != 1:
raise ConfigError(
"Must only specify one instance to handle `presence` messages."
)
self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)

View File

@@ -14,14 +14,14 @@
# limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import List, Optional, Set, Tuple
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
@@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None:
too_big("type")
if len(event.event_id) > 255:
too_big("event_id")
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
if len(encode_canonical_json(event.get_pdu_json())) > 65536:
too_big("event")
@@ -670,7 +670,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
public_key = public_key_object["public_key"]
try:
for server, signature_block in signed["signatures"].items():
for key_name in signature_block.keys():
for key_name, encoded_signature in signature_block.items():
if not key_name.startswith("ed25519:"):
continue
verify_key = decode_verify_key_bytes(
@@ -688,7 +688,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
return False
def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
def get_public_keys(invite_event):
public_keys = []
if "public_key" in invite_event.content:
o = {"public_key": invite_event.content["public_key"]}

View File

@@ -15,11 +15,12 @@
import inspect
import logging
from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
from synapse.spam_checker_api import RegistrationBehaviour
from synapse.types import Collection
from synapse.util.async_helpers import maybe_awaitable
if TYPE_CHECKING:

View File

@@ -1,510 +0,0 @@
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A federation sender that forwards things to be sent across replication to
a worker process.
It assumes there is a single worker process feeding off of it.
Each row in the replication stream consists of a type and some json, where the
types indicate whether they are presence, or edus, etc.
Ephemeral or non-event data are queued up in-memory. When the worker requests
updates since a particular point, all in-memory data since before that point is
dropped. We also expire things in the queue after 5 minutes, to ensure that a
dead worker doesn't cause the queues to grow limitlessly.
Events are replicated via a separate events stream.
"""
import logging
from collections import namedtuple
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
List,
Optional,
Sized,
Tuple,
Type,
)
from sortedcontainers import SortedDict
from synapse.api.presence import UserPresenceState
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure
from .units import Edu
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
self._sender_positions = {} # type: Dict[str, int]
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
self.presence_destinations = (
SortedDict()
) # type: SortedDict[int, Tuple[str, Iterable[str]]]
# (destination, key) -> EDU
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
# stream position -> (destination, key)
self.keyed_edu_changed = (
SortedDict()
) # type: SortedDict[int, Tuple[str, tuple]]
self.edus = SortedDict() # type: SortedDict[int, Edu]
# stream ID for the next entry into keyed_edu_changed/edus.
self.pos = 1
# map from stream ID to the time that stream entry was generated, so that we
# can clear out entries after a while
self.pos_time = SortedDict() # type: SortedDict[int, int]
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
"synapse_federation_send_queue_%s_size" % (queue_name,),
"",
[],
lambda: len(queue),
)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000)
def _next_pos(self) -> int:
pos = self.pos
self.pos += 1
self.pos_time[self.clock.time_msec()] = pos
return pos
def _clear_queue(self) -> None:
"""Clear the queues for anything older than N minutes"""
FIVE_MINUTES_AGO = 5 * 60 * 1000
now = self.clock.time_msec()
keys = self.pos_time.keys()
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
position_to_delete = max(keys[:time])
for key in keys[:time]:
del self.pos_time[key]
self._clear_queue_before_pos(position_to_delete)
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]
user_ids = {user_id for user_id, _ in self.presence_destinations.values()}
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
for user_id in to_del:
del self.presence_map[user_id]
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
live_keys = set()
for edu_key in self.keyed_edu_changed.values():
live_keys.add(edu_key)
keys_to_del = [
edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
]
for edu_key in keys_to_del:
del self.keyed_edu[edu_key]
# Delete things out of edu map
keys = self.edus.keys()
i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""As per FederationSender"""
# This should never get called.
raise NotImplementedError()
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[Hashable] = None,
) -> None:
"""As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
pos = self._next_pos()
edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)
if key:
assert isinstance(key, tuple)
self.keyed_edu[(destination, key)] = edu
self.keyed_edu_changed[pos] = (destination, key)
else:
self.edus[pos] = edu
self.notifier.on_new_replication_data()
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""As per FederationSender
Args:
receipt:
"""
# nothing to do here: the replication listener will handle it.
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender
Args:
states
destinations
"""
for state in states:
pos = self._next_pos()
self.presence_map.update({state.user_id: state for state in states})
self.presence_destinations[pos] = (state.user_id, destinations)
self.notifier.on_new_replication_data()
def send_device_messages(self, destination: str) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
def wake_destination(self, server: str) -> None:
pass
def get_current_token(self) -> int:
return self.pos - 1
def federation_ack(self, instance_name: str, token: int) -> None:
if self._sender_instances:
# If we have configured multiple federation sender instances we need
# to track their positions separately, and only clear the queue up
# to the token all instances have acked.
self._sender_positions[instance_name] = token
token = min(self._sender_positions.values())
self._clear_queue_before_pos(token)
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
"""Get rows to be sent over federation between the two tokens
Args:
instance_name: the name of the current process
from_token: the previous stream token: the starting point for fetching the
updates
to_token: the new stream token: the point to get updates up to
target_row_count: a target for the number of rows to be returned.
Returns: a triplet `(updates, new_last_token, limited)`, where:
* `updates` is a list of `(token, row)` entries.
* `new_last_token` is the new position in stream.
* `limited` is whether there are more updates to fetch.
"""
# TODO: Handle target_row_count.
# To handle restarts where we wrap around
if from_token > self.pos:
from_token = -1
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
rows = [] # type: List[Tuple[int, BaseFederationRow]]
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
rows.append(
(
pos,
PresenceDestinationsRow(
state=self.presence_map[user_id], destinations=list(dests)
),
)
)
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in keyed_edus.items():
rows.append(
(
pos,
KeyedEduRow(
key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
),
)
)
# Fetch changed edus
i = self.edus.bisect_right(from_token)
j = self.edus.bisect_right(to_token) + 1
edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Sort rows based on pos
rows.sort()
return (
[(pos, (row.TypeId, row.to_data())) for pos, row in rows],
to_token,
False,
)
class BaseFederationRow:
"""Base class for rows to be sent in the federation stream.
Specifies how to identify, serialize and deserialize the different types.
"""
TypeId = "" # Unique string that ids the type. Must be overridden in sub classes.
@staticmethod
def from_data(data):
"""Parse the data from the federation stream into a row.
Args:
data: The value of ``data`` from FederationStreamRow.data, type
depends on the type of stream
"""
raise NotImplementedError()
def to_data(self):
"""Serialize this row to be sent over the federation stream.
Returns:
The value to be sent in FederationStreamRow.data. The type depends
on the type of stream.
"""
raise NotImplementedError()
def add_to_buffer(self, buff):
"""Add this row to the appropriate field in the buffer ready for this
to be sent over federation.
We use a buffer so that we can batch up events that have come in at
the same time and send them all at once.
Args:
buff (BufferedToSend)
"""
raise NotImplementedError()
class PresenceDestinationsRow(
BaseFederationRow,
namedtuple(
"PresenceDestinationsRow",
("state", "destinations"), # UserPresenceState # list[str]
),
):
TypeId = "pd"
@staticmethod
def from_data(data):
return PresenceDestinationsRow(
state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
)
def to_data(self):
return {"state": self.state.as_dict(), "dests": self.destinations}
def add_to_buffer(self, buff):
buff.presence_destinations.append((self.state, self.destinations))
class KeyedEduRow(
BaseFederationRow,
namedtuple(
"KeyedEduRow",
("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu
),
):
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""
TypeId = "k"
@staticmethod
def from_data(data):
return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
def to_data(self):
return {"key": self.key, "edu": self.edu.get_internal_dict()}
def add_to_buffer(self, buff):
buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
"""Streams EDUs that don't have keys. See KeyedEduRow"""
TypeId = "e"
@staticmethod
def from_data(data):
return EduRow(Edu(**data))
def to_data(self):
return self.edu.get_internal_dict()
def add_to_buffer(self, buff):
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
_rowtypes = (
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
) # type: Tuple[Type[BaseFederationRow], ...]
TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
ParsedFederationStreamData = namedtuple(
"ParsedFederationStreamData",
(
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
),
)
def process_rows_for_federation(
transaction_queue: FederationSender,
rows: List[FederationStream.FederationStreamRow],
) -> None:
"""Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args:
transaction_queue
rows
"""
# The federation stream contains a bunch of different types of
# rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
presence_destinations=[],
keyed_edus={},
edus={},
)
# Parse the rows in the stream and add to the buffer
for row in rows:
if row.type not in TypeToRow:
logger.error("Unrecognized federation row type %r", row.type)
continue
RowType = TypeToRow[row.type]
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
)
for edu_map in buff.keyed_edus.values():
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
for edu_list in buff.edus.values():
for edu in edu_list:
transaction_queue.send_edu(edu, None)

View File

@@ -14,17 +14,7 @@
import abc
import logging
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Hashable,
Iterable,
List,
Optional,
Set,
Tuple,
)
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter
@@ -41,7 +31,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -133,10 +123,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
def get_current_token(self) -> int:
raise NotImplementedError()
@abc.abstractmethod
def federation_ack(self, instance_name: str, token: int) -> None:
raise NotImplementedError()
@abc.abstractmethod
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
@@ -549,10 +535,6 @@ class FederationSender(AbstractFederationSender):
# No-op if presence is disabled.
return
# Ensure we only send out presence states for local users.
for state in states:
assert self.is_mine_id(state.user_id)
for destination in destinations:
if destination == self.server_name:
continue
@@ -649,10 +631,6 @@ class FederationSender(AbstractFederationSender):
# to a worker.
return 0
def federation_ack(self, instance_name: str, token: int) -> None:
# It is not expected that this gets called on FederationSender.
raise NotImplementedError()
@staticmethod
async def get_replication_rows(
instance_name: str, from_token: int, to_token: int, target_row_count: int

View File

@@ -17,7 +17,7 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional
from synapse.api.errors import StoreError, SynapseError
from synapse.logging.context import make_deferred_yieldable
@@ -39,44 +39,28 @@ class AccountValidityHandler:
self.sendmail = self.hs.get_sendmail()
self.clock = self.hs.get_clock()
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
self._account_validity_renew_by_email_enabled = (
hs.config.account_validity.account_validity_renew_by_email_enabled
)
self._account_validity_period = None
if self._account_validity_enabled:
self._account_validity_period = (
hs.config.account_validity.account_validity_period
)
self._account_validity = self.hs.config.account_validity
if (
self._account_validity_enabled
and self._account_validity_renew_by_email_enabled
self._account_validity.enabled
and self._account_validity.renew_by_email_enabled
):
# Don't do email-specific configuration if renewal by email is disabled.
self._template_html = (
hs.config.account_validity.account_validity_template_html
)
self._template_text = (
hs.config.account_validity.account_validity_template_text
)
account_validity_renew_email_subject = (
hs.config.account_validity.account_validity_renew_email_subject
)
self._template_html = self.config.account_validity_template_html
self._template_text = self.config.account_validity_template_text
try:
app_name = hs.config.email_app_name
app_name = self.hs.config.email_app_name
self._subject = account_validity_renew_email_subject % {"app": app_name}
self._subject = self._account_validity.renew_email_subject % {
"app": app_name
}
self._from_string = hs.config.email_notif_from % {"app": app_name}
self._from_string = self.hs.config.email_notif_from % {"app": app_name}
except Exception:
# If substitution failed, fall back to the bare strings.
self._subject = account_validity_renew_email_subject
self._from_string = hs.config.email_notif_from
self._subject = self._account_validity.renew_email_subject
self._from_string = self.hs.config.email_notif_from
self._raw_from = email.utils.parseaddr(self._from_string)[1]
@@ -236,87 +220,50 @@ class AccountValidityHandler:
attempts += 1
raise StoreError(500, "Couldn't generate a unique string as refresh string.")
async def renew_account(self, renewal_token: str) -> Tuple[bool, bool, int]:
async def renew_account(self, renewal_token: str) -> bool:
"""Renews the account attached to a given renewal token by pushing back the
expiration date by the current validity period in the server's configuration.
If it turns out that the token is valid but has already been used, then the
token is considered stale. A token is stale if the 'token_used_ts_ms' db column
is non-null.
Args:
renewal_token: Token sent with the renewal request.
Returns:
A tuple containing:
* A bool representing whether the token is valid and unused.
* A bool which is `True` if the token is valid, but stale.
* An int representing the user's expiry timestamp as milliseconds since the
epoch, or 0 if the token was invalid.
Whether the provided token is valid.
"""
try:
(
user_id,
current_expiration_ts,
token_used_ts,
) = await self.store.get_user_from_renewal_token(renewal_token)
user_id = await self.store.get_user_from_renewal_token(renewal_token)
except StoreError:
return False, False, 0
# Check whether this token has already been used.
if token_used_ts:
logger.info(
"User '%s' attempted to use previously used token '%s' to renew account",
user_id,
renewal_token,
)
return False, True, current_expiration_ts
return False
logger.debug("Renewing an account for user %s", user_id)
await self.renew_account_for_user(user_id)
# Renew the account. Pass the renewal_token here so that it is not cleared.
# We want to keep the token around in case the user attempts to renew their
# account with the same token twice (clicking the email link twice).
#
# In that case, the token will be accepted, but the account's expiration ts
# will remain unchanged.
new_expiration_ts = await self.renew_account_for_user(
user_id, renewal_token=renewal_token
)
return True, False, new_expiration_ts
return True
async def renew_account_for_user(
self,
user_id: str,
expiration_ts: Optional[int] = None,
email_sent: bool = False,
renewal_token: Optional[str] = None,
) -> int:
"""Renews the account attached to a given user by pushing back the
expiration date by the current validity period in the server's
configuration.
Args:
user_id: The ID of the user to renew.
renewal_token: Token sent with the renewal request.
expiration_ts: New expiration date. Defaults to now + validity period.
email_sent: Whether an email has been sent for this validity period.
renewal_token: Token sent with the renewal request. The user's token
will be cleared if this is None.
email_sen: Whether an email has been sent for this validity period.
Defaults to False.
Returns:
New expiration date for this account, as a timestamp in
milliseconds since epoch.
"""
now = self.clock.time_msec()
if expiration_ts is None:
expiration_ts = now + self._account_validity_period
expiration_ts = self.clock.time_msec() + self._account_validity.period
await self.store.set_account_validity_for_user(
user_id=user_id,
expiration_ts=expiration_ts,
email_sent=email_sent,
renewal_token=renewal_token,
token_used_ts=now,
user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
)
return expiration_ts

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from prometheus_client import Counter
@@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.metrics import Measure
if TYPE_CHECKING:

View File

@@ -1248,7 +1248,7 @@ class AuthHandler(BaseHandler):
# see if any of our auth providers want to know about this
for provider in self.password_providers:
for token, _, device_id in tokens_and_devices:
for token, token_id, device_id in tokens_and_devices:
await provider.on_logged_out(
user_id=user_id, device_id=device_id, access_token=token
)

View File

@@ -49,9 +49,7 @@ class DeactivateAccountHandler(BaseHandler):
if hs.config.run_background_tasks:
hs.get_reactor().callWhenRunning(self._start_user_parting)
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
self._account_validity_enabled = hs.config.account_validity.enabled
async def deactivate_account(
self,

View File

@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api import errors
from synapse.api.constants import EventTypes
@@ -28,6 +28,7 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
Collection,
JsonDict,
StreamToken,
UserID,
@@ -155,7 +156,8 @@ class DeviceWorkerHandler(BaseHandler):
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
for etype, state_key in current_state_ids.keys():
for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
@@ -177,7 +179,8 @@ class DeviceWorkerHandler(BaseHandler):
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for etype, state_key in current_state_ids.keys():
for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
@@ -195,7 +198,8 @@ class DeviceWorkerHandler(BaseHandler):
for state_dict in prev_state_ids.values():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for etype, state_key in current_state_ids.keys():
for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
@@ -480,7 +484,7 @@ class DeviceHandler(DeviceWorkerHandler):
"device_list_key", position, users=[user_id], rooms=room_ids
)
if hosts:
if hosts and self.federation_sender:
logger.info(
"Sending device list update notif for %r to: %r", user_id, hosts
)
@@ -710,7 +714,7 @@ class DeviceListUpdater:
# This can happen since we batch updates
return
for device_id, stream_id, prev_ids, _ in pending_updates:
for device_id, stream_id, prev_ids, content in pending_updates:
logger.debug(
"Handling update %r/%r, ID: %r, prev: %r ",
user_id,
@@ -736,7 +740,7 @@ class DeviceListUpdater:
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
for device_id, stream_id, _, content in pending_updates:
for device_id, stream_id, prev_ids, content in pending_updates:
await self.store.update_remote_device_list_cache_entry(
user_id, device_id, content, stream_id
)
@@ -925,10 +929,6 @@ class DeviceListUpdater:
else:
cached_devices = await self.store.get_cached_devices_for_user(user_id)
if cached_devices == {d["device_id"]: d for d in devices}:
logging.info(
"Skipping device list resync for %s, as our cache matches already",
user_id,
)
devices = []
ignore_devices = True
@@ -944,9 +944,6 @@ class DeviceListUpdater:
await self.store.update_remote_device_list_cache(
user_id, devices, stream_id
)
# mark the cache as valid, whether or not we actually processed any device
# list updates.
await self.store.mark_remote_user_device_cache_as_valid(user_id)
device_ids = [device["device_id"] for device in devices]
# Handle cross-signing keys.

View File

@@ -51,9 +51,7 @@ class DeviceMessageHandler:
# same instance. Other federation sender instances will get notified by
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
# in the to-device replication stream.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
self.federation_sender = hs.get_federation_sender()
# If we can handle the to device EDUs we do so, otherwise we route them
# to the appropriate worker.

View File

@@ -1,86 +0,0 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.room_versions import RoomVersion
from synapse.types import StateMap
if TYPE_CHECKING:
from synapse.server import HomeServer
class EventAuthHandler:
"""
This class contains methods for authenticating events added to room graphs.
"""
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
async def can_join_without_invite(
self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str
) -> bool:
"""
Check whether a user can join a room without an invite.
When joining a room with restricted joined rules (as defined in MSC3083),
the membership of spaces must be checked during join.
Args:
state_ids: The state of the room as it currently is.
room_version: The room version of the room being joined.
user_id: The user joining the room.
Returns:
True if the user can join the room, false otherwise.
"""
# This only applies to room versions which support the new join rule.
if not room_version.msc3083_join_rules:
return True
# If there's no join rule, then it defaults to invite (so this doesn't apply).
join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
if not join_rules_event_id:
return True
# If the join rule is not restricted, this doesn't apply.
join_rules_event = await self._store.get_event(join_rules_event_id)
if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED:
return True
# If allowed is of the wrong form, then only allow invited users.
allowed_spaces = join_rules_event.content.get("allow", [])
if not isinstance(allowed_spaces, list):
return False
# Get the list of joined rooms and see if there's an overlap.
joined_rooms = await self._store.get_rooms_for_user(user_id)
# Pull out the other room IDs, invalid data gets filtered.
for space in allowed_spaces:
if not isinstance(space, dict):
continue
space_id = space.get("space")
if not isinstance(space_id, str):
continue
# The user was joined to one of the spaces specified, they can join
# this room!
if space_id in joined_rooms:
return True
# The user was not in any of the required spaces.
return False

View File

@@ -146,7 +146,6 @@ class FederationHandler(BaseHandler):
self.is_mine_id = hs.is_mine_id
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._message_handler = hs.get_message_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
@@ -1674,40 +1673,8 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
# Calculate the event context.
context = await self.state_handler.compute_event_context(event)
# Get the state before the new event.
prev_state_ids = await context.get_prev_state_ids()
# Check if the user is already in the room or invited to the room.
user_id = event.state_key
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
newly_joined = True
user_is_invited = False
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
user_is_invited = prev_member_event.membership == Membership.INVITE
# If the member is not already in the room, and not invited, check if
# they should be allowed access via membership in a space.
if (
newly_joined
and not user_is_invited
and not await self._event_auth_handler.can_join_without_invite(
prev_state_ids,
event.room_version,
user_id,
)
):
raise AuthError(
403,
"You do not belong to any of the required spaces to join this room.",
)
# Persist the event.
await self._auth_and_persist_event(origin, event, context)
context = await self._auth_and_persist_event(origin, event, context)
logger.debug(
"on_send_join_request: After _auth_and_persist_event: %s, sigs: %s",
@@ -1715,6 +1682,8 @@ class FederationHandler(BaseHandler):
event.signatures,
)
prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
auth_chain = await self.store.get_auth_chain(event.room_id, state_ids)
@@ -2037,7 +2006,7 @@ class FederationHandler(BaseHandler):
state: Optional[Iterable[EventBase]] = None,
auth_events: Optional[MutableStateMap[EventBase]] = None,
backfilled: bool = False,
) -> None:
) -> EventContext:
"""
Process an event by performing auth checks and then persisting to the database.
@@ -2059,6 +2028,9 @@ class FederationHandler(BaseHandler):
event is an outlier), may be the auth events claimed by the remote
server.
backfilled: True if the event was backfilled.
Returns:
The event context.
"""
context = await self._check_event_auth(
origin,
@@ -2088,6 +2060,8 @@ class FederationHandler(BaseHandler):
)
raise
return context
async def _auth_and_persist_events(
self,
origin: str,
@@ -2982,7 +2956,7 @@ class FederationHandler(BaseHandler):
try:
# for each sig on the third_party_invite block of the actual invite
for server, signature_block in signed["signatures"].items():
for key_name in signature_block.keys():
for key_name, encoded_signature in signature_block.items():
if not key_name.startswith("ed25519:"):
continue

View File

@@ -15,6 +15,7 @@
# limitations under the License.
"""Utilities for interacting with Identity Servers"""
import logging
import urllib.parse
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
@@ -33,11 +34,7 @@ from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, Requester
from synapse.util import json_decoder
from synapse.util.hash import sha256_and_url_safe_base64
from synapse.util.stringutils import (
assert_valid_client_secret,
random_string,
valid_id_server_location,
)
from synapse.util.stringutils import assert_valid_client_secret, random_string
from ._base import BaseHandler
@@ -175,11 +172,6 @@ class IdentityHandler(BaseHandler):
server with, if necessary. Required if use_v2 is true
use_v2: Whether to use v2 Identity Service API endpoints. Defaults to True
Raises:
SynapseError: On any of the following conditions
- the supplied id_server is not a valid identity server name
- we failed to contact the supplied identity server
Returns:
The response from the identity server
"""
@@ -189,12 +181,6 @@ class IdentityHandler(BaseHandler):
if id_access_token is None:
use_v2 = False
if not valid_id_server_location(id_server):
raise SynapseError(
400,
"id_server must be a valid hostname with optional port and path components",
)
# Decide which API endpoint URLs to use
headers = {}
bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
@@ -283,21 +269,12 @@ class IdentityHandler(BaseHandler):
id_server: Identity server to unbind from
Raises:
SynapseError: On any of the following conditions
- the supplied id_server is not a valid identity server name
- we failed to contact the supplied identity server
SynapseError: If we failed to contact the identity server
Returns:
True on success, otherwise False if the identity
server doesn't support unbinding
"""
if not valid_id_server_location(id_server):
raise SynapseError(
400,
"id_server must be a valid hostname with optional port and path components",
)
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")

View File

@@ -15,7 +15,7 @@
import inspect
import logging
from typing import TYPE_CHECKING, Dict, Generic, List, Optional, TypeVar, Union
from urllib.parse import urlencode, urlparse
from urllib.parse import urlencode
import attr
import pymacaroons
@@ -37,7 +37,10 @@ from twisted.web.client import readBody
from twisted.web.http_headers import Headers
from synapse.config import ConfigError
from synapse.config.oidc import OidcProviderClientSecretJwtKey, OidcProviderConfig
from synapse.config.oidc_config import (
OidcProviderClientSecretJwtKey,
OidcProviderConfig,
)
from synapse.handlers.sso import MappingException, UserAttributes
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
@@ -68,8 +71,8 @@ logger = logging.getLogger(__name__)
#
# Here we have the names of the cookies, and the options we use to set them.
_SESSION_COOKIES = [
(b"oidc_session", b"HttpOnly; Secure; SameSite=None"),
(b"oidc_session_no_samesite", b"HttpOnly"),
(b"oidc_session", b"Path=/_synapse/client/oidc; HttpOnly; Secure; SameSite=None"),
(b"oidc_session_no_samesite", b"Path=/_synapse/client/oidc; HttpOnly"),
]
#: A token exchanged from the token endpoint, as per RFC6749 sec 5.1. and
@@ -279,13 +282,6 @@ class OidcProvider:
self._config = provider
self._callback_url = hs.config.oidc_callback_url # type: str
# Calculate the prefix for OIDC callback paths based on the public_baseurl.
# We'll insert this into the Path= parameter of any session cookies we set.
public_baseurl_path = urlparse(hs.config.server.public_baseurl).path
self._callback_path_prefix = (
public_baseurl_path.encode("utf-8") + b"_synapse/client/oidc"
)
self._oidc_attribute_requirements = provider.attribute_requirements
self._scopes = provider.scopes
self._user_profile_method = provider.user_profile_method
@@ -786,13 +782,8 @@ class OidcProvider:
for cookie_name, options in _SESSION_COOKIES:
request.cookies.append(
b"%s=%s; Max-Age=3600; Path=%s; %s"
% (
cookie_name,
cookie.encode("utf-8"),
self._callback_path_prefix,
options,
)
b"%s=%s; Max-Age=3600; %s"
% (cookie_name, cookie.encode("utf-8"), options)
)
metadata = await self.load_metadata()
@@ -969,11 +960,6 @@ class OidcProvider:
# and attempt to match it.
attributes = await oidc_response_to_user_attributes(failures=0)
if attributes.localpart is None:
# If no localpart is returned then we will generate one, so
# there is no need to search for existing users.
return None
user_id = UserID(attributes.localpart, self._server_name).to_string()
users = await self._store.get_users_by_id_case_insensitive(user_id)
if users:

View File

@@ -28,7 +28,6 @@ from bisect import bisect
from contextlib import contextmanager
from typing import (
TYPE_CHECKING,
Collection,
Dict,
FrozenSet,
Iterable,
@@ -58,8 +57,9 @@ from synapse.replication.http.presence import (
from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
@@ -121,21 +121,13 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and presence
writer"""
"""Parts of the PresenceHandler that are shared between workers and master"""
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.presence_router = hs.get_presence_router()
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
self._federation = None
if hs.should_send_federation():
self._federation = hs.get_federation_sender()
self._federation_queue = PresenceFederationQueue(hs, self)
self.federation_queue = PresenceFederationQueue(hs, self)
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
@@ -261,39 +253,14 @@ class BasePresenceHandler(abc.ABC):
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
"""Process streams received over replication."""
await self._federation_queue.process_replication_rows(
"""Process presence stream rows received over replication."""
await self.federation_queue.process_replication_rows(
stream_name, instance_name, token, rows
)
def get_federation_queue(self) -> "PresenceFederationQueue":
"""Get the presence federation queue."""
return self._federation_queue
async def maybe_send_presence_to_interested_destinations(
self, states: List[UserPresenceState]
):
"""If this instance is a federation sender, send the states to all
destinations that are interested. Filters out any states for remote
users.
"""
if not self._federation:
return
states = [s for s in states if self.is_mine_id(s.user_id)]
if not states:
return
hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
)
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
"""Get the presence federation queue, if any."""
return self.federation_queue
class _NullContextManager(ContextManager[None]):
@@ -307,16 +274,11 @@ class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
self._presence_writer_instance = hs.config.worker.writers.presence[0]
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
"m.presence",
hs.config.worker.writers.presence,
)
self.state = hs.get_state_handler()
# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
@@ -325,8 +287,10 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self._federation = hs.get_federation_sender()
# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
self.users_going_offline = {}
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
@@ -359,23 +323,22 @@ class WorkerPresenceHandler(BasePresenceHandler):
)
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
"""A user has started syncing. Send a UserSync to the master, unless they
had recently stopped syncing.
Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
# Safe to skip because we haven't yet told the presence writer they
# were offline
# Safe to skip because we haven't yet told the master they were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
def mark_as_going_offline(self, user_id):
"""A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
"""A user has stopped syncing. We wait before notifying the master as
its likely they'll come back soon. This allows us to avoid sending
a stopped syncing immediately followed by a started syncing notification
to the master
Args:
user_id (str)
@@ -383,8 +346,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.users_going_offline[user_id] = self.clock.time_msec()
def send_stop_syncing(self):
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""Check if there are any users who have stopped syncing a while ago
and haven't come back yet. If there are poke the master about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
@@ -441,9 +404,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
users=users_to_states.keys(),
)
# If this is a federation sender, notify about presence updates.
await self.maybe_send_presence_to_interested_destinations(states)
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
@@ -471,6 +431,20 @@ class WorkerPresenceHandler(BasePresenceHandler):
stream_id = token
await self.notify_from_replication(states, stream_id)
# Handle poking the local federation sender, if there is one.
if not self._federation:
return
hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
user_id
@@ -500,12 +474,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not self.hs.config.use_presence:
return
# Proxy request to instance that writes presence
# Proxy request to master
await self._set_state_client(
instance_name=self._presence_writer_instance,
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
)
async def bump_presence_active_time(self, user):
@@ -516,22 +487,25 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not self.hs.config.use_presence:
return
# Proxy request to instance that writes presence
# Proxy request to master
user_id = user.to_string()
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
)
await self._bump_active_client(user_id=user_id)
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.server_name = hs.hostname
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
self.federation_sender = hs.get_federation_sender()
federation_registry = hs.get_federation_registry()
federation_registry.register_edu_handler("m.presence", self.incoming_presence)
@@ -736,13 +710,6 @@ class PresenceHandler(BasePresenceHandler):
self.unpersisted_users_changes |= {s.user_id for s in new_states}
self.unpersisted_users_changes -= set(to_notify.keys())
# Check if we need to resend any presence states to remote hosts. We
# only do this for states that haven't been updated in a while to
# ensure that the remote host doesn't time the presence state out.
#
# Note that since these are states that have *not* been updated,
# they won't get sent down the normal presence replication stream,
# and so we have to explicitly send them via the federation stream.
to_federation_ping = {
user_id: state
for user_id, state in to_federation_ping.items()
@@ -755,10 +722,11 @@ class PresenceHandler(BasePresenceHandler):
self.store,
self.presence_router,
list(to_federation_ping.values()),
self.state,
)
for destinations, states in hosts_and_states:
self._federation_queue.send_presence_to_destinations(
self.federation_queue.send_presence_to_destinations(
states, destinations
)
@@ -1000,10 +968,21 @@ class PresenceHandler(BasePresenceHandler):
users=[UserID.from_string(u) for u in users_to_states],
)
# We only want to poke the local federation sender, if any, as other
# workers will receive the presence updates via the presence replication
# stream (which is updated by `store.update_presence`).
await self.maybe_send_presence_to_interested_destinations(states)
# We only need to tell the local federation sender, if any, that new
# presence has happened. Other federation senders will get notified via
# the presence replication stream.
if not self.federation_sender:
return
hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
states,
self.state,
)
for destinations, states in hosts_and_states:
self.federation_sender.send_presence_to_destinations(states, destinations)
async def incoming_presence(self, origin, content):
"""Called when we receive a `m.presence` EDU from a remote server."""
@@ -1241,7 +1220,7 @@ class PresenceHandler(BasePresenceHandler):
# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self._federation_queue.send_presence_to_destinations(
self.federation_queue.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)
@@ -1381,6 +1360,7 @@ class PresenceEventSource:
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@log_function
async def get_new_events(
@@ -1849,6 +1829,7 @@ async def get_interested_remotes(
store: DataStore,
presence_router: PresenceRouter,
states: List[UserPresenceState],
state_handler: StateHandler,
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
"""Given a list of presence states figure out which remote servers
should be sent which.
@@ -1859,6 +1840,7 @@ async def get_interested_remotes(
store: The homeserver's data store.
presence_router: A module for augmenting the destinations for presence updates.
states: A list of incoming user presence updates.
state_handler:
Returns:
A list of 2-tuples of destinations and states, where for
@@ -1875,8 +1857,7 @@ async def get_interested_remotes(
)
for room_id, states in room_ids_to_states.items():
user_ids = await store.get_users_in_room(room_id)
hosts = {get_domain_from_id(user_id) for user_id in user_ids}
hosts = await state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))
for user_id, states in users_to_states.items():
@@ -1894,10 +1875,6 @@ class PresenceFederationQueue:
Only the last N minutes will be queued, so if a federation sender instance
is down for longer then some updates will be dropped. This is OK as presence
is ephemeral, and so it will self correct eventually.
On workers the class tracks the last received position of the stream from
replication, and handles querying for missed updates over HTTP replication,
c.f. `get_current_token` and `get_replication_rows`.
"""
# How long to keep entries in the queue for. Workers that are down for
@@ -1918,15 +1895,10 @@ class PresenceFederationQueue:
# another process may be handling federation sending.
self._queue_presence_updates = True
# Whether this instance is a presence writer.
self._presence_writer = self._instance_name in hs.config.worker.writers.presence
# The FederationSender instance, if this process sends federation traffic directly.
self._federation = None
if hs.should_send_federation():
self._federation = hs.get_federation_sender()
# The federation sender if this instance is a federation sender.
self._federation = hs.get_federation_sender()
if self._federation:
# We don't bother queuing up presence states if only this instance
# is sending federation.
if hs.config.worker.federation_shard_config.instances == [
@@ -1966,18 +1938,10 @@ class PresenceFederationQueue:
Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances.).
Must only be called on the presence writer process.
"""
# This should only be called on a presence writer.
assert self._presence_writer
if self._federation:
self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
self._federation.send_presence_to_destinations(states, destinations)
if not self._queue_presence_updates:
return
@@ -1992,10 +1956,6 @@ class PresenceFederationQueue:
self._notifier.notify_replication()
def get_current_token(self, instance_name: str) -> int:
"""Get the current position of the stream.
On workers this returns the last stream ID received from replication.
"""
if instance_name == self._instance_name:
return self._next_id - 1
else:
@@ -2012,12 +1972,9 @@ class PresenceFederationQueue:
We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).
On workers this will query the presence writer process via HTTP replication.
"""
if instance_name != self._instance_name:
# If not local we query over http replication from the presence
# writer
# If not local we query over replication.
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
@@ -2061,7 +2018,7 @@ class PresenceFederationQueue:
if stream_name != PresenceFederationStream.NAME:
return
# We keep track of the current tokens (so that we can catch up with anything we missed after a disconnect)
# We keep track of the current tokens
self._current_tokens[instance_name] = token
# If we're a federation sender we pull out the presence states to send
@@ -2075,7 +2032,4 @@ class PresenceFederationQueue:
for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
self._federation.send_presence_to_destinations(states.values(), [host])

View File

@@ -36,9 +36,7 @@ class ReceiptsHandler(BaseHandler):
# same instance. Other federation sender instances will get notified by
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
# in the receipts stream.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
self.federation_sender = hs.get_federation_sender()
# If we can handle the receipt EDUs we do so, otherwise we route them
# to the appropriate worker.

View File

@@ -19,7 +19,7 @@ from http import HTTPStatus
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse import types
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -28,6 +28,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
@@ -63,7 +64,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
self.member_linearizer = Linearizer(name="member")
@@ -178,6 +178,62 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id)
async def _can_join_without_invite(
self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str
) -> bool:
"""
Check whether a user can join a room without an invite.
When joining a room with restricted joined rules (as defined in MSC3083),
the membership of spaces must be checked during join.
Args:
state_ids: The state of the room as it currently is.
room_version: The room version of the room being joined.
user_id: The user joining the room.
Returns:
True if the user can join the room, false otherwise.
"""
# This only applies to room versions which support the new join rule.
if not room_version.msc3083_join_rules:
return True
# If there's no join rule, then it defaults to public (so this doesn't apply).
join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
if not join_rules_event_id:
return True
# If the join rule is not restricted, this doesn't apply.
join_rules_event = await self.store.get_event(join_rules_event_id)
if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED:
return True
# If allowed is of the wrong form, then only allow invited users.
allowed_spaces = join_rules_event.content.get("allow", [])
if not isinstance(allowed_spaces, list):
return False
# Get the list of joined rooms and see if there's an overlap.
joined_rooms = await self.store.get_rooms_for_user(user_id)
# Pull out the other room IDs, invalid data gets filtered.
for space in allowed_spaces:
if not isinstance(space, dict):
continue
space_id = space.get("space")
if not isinstance(space_id, str):
continue
# The user was joined to one of the spaces specified, they can join
# this room!
if space_id in joined_rooms:
return True
# The user was not in any of the required spaces.
return False
async def _local_membership_update(
self,
requester: Requester,
@@ -246,7 +302,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if (
newly_joined
and not user_is_invited
and not await self.event_auth_handler.can_join_without_invite(
and not await self._can_join_without_invite(
prev_state_ids, event.room_version, user_id
)
):

View File

@@ -18,7 +18,6 @@ from typing import (
Any,
Awaitable,
Callable,
Collection,
Dict,
Iterable,
List,
@@ -41,7 +40,7 @@ from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http import get_request_user_agent
from synapse.http.server import respond_with_html, respond_with_redirect
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, UserID, contains_invalid_mxid_characters
from synapse.types import Collection, JsonDict, UserID, contains_invalid_mxid_characters
from synapse.util.async_helpers import Linearizer
from synapse.util.stringutils import random_string

View File

@@ -14,17 +14,7 @@
# limitations under the License.
import itertools
import logging
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
FrozenSet,
List,
Optional,
Set,
Tuple,
)
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
import attr
from prometheus_client import Counter
@@ -38,6 +28,7 @@ from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
JsonDict,
MutableStateMap,
Requester,

View File

@@ -57,9 +57,7 @@ class FollowerTypingHandler:
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
self.federation = None
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
self.federation = hs.get_federation_sender()
if hs.config.worker.writers.typing != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(

View File

@@ -44,6 +44,7 @@ class UserDirectoryHandler(StateDeltasHandler):
super().__init__(hs)
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@@ -301,12 +302,10 @@ class UserDirectoryHandler(StateDeltasHandler):
# ignore the change
return
other_users_in_room_with_profiles = (
await self.store.get_users_in_room_with_profiles(room_id)
)
users_with_profile = await self.state.get_current_users_in_room(room_id)
# Remove every user from the sharing tables for that room.
for user_id in other_users_in_room_with_profiles.keys():
for user_id in users_with_profile.keys():
await self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
@@ -315,7 +314,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# which when ran over an entire room, will result in the same values
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
for user_id, profile in other_users_in_room_with_profiles.items():
for user_id, profile in users_with_profile.items():
await self._handle_new_user(room_id, user_id, profile)
async def _handle_new_user(
@@ -337,7 +336,7 @@ class UserDirectoryHandler(StateDeltasHandler):
room_id
)
# Now we update users who share rooms with users.
other_users_in_room = await self.store.get_users_in_room(room_id)
users_with_profile = await self.state.get_current_users_in_room(room_id)
if is_public:
await self.store.add_users_in_public_rooms(room_id, (user_id,))
@@ -353,14 +352,14 @@ class UserDirectoryHandler(StateDeltasHandler):
# We don't care about appservice users.
if not is_appservice:
for other_user_id in other_users_in_room:
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue
to_insert.add((user_id, other_user_id))
# Next we need to update for every local user in the room
for other_user_id in other_users_in_room:
for other_user_id in users_with_profile:
if user_id == other_user_id:
continue

View File

@@ -33,7 +33,6 @@ import treq
from canonicaljson import encode_canonical_json
from netaddr import AddrFormatError, IPAddress, IPSet
from prometheus_client import Counter
from typing_extensions import Protocol
from zope.interface import implementer, provider
from OpenSSL import SSL
@@ -755,16 +754,6 @@ def _timeout_to_request_timed_out_error(f: Failure):
return f
class ByteWriteable(Protocol):
"""The type of object which must be passed into read_body_with_max_size.
Typically this is a file object.
"""
def write(self, data: bytes) -> int:
pass
class BodyExceededMaxSize(Exception):
"""The maximum allowed size of the HTTP body was exceeded."""
@@ -801,7 +790,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
transport = None # type: Optional[ITCPTransport]
def __init__(
self, stream: ByteWriteable, deferred: defer.Deferred, max_size: Optional[int]
self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
):
self.stream = stream
self.deferred = deferred
@@ -841,7 +830,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
def read_body_with_max_size(
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
response: IResponse, stream: BinaryIO, max_size: Optional[int]
) -> defer.Deferred:
"""
Read a HTTP response body to a file-object. Optionally enforcing a maximum file size.

View File

@@ -1,4 +1,5 @@
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,13 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
import codecs
import logging
import random
import sys
import typing
import urllib.parse
from io import BytesIO, StringIO
from io import BytesIO
from typing import Callable, Dict, List, Optional, Tuple, Union
import attr
@@ -73,9 +72,6 @@ incoming_responses_counter = Counter(
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
@@ -171,27 +167,12 @@ async def _handle_json_response(
try:
check_content_type_is_json(response.headers)
buf = StringIO()
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
# Use the custom JSON decoder (partially re-implements treq.json_content).
d = treq.text_content(response, encoding="utf-8")
d.addCallback(json_decoder.decode)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
def parse(_len: int):
return json_decoder.decode(buf.getvalue())
d.addCallback(parse)
body = await make_deferred_yieldable(d)
except BodyExceededMaxSize as e:
# The response was too big.
logger.warning(
"{%s} [%s] JSON response exceeded max size %i - %s %s",
request.txn_id,
request.destination,
MAX_RESPONSE_SIZE,
request.method,
request.uri.decode("ascii"),
)
raise RequestSendFailed(e, can_retry=False) from e
except ValueError as e:
# The JSON content was invalid.
logger.warning(
@@ -237,18 +218,6 @@ async def _handle_json_response(
return body
class BinaryIOWrapper:
"""A wrapper for a TextIO which converts from bytes on the fly."""
def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
self.decoder = codecs.getincrementaldecoder(encoding)(errors)
self.file = file
def write(self, b: Union[bytes, bytearray]) -> int:
self.file.write(self.decoder.decode(b))
return len(b)
class MatrixFederationHttpClient:
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.

View File

@@ -14,14 +14,13 @@
import contextlib
import logging
import time
from typing import Optional, Tuple, Union
from typing import Optional, Tuple, Type, Union
import attr
from zope.interface import implementer
from twisted.internet.interfaces import IAddress, IReactorTime
from twisted.internet.interfaces import IAddress
from twisted.python.failure import Failure
from twisted.web.resource import IResource
from twisted.web.server import Request, Site
from synapse.config.server import ListenerConfig
@@ -50,7 +49,6 @@ class SynapseRequest(Request):
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
* A limit to the size of request which will be accepted
It also provides a method `processing`, which returns a context manager. If this
method is called, the request won't be logged until the context manager is closed;
@@ -61,9 +59,8 @@ class SynapseRequest(Request):
logcontext: the log context for this request
"""
def __init__(self, channel, *args, max_request_body_size=1024, **kw):
def __init__(self, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self._max_request_body_size = max_request_body_size
self.site = channel.site # type: SynapseSite
self._channel = channel # this is used by the tests
self.start_time = 0.0
@@ -100,18 +97,6 @@ class SynapseRequest(Request):
self.site.site_tag,
)
def handleContentChunk(self, data):
# we should have a `content` by now.
assert self.content, "handleContentChunk() called before gotLength()"
if self.content.tell() + len(data) > self._max_request_body_size:
logger.warning(
"Aborting connection from %s because the request exceeds maximum size",
self.client,
)
self.transport.abortConnection()
return
super().handleContentChunk(data)
@property
def requester(self) -> Optional[Union[Requester, str]]:
return self._requester
@@ -500,55 +485,29 @@ class _XForwardedForAddress:
class SynapseSite(Site):
"""
Synapse-specific twisted http Site
This does two main things.
First, it replaces the requestFactory in use so that we build SynapseRequests
instead of regular t.w.server.Requests. All of the constructor params are really
just parameters for SynapseRequest.
Second, it inhibits the log() method called by Request.finish, since SynapseRequest
does its own logging.
Subclass of a twisted http Site that does access logging with python's
standard logging
"""
def __init__(
self,
logger_name: str,
site_tag: str,
logger_name,
site_tag,
config: ListenerConfig,
resource: IResource,
resource,
server_version_string,
max_request_body_size: int,
reactor: IReactorTime,
*args,
**kwargs,
):
"""
Args:
logger_name: The name of the logger to use for access logs.
site_tag: A tag to use for this site - mostly in access logs.
config: Configuration for the HTTP listener corresponding to this site
resource: The base of the resource tree to be used for serving requests on
this site
server_version_string: A string to present for the Server header
max_request_body_size: Maximum request body length to allow before
dropping the connection
reactor: reactor to be used to manage connection timeouts
"""
Site.__init__(self, resource, reactor=reactor)
Site.__init__(self, resource, *args, **kwargs)
self.site_tag = site_tag
assert config.http_options is not None
proxied = config.http_options.x_forwarded
request_class = XForwardedForRequest if proxied else SynapseRequest
def request_factory(channel, queued) -> Request:
return request_class(
channel, max_request_body_size=max_request_body_size, queued=queued
)
self.requestFactory = request_factory # type: ignore
self.requestFactory = (
XForwardedForRequest if proxied else SynapseRequest
) # type: Type[Request]
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii")

View File

@@ -226,11 +226,11 @@ class RemoteHandler(logging.Handler):
old_buffer = self._buffer
self._buffer = deque()
for _ in range(buffer_split):
for i in range(buffer_split):
self._buffer.append(old_buffer.popleft())
end_buffer = []
for _ in range(buffer_split):
for i in range(buffer_split):
end_buffer.append(old_buffer.pop())
self._buffer.extend(reversed(end_buffer))

View File

@@ -258,8 +258,7 @@ class LoggingContext:
child to the parent
Args:
name: Name for the context for logging. If this is omitted, it is
inherited from the parent context.
name (str): Name for the context for debugging.
parent_context (LoggingContext|None): The parent of the new context
"""
@@ -283,6 +282,7 @@ class LoggingContext:
request: Optional[ContextRequest] = None,
) -> None:
self.previous_context = current_context()
self.name = name
# track the resources used by this context so far
self._resource_usage = ContextResourceUsage()
@@ -314,17 +314,10 @@ class LoggingContext:
# the request param overrides the request from the parent context
self.request = request
# if we don't have a `name`, but do have a parent context, use its name.
if self.parent_context and name is None:
name = str(self.parent_context)
if name is None:
raise ValueError(
"LoggingContext must be given either a name or a parent context"
)
self.name = name
def __str__(self) -> str:
return self.name
if self.request:
return self.request.request_id
return "%s@%x" % (self.name, id(self))
@classmethod
def current_context(cls) -> LoggingContextOrSentinel:
@@ -701,13 +694,17 @@ def nested_logging_context(suffix: str) -> LoggingContext:
"Starting nested logging context from sentinel context: metrics will be lost"
)
parent_context = None
prefix = ""
request = None
else:
assert isinstance(curr_context, LoggingContext)
parent_context = curr_context
prefix = str(curr_context)
prefix = str(parent_context.name)
request = parent_context.request
return LoggingContext(
prefix + "-" + suffix,
parent_context=parent_context,
request=request,
)
@@ -898,7 +895,7 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
parent_context = curr_context
def g():
with LoggingContext(str(curr_context), parent_context=parent_context):
with LoggingContext(parent_context=parent_context):
return f(*args, **kwargs)
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))

View File

@@ -241,24 +241,19 @@ class BackgroundProcessLoggingContext(LoggingContext):
processes.
"""
__slots__ = ["_proc"]
__slots__ = ["_id", "_proc"]
def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None):
"""
def __init__(self, name: str, id: Optional[Union[int, str]] = None):
super().__init__(name)
self._id = id
Args:
name: The name of the background process. Each distinct `name` gets a
separate prometheus time series.
instance_id: an identifer to add to `name` to distinguish this instance of
the named background process in the logs. If this is `None`, one is
made up based on id(self).
"""
if instance_id is None:
instance_id = id(self)
super().__init__("%s-%s" % (name, instance_id))
self._proc = _BackgroundProcess(name, self)
def __str__(self) -> str:
if self._id is not None:
return "%s-%s" % (self.name, self._id)
return "%s@%x" % (self.name, id(self))
def start(self, rusage: "Optional[resource._RUsage]"):
"""Log context has started running (again)."""

View File

@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Generator, Iterable, List, Optional, Tupl
from twisted.internet import defer
from synapse.events import EventBase
from synapse.handlers.presence import get_interested_remotes
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -51,6 +52,9 @@ class ModuleApi:
self._server_name = hs.hostname
self._presence_stream = hs.get_event_sources().sources["presence"]
self._state = hs.get_state_handler()
self._presence_router = hs.get_presence_router()
self._federation = self._hs.get_federation_sender()
# We expose these as properties below in order to attach a helpful docstring.
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
@@ -424,21 +428,26 @@ class ModuleApi:
# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
else:
if not self._federation:
continue
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)
# Send to remote destinations.
# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()
await presence_handler.maybe_send_presence_to_interested_destinations(
presence_events
# Send to remote destinations
hosts_and_states = await get_interested_remotes(
self._store,
self._presence_router,
presence_events,
self._state,
)
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room

View File

@@ -17,7 +17,6 @@ from collections import namedtuple
from typing import (
Awaitable,
Callable,
Collection,
Dict,
Iterable,
List,
@@ -43,7 +42,13 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
from synapse.types import (
Collection,
PersistedEventPosition,
RoomStreamToken,
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -222,9 +227,7 @@ class Notifier:
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()
self.federation_sender = hs.get_federation_sender()
self.state_handler = hs.get_state_handler()

View File

@@ -106,10 +106,6 @@ class BulkPushRuleEvaluator:
self.store = hs.get_datastore()
self.auth = hs.get_auth()
# Used by `RulesForRoom` to ensure only one thing mutates the cache at a
# time. Keyed off room_id.
self._rules_linearizer = Linearizer(name="rules_for_room")
self.room_push_rule_cache_metrics = register_cache(
"cache",
"room_push_rule_cache",
@@ -127,16 +123,7 @@ class BulkPushRuleEvaluator:
dict of user_id -> push_rules
"""
room_id = event.room_id
rules_for_room_data = self._get_rules_for_room(room_id)
rules_for_room = RulesForRoom(
hs=self.hs,
room_id=room_id,
rules_for_room_cache=self._get_rules_for_room.cache,
room_push_rule_cache_metrics=self.room_push_rule_cache_metrics,
linearizer=self._rules_linearizer,
cached_data=rules_for_room_data,
)
rules_for_room = self._get_rules_for_room(room_id)
rules_by_user = await rules_for_room.get_rules(event, context)
@@ -155,12 +142,17 @@ class BulkPushRuleEvaluator:
return rules_by_user
@lru_cache()
def _get_rules_for_room(self, room_id: str) -> "RulesForRoomData":
"""Get the current RulesForRoomData object for the given room id"""
# It's important that the RulesForRoomData object gets added to self._get_rules_for_room.cache
def _get_rules_for_room(self, room_id: str) -> "RulesForRoom":
"""Get the current RulesForRoom object for the given room id"""
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
return RulesForRoomData()
return RulesForRoom(
self.hs,
room_id,
self._get_rules_for_room.cache,
self.room_push_rule_cache_metrics,
)
async def _get_power_levels_and_sender_level(
self, event: EventBase, context: EventContext
@@ -290,49 +282,11 @@ def _condition_checker(
return True
@attr.s(slots=True)
class RulesForRoomData:
"""The data stored in the cache by `RulesForRoom`.
We don't store `RulesForRoom` directly in the cache as we want our caches to
*only* include data, and not references to e.g. the data stores.
"""
# event_id -> (user_id, state)
member_map = attr.ib(type=Dict[str, Tuple[str, str]], factory=dict)
# user_id -> rules
rules_by_user = attr.ib(type=Dict[str, List[Dict[str, dict]]], factory=dict)
# The last state group we updated the caches for. If the state_group of
# a new event comes along, we know that we can just return the cached
# result.
# On invalidation of the rules themselves (if the user changes them),
# we invalidate everything and set state_group to `object()`
state_group = attr.ib(type=Union[object, int], factory=object)
# A sequence number to keep track of when we're allowed to update the
# cache. We bump the sequence number when we invalidate the cache. If
# the sequence number changes while we're calculating stuff we should
# not update the cache with it.
sequence = attr.ib(type=int, default=0)
# A cache of user_ids that we *know* aren't interesting, e.g. user_ids
# owned by AS's, or remote users, etc. (I.e. users we will never need to
# calculate push for)
# These never need to be invalidated as we will never set up push for
# them.
uninteresting_user_set = attr.ib(type=Set[str], factory=set)
class RulesForRoom:
"""Caches push rules for users in a room.
This efficiently handles users joining/leaving the room by not invalidating
the entire cache for the room.
A new instance is constructed for each call to
`BulkPushRuleEvaluator._get_rules_for_event`, with the cached data from
previous calls passed in.
"""
def __init__(
@@ -341,8 +295,6 @@ class RulesForRoom:
room_id: str,
rules_for_room_cache: LruCache,
room_push_rule_cache_metrics: CacheMetric,
linearizer: Linearizer,
cached_data: RulesForRoomData,
):
"""
Args:
@@ -351,21 +303,38 @@ class RulesForRoom:
rules_for_room_cache: The cache object that caches these
RoomsForUser objects.
room_push_rule_cache_metrics: The metrics object
linearizer: The linearizer used to ensure only one thing mutates
the cache at a time. Keyed off room_id
cached_data: Cached data from previous calls to `self.get_rules`,
can be mutated.
"""
self.room_id = room_id
self.is_mine_id = hs.is_mine_id
self.store = hs.get_datastore()
self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
# Used to ensure only one thing mutates the cache at a time. Keyed off
# room_id.
self.linearizer = linearizer
self.linearizer = Linearizer(name="rules_for_room")
self.data = cached_data
# event_id -> (user_id, state)
self.member_map = {} # type: Dict[str, Tuple[str, str]]
# user_id -> rules
self.rules_by_user = {} # type: Dict[str, List[Dict[str, dict]]]
# The last state group we updated the caches for. If the state_group of
# a new event comes along, we know that we can just return the cached
# result.
# On invalidation of the rules themselves (if the user changes them),
# we invalidate everything and set state_group to `object()`
self.state_group = object()
# A sequence number to keep track of when we're allowed to update the
# cache. We bump the sequence number when we invalidate the cache. If
# the sequence number changes while we're calculating stuff we should
# not update the cache with it.
self.sequence = 0
# A cache of user_ids that we *know* aren't interesting, e.g. user_ids
# owned by AS's, or remote users, etc. (I.e. users we will never need to
# calculate push for)
# These never need to be invalidated as we will never set up push for
# them.
self.uninteresting_user_set = set() # type: Set[str]
# We need to be clever on the invalidating caches callbacks, as
# otherwise the invalidation callback holds a reference to the object,
@@ -383,25 +352,25 @@ class RulesForRoom:
"""
state_group = context.state_group
if state_group and self.data.state_group == state_group:
if state_group and self.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
self.room_push_rule_cache_metrics.inc_hits()
return self.data.rules_by_user
return self.rules_by_user
with (await self.linearizer.queue(self.room_id)):
if state_group and self.data.state_group == state_group:
with (await self.linearizer.queue(())):
if state_group and self.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
self.room_push_rule_cache_metrics.inc_hits()
return self.data.rules_by_user
return self.rules_by_user
self.room_push_rule_cache_metrics.inc_misses()
ret_rules_by_user = {}
missing_member_event_ids = {}
if state_group and self.data.state_group == context.prev_group:
if state_group and self.state_group == context.prev_group:
# If we have a simple delta then we can reuse most of the previous
# results.
ret_rules_by_user = self.data.rules_by_user
ret_rules_by_user = self.rules_by_user
current_state_ids = context.delta_ids
push_rules_delta_state_cache_metric.inc_hits()
@@ -424,24 +393,24 @@ class RulesForRoom:
if typ != EventTypes.Member:
continue
if user_id in self.data.uninteresting_user_set:
if user_id in self.uninteresting_user_set:
continue
if not self.is_mine_id(user_id):
self.data.uninteresting_user_set.add(user_id)
self.uninteresting_user_set.add(user_id)
continue
if self.store.get_if_app_services_interested_in_user(user_id):
self.data.uninteresting_user_set.add(user_id)
self.uninteresting_user_set.add(user_id)
continue
event_id = current_state_ids[key]
res = self.data.member_map.get(event_id, None)
res = self.member_map.get(event_id, None)
if res:
user_id, state = res
if state == Membership.JOIN:
rules = self.data.rules_by_user.get(user_id, None)
rules = self.rules_by_user.get(user_id, None)
if rules:
ret_rules_by_user[user_id] = rules
continue
@@ -461,7 +430,7 @@ class RulesForRoom:
else:
# The push rules didn't change but lets update the cache anyway
self.update_cache(
self.data.sequence,
self.sequence,
members={}, # There were no membership changes
rules_by_user=ret_rules_by_user,
state_group=state_group,
@@ -492,7 +461,7 @@ class RulesForRoom:
for. Used when updating the cache.
event: The event we are currently computing push rules for.
"""
sequence = self.data.sequence
sequence = self.sequence
rows = await self.store.get_membership_from_event_ids(member_event_ids.values())
@@ -532,11 +501,23 @@ class RulesForRoom:
self.update_cache(sequence, members, ret_rules_by_user, state_group)
def invalidate_all(self) -> None:
# Note: Don't hand this function directly to an invalidation callback
# as it keeps a reference to self and will stop this instance from being
# GC'd if it gets dropped from the rules_to_user cache. Instead use
# `self.invalidate_all_cb`
logger.debug("Invalidating RulesForRoom for %r", self.room_id)
self.sequence += 1
self.state_group = object()
self.member_map = {}
self.rules_by_user = {}
push_rules_invalidation_counter.inc()
def update_cache(self, sequence, members, rules_by_user, state_group) -> None:
if sequence == self.data.sequence:
self.data.member_map.update(members)
self.data.rules_by_user = rules_by_user
self.data.state_group = state_group
if sequence == self.sequence:
self.member_map.update(members)
self.rules_by_user = rules_by_user
self.state_group = state_group
@attr.attrs(slots=True, frozen=True)
@@ -554,10 +535,6 @@ class _Invalidation:
room_id = attr.ib(type=str)
def __call__(self) -> None:
rules_data = self.cache.get(self.room_id, None, update_metrics=False)
if rules_data:
rules_data.sequence += 1
rules_data.state_group = object()
rules_data.member_map = {}
rules_data.rules_by_user = {}
push_rules_invalidation_counter.inc()
rules = self.cache.get(self.room_id, None, update_metrics=False)
if rules:
rules.invalidate_all()

View File

@@ -19,9 +19,8 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from twisted.internet.interfaces import IDelayedCall
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException, ThrottleParams
from synapse.push import Pusher, PusherConfig, ThrottleParams
from synapse.push.mailer import Mailer
from synapse.util.threepids import validate_email
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -72,12 +71,6 @@ class EmailPusher(Pusher):
self._is_processing = False
# Make sure that the email is valid.
try:
validate_email(self.email)
except ValueError:
raise PusherConfigException("Invalid email")
def on_started(self, should_check_for_notifs: bool) -> None:
"""Called when this pusher has been started.

View File

@@ -62,9 +62,7 @@ class PusherPool:
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
)
self._account_validity = hs.config.account_validity
# We shard the handling of push notifications by user ID.
self._pusher_shard_config = hs.config.push.pusher_shard_config
@@ -238,7 +236,7 @@ class PusherPool:
for u in users_affected:
# Don't push if the user account has expired
if self._account_validity_enabled:
if self._account_validity.enabled:
expired = await self.store.is_account_expired(
u, self.clock.time_msec()
)
@@ -268,7 +266,7 @@ class PusherPool:
for u in users_affected:
# Don't push if the user account has expired
if self._account_validity_enabled:
if self._account_validity.enabled:
expired = await self.store.is_account_expired(
u, self.clock.time_msec()
)

View File

@@ -158,10 +158,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
def make_client(cls, hs):
"""Create a client that makes requests.
Returns a callable that accepts the same parameters as
`_serialize_payload`, and also accepts an optional `instance_name`
parameter to specify which instance to hit (the instance must be in
the `instance_map` config).
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
client = hs.get_simple_http_client()

View File

@@ -0,0 +1,50 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.replication.tcp.streams import PresenceStream
from synapse.storage import DataStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.presence import PresenceStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
class SlavedPresenceStore(BaseSlavedStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")
self._presence_on_startup = self._get_active_presence(db_conn) # type: ignore
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)
_get_active_presence = DataStore._get_active_presence
take_presence_startup_info = DataStore.take_presence_startup_info
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PresenceStream.NAME:
self._presence_id_gen.advance(instance_name, token)
for row in rows:
self.presence_stream_cache.entity_has_changed(row.user_id, token)
self._get_presence_for_user.invalidate((row.user_id,))
return super().process_replication_rows(stream_name, instance_name, token, rows)

View File

@@ -20,10 +20,8 @@ from twisted.internet.defer import Deferred
from twisted.internet.protocol import ReconnectingClientFactory
from synapse.api.constants import EventTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import (
AccountDataStream,
@@ -339,6 +337,7 @@ class FederationSenderHandler:
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self._hs = hs
self._presence_handler = hs.get_presence_handler()
# We need to make a temporary value to ensure that mypy picks up the
# right type. We know we should have a federation sender instance since
@@ -357,14 +356,8 @@ class FederationSenderHandler:
self.federation_sender.wake_destination(server)
async def process_replication_rows(self, stream_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
if stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows)
# ... as well as device updates and messages
@@ -402,54 +395,3 @@ class FederationSenderHandler:
receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)
async def update_token(self, token):
"""Update the record of where we have processed to in the federation stream.
Called after we have processed a an update received over replication. Sends
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
self.federation_position = token
# We save and send the ACK to master asynchronously, so we don't block
# processing on persistence. We don't need to do this operation for
# every single RDATA we receive, we just need to do it periodically.
if self._fed_position_linearizer.is_queued(None):
# There is already a task queued up to save and send the token, so
# no need to queue up another task.
return
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
async def _save_and_send_ack(self):
"""Save the current federation position in the database and send an ACK
to master with where we're up to.
"""
# We should only be calling this once we've got a token.
assert self.federation_position is not None
try:
# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
# has a linearizer which ensures that we only process one line of
# replication data at a time. Should we remove it, or is it doing useful
# service for robustness? Or could we replace it with an assertion that
# we're not being re-entered?
with (await self._fed_position_linearizer.queue(None)):
# We persist and ack the same position, so we take a copy of it
# here as otherwise it can get modified from underneath us.
current_position = self.federation_position
await self.store.update_federation_out_pos(
"federation", current_position
)
# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(current_position)
except Exception:
logger.exception("Error updating federation stream position")

View File

@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
return self.instance_id
class FederationAckCommand(Command):
"""Sent by the client when it has processed up to a given point in the
federation stream. This allows the master to drop in-memory caches of the
federation stream.
This must only be sent from one worker (i.e. the one sending federation)
Format::
FEDERATION_ACK <instance_name> <token>
"""
NAME = "FEDERATION_ACK"
def __init__(self, instance_name: str, token: int):
self.instance_name = instance_name
self.token = token
@classmethod
def from_line(cls, line: str) -> "FederationAckCommand":
instance_name, token = line.split(" ")
return cls(instance_name, int(token))
def to_line(self) -> str:
return "%s %s" % (self.instance_name, self.token)
class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
@@ -389,7 +362,6 @@ _COMMANDS = (
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
PingCommand.NAME,
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,

View File

@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
FederationAckCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
@@ -54,9 +53,6 @@ from synapse.replication.tcp.streams import (
BackfillStream,
CachesStream,
EventsStream,
FederationStream,
PresenceFederationStream,
PresenceStream,
ReceiptsStream,
Stream,
TagAccountDataStream,
@@ -75,7 +71,6 @@ inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
@@ -101,10 +96,6 @@ class ReplicationCommandHandler:
self._instance_id = hs.get_instance_id()
self._instance_name = hs.get_instance_name()
self._is_presence_writer = (
hs.get_instance_name() in hs.config.worker.writers.presence
)
self._streams = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
} # type: Dict[str, Stream]
@@ -159,23 +150,10 @@ class ReplicationCommandHandler:
continue
if isinstance(stream, (PresenceStream, PresenceFederationStream)):
# Only add PresenceStream as a source on the instance in charge
# of presence.
if self._is_presence_writer:
self._streams_to_replicate.append(stream)
continue
# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue
if stream.NAME == FederationStream.NAME and hs.config.send_federation:
# We only support federation stream if federation sending
# has been disabled on the master.
continue
self._streams_to_replicate.append(stream)
# Map of stream name to batched updates. See RdataCommand for info on
@@ -364,7 +342,7 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_sync_counter.inc()
if self._is_presence_writer:
if self._is_master:
return self._presence_handler.update_external_syncs_row(
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
@@ -374,19 +352,11 @@ class ReplicationCommandHandler:
def on_CLEAR_USER_SYNC(
self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
) -> Optional[Awaitable[None]]:
if self._is_presence_writer:
if self._is_master:
return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
else:
return None
def on_FEDERATION_ACK(
self, conn: IReplicationConnection, cmd: FederationAckCommand
):
federation_ack_counter.inc()
if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
def on_USER_IP(
self, conn: IReplicationConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
@@ -669,12 +639,6 @@ class ReplicationCommandHandler:
else:
logger.warning("Dropping command as not connected: %r", cmd.NAME)
def send_federation_ack(self, token: int):
"""Ack data for the federation stream. This allows the master to drop
data stored purely in memory.
"""
self.send_command(FederationAckCommand(self._instance_name, token))
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
):

View File

@@ -49,7 +49,7 @@ import fcntl
import logging
import struct
from inspect import isawaitable
from typing import TYPE_CHECKING, Collection, List, Optional
from typing import TYPE_CHECKING, List, Optional
from prometheus_client import Counter
from zope.interface import Interface, implementer
@@ -76,6 +76,7 @@ from synapse.replication.tcp.commands import (
ServerCommand,
parse_command_from_line,
)
from synapse.types import Collection
from synapse.util import Clock
from synapse.util.stringutils import random_string

View File

@@ -43,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
UserSignatureStream,
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
STREAMS_MAP = {
stream.NAME: stream
@@ -60,7 +59,6 @@ STREAMS_MAP = {
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
FederationStream,
TagAccountDataStream,
AccountDataStream,
GroupServerStream,

View File

@@ -272,22 +272,15 @@ class PresenceStream(Stream):
NAME = "presence"
ROW_TYPE = PresenceStreamRow
def __init__(self, hs: "HomeServer"):
def __init__(self, hs):
store = hs.get_datastore()
if hs.get_instance_name() in hs.config.worker.writers.presence:
# on the presence writer, query the presence handler
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
from synapse.handlers.presence import PresenceHandler
assert isinstance(presence_handler, PresenceHandler)
update_function = (
presence_handler.get_all_presence_updates
) # type: UpdateFunction
update_function = presence_handler.get_all_presence_updates
else:
# Query presence writer process
# Query master process
update_function = make_http_update_function(hs, self.NAME)
super().__init__(

View File

@@ -1,80 +0,0 @@
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
from synapse.replication.tcp.streams._base import (
Stream,
current_token_without_instance,
make_http_update_function,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs: "HomeServer"):
if hs.config.worker_app is None:
# master process: get updates from the FederationRemoteSendQueue.
# (if the master is configured to send federation itself, federation_sender
# will be a real FederationSender, which has stubs for current_token and
# get_replication_rows.)
federation_sender = hs.get_federation_sender()
current_token = current_token_without_instance(
federation_sender.get_current_token
)
update_function = (
federation_sender.get_replication_rows
) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
elif hs.should_send_federation():
# federation sender: Query master process
update_function = make_http_update_function(hs, self.NAME)
current_token = self._stub_current_token
else:
# other worker: stub out the update function (we're not interested in
# any updates so when we get a POSITION we do nothing)
update_function = self._stub_update_function
current_token = self._stub_current_token
super().__init__(hs.get_instance_name(), current_token, update_function)
@staticmethod
def _stub_current_token(instance_name: str) -> int:
# dummy current-token method for use on workers
return 0
@staticmethod
async def _stub_update_function(
instance_name: str, from_token: int, upto_token: int, limit: int
) -> Tuple[list, int, bool]:
return [], upto_token, False

Some files were not shown because too many files have changed in this diff Show More