1
0

Compare commits

...

14 Commits

Author SHA1 Message Date
Andrew Morgan
0dd33f1803 Update dimension widget URL 2021-04-30 15:00:30 +01:00
Andrew Morgan
e2582a5dd7 Switch to dimension's URL 2021-04-28 18:42:52 +01:00
Andrew Morgan
a195efa1fe Add Element call information to well-known client json 2021-04-28 16:41:37 +01:00
Erik Johnston
802560211a Merge remote-tracking branch 'origin/release-v1.33.0' into develop 2021-04-28 14:43:10 +01:00
Erik Johnston
e4ab8676b4 Fix tight loop handling presence replication. (#9900)
Only affects workers. Introduced in #9819.

Fixes #9899.
2021-04-28 14:42:50 +01:00
Patrick Cloke
10a08ab88a Use the parent's logging context name for runWithConnection. (#9895)
This fixes a regression where the logging context for runWithConnection
was reported as runWithConnection instead of the connection name,
e.g. "POST-XYZ".
2021-04-28 07:44:52 -04:00
Andrew Morgan
fa6679e794 Merge tag 'v1.33.0rc1' into develop
Synapse 1.33.0rc1 (2021-04-28)
==============================

Features
--------

- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))

Bugfixes
--------

- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))

Improved Documentation
----------------------

- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))

Internal Changes
----------------

- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
2021-04-28 12:12:29 +01:00
Andrew Morgan
8ba086980d Reword account validity template change to sound less like a bugfix 2021-04-28 12:07:49 +01:00
Erik Johnston
391bfe9a7b Reduce memory footprint of caches (#9886) 2021-04-28 11:59:28 +01:00
Andrew Morgan
787de3190f 1.33.0rc1 2021-04-28 11:43:33 +01:00
Andrew Morgan
4e0fd35bc9 Revert "Experimental Federation Speedup (#9702)"
This reverts commit 05e8c70c05.
2021-04-28 11:38:33 +01:00
Erik Johnston
dd2d32dcdb Add type hints to presence handler (#9885) 2021-04-28 11:07:47 +01:00
Andrew Morgan
fe604a022a Remove various bits of compatibility code for Python <3.6 (#9879)
I went through and removed a bunch of cruft that was lying around for compatibility with old Python versions. This PR also will now prevent Synapse from starting unless you're running Python 3.6+.
2021-04-27 13:13:07 +01:00
Patrick Cloke
1350b053da Pass errors back to the client when trying multiple federation destinations. (#9868)
This ensures that something like an auth error (403) will be
returned to the requester instead of attempting to try more
servers, which will likely result in the same error, and then
passing back a generic 400 error.
2021-04-27 07:30:34 -04:00
62 changed files with 440 additions and 415 deletions

View File

@@ -1,3 +1,56 @@
Synapse 1.33.0rc1 (2021-04-28)
==============================
Features
--------
- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))
Bugfixes
--------
- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))
Improved Documentation
----------------------
- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))
Internal Changes
----------------
- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
Synapse 1.32.2 (2021-04-22)
===========================

View File

@@ -1 +0,0 @@
Add a dockerfile for running Synapse in worker-mode under Complement.

View File

@@ -1 +0,0 @@
Speed up federation transmission by using fewer database calls. Contributed by @ShadowJonathan.

View File

@@ -1 +0,0 @@
Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path.

View File

@@ -1 +0,0 @@
Apply `pyupgrade` across the codebase.

View File

@@ -1 +0,0 @@
Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg.

View File

@@ -1 +0,0 @@
Move some replication processing out of `generic_worker`.

View File

@@ -1 +0,0 @@
Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.

View File

@@ -1 +0,0 @@
Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms.

View File

@@ -1 +0,0 @@
Add some sanity checks to identity server passed to 3PID bind/unbind endpoints.

View File

@@ -1 +0,0 @@
Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.

View File

@@ -1 +0,0 @@
Replace `HomeServer.get_config()` with inline references.

View File

@@ -1 +0,0 @@
Rename some handlers and config modules to not duplicate the top-level module.

View File

@@ -1 +0,0 @@
Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Reduce CPU usage of the user directory by reusing existing calculated room membership.

View File

@@ -1 +0,0 @@
Small speed up for joining large remote rooms.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Don't return an error when a user attempts to renew their account multiple times with the same token. Instead, state when their account is set to expire. This change concerns the optional account validity feature.

View File

@@ -1 +0,0 @@
Limit the size of HTTP responses read over federation.

View File

@@ -1 +0,0 @@
Introduce flake8-bugbear to the test suite and fix some of its lint violations.

View File

@@ -1 +0,0 @@
Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores.

View File

@@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@@ -1 +0,0 @@
Limit length of accepted email addresses.

View File

@@ -1 +0,0 @@
Remove redundant `synapse.types.Collection` type definition.

View File

@@ -1 +0,0 @@
Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts.

View File

@@ -1 +0,0 @@
Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists.

View File

@@ -1 +0,0 @@
Disable invite rate-limiting by default when running the unit tests.

View File

@@ -1 +0,0 @@
Pass a reactor into `SynapseSite` to make testing easier.

View File

@@ -1 +0,0 @@
Make `DomainSpecificString` an `attrs` class.

View File

@@ -1 +0,0 @@
Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules.

View File

@@ -1 +0,0 @@
Remove redundant `_PushHTTPChannel` test class.

1
changelog.d/9885.misc Normal file
View File

@@ -0,0 +1 @@
Add type hints to presence handler.

1
changelog.d/9886.misc Normal file
View File

@@ -0,0 +1 @@
Reduce memory usage of the LRU caches.

View File

@@ -1 +0,0 @@
Small performance improvement around handling new local presence updates.

1
changelog.d/9895.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements.

1
changelog.d/9900.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.

View File

@@ -224,16 +224,14 @@ class HomeServer(ReplicationHandler):
destinations = yield self.get_servers_for_context(room_name)
try:
yield self.replication_layer.send_pdus(
[
Pdu.create_new(
context=room_name,
pdu_type="sy.room.message",
content={"sender": sender, "body": body},
origin=self.server_name,
destinations=destinations,
)
]
yield self.replication_layer.send_pdu(
Pdu.create_new(
context=room_name,
pdu_type="sy.room.message",
content={"sender": sender, "body": body},
origin=self.server_name,
destinations=destinations,
)
)
except Exception as e:
logger.exception(e)
@@ -255,7 +253,7 @@ class HomeServer(ReplicationHandler):
origin=self.server_name,
destinations=destinations,
)
yield self.replication_layer.send_pdus([pdu])
yield self.replication_layer.send_pdu(pdu)
except Exception as e:
logger.exception(e)
@@ -267,18 +265,16 @@ class HomeServer(ReplicationHandler):
destinations = yield self.get_servers_for_context(room_name)
try:
yield self.replication_layer.send_pdus(
[
Pdu.create_new(
context=room_name,
is_state=True,
pdu_type="sy.room.member",
state_key=invitee,
content={"membership": "invite"},
origin=self.server_name,
destinations=destinations,
)
]
yield self.replication_layer.send_pdu(
Pdu.create_new(
context=room_name,
is_state=True,
pdu_type="sy.room.member",
state_key=invitee,
content={"membership": "invite"},
origin=self.server_name,
destinations=destinations,
)
)
except Exception as e:
logger.exception(e)

View File

@@ -41,7 +41,6 @@ files =
synapse/push,
synapse/replication,
synapse/rest,
synapse/secrets.py,
synapse/server.py,
synapse/server_notices,
synapse/spam_checker_api,

View File

@@ -21,8 +21,8 @@ import os
import sys
# Check that we're not running on an unsupported Python version.
if sys.version_info < (3, 5):
print("Synapse requires Python 3.5 or above.")
if sys.version_info < (3, 6):
print("Synapse requires Python 3.6 or above.")
sys.exit(1)
# Twisted and canonicaljson will fail to import when this file is executed to
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.32.2"
__version__ = "1.33.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -451,6 +451,28 @@ class FederationClient(FederationBase):
return signed_auth
def _is_unknown_endpoint(
self, e: HttpResponseException, synapse_error: Optional[SynapseError] = None
) -> bool:
"""
Returns true if the response was due to an endpoint being unimplemented.
Args:
e: The error response received from the remote server.
synapse_error: The above error converted to a SynapseError. This is
automatically generated if not provided.
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
# There is no good way to detect an "unknown" endpoint.
#
# Dendrite returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
return e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
)
async def _try_destination_list(
self,
description: str,
@@ -468,9 +490,9 @@ class FederationClient(FederationBase):
callback: Function to run for each server. Passed a single
argument: the server_name to try.
If the callback raises a CodeMessageException with a 300/400 code,
attempts to perform the operation stop immediately and the exception is
reraised.
If the callback raises a CodeMessageException with a 300/400 code or
an UnsupportedRoomVersionError, attempts to perform the operation
stop immediately and the exception is reraised.
Otherwise, if the callback raises an Exception the error is logged and the
next server tried. Normally the stacktrace is logged but this is
@@ -492,8 +514,7 @@ class FederationClient(FederationBase):
continue
try:
res = await callback(destination)
return res
return await callback(destination)
except InvalidResponseError as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
except UnsupportedRoomVersionError:
@@ -502,17 +523,15 @@ class FederationClient(FederationBase):
synapse_error = e.to_synapse_error()
failover = False
# Failover on an internal server error, or if the destination
# doesn't implemented the endpoint for some reason.
if 500 <= e.code < 600:
failover = True
elif failover_on_unknown_endpoint:
# there is no good way to detect an "unknown" endpoint. Dendrite
# returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
if e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
):
failover = True
elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
e, synapse_error
):
failover = True
if not failover:
raise synapse_error from e
@@ -570,9 +589,8 @@ class FederationClient(FederationBase):
UnsupportedRoomVersionError: if remote responds with
a room version we don't understand.
SynapseError: if the chosen remote server returns a 300/400 code.
RuntimeError: if no servers were reachable.
SynapseError: if the chosen remote server returns a 300/400 code, or
no servers successfully handle the request.
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
@@ -642,9 +660,8 @@ class FederationClient(FederationBase):
``auth_chain``.
Raises:
SynapseError: if the chosen remote server returns a 300/400 code.
RuntimeError: if no servers were reachable.
SynapseError: if the chosen remote server returns a 300/400 code, or
no servers successfully handle the request.
"""
async def send_request(destination) -> Dict[str, Any]:
@@ -673,7 +690,7 @@ class FederationClient(FederationBase):
if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
raise InvalidResponseError("No create event in state")
# the room version should be sane.
create_room_version = create_event.content.get(
@@ -746,16 +763,11 @@ class FederationClient(FederationBase):
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise consider it a legitmate error
# and raise.
if not self._is_unknown_endpoint(e):
raise
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
@@ -802,6 +814,11 @@ class FederationClient(FederationBase):
Returns:
The event as a dict as returned by the remote server
Raises:
SynapseError: if the remote server returns an error or if the server
only supports the v1 endpoint and a room version other than "1"
or "2" is requested.
"""
time_now = self._clock.time_msec()
@@ -817,28 +834,19 @@ class FederationClient(FederationBase):
},
)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, we
# assume that the remote understands the v2 invite API and this
# is a legitimate error.
if err.errcode != Codes.UNKNOWN:
raise err
# Otherwise, we assume that the remote server doesn't understand
# the v2 invite API. That's ok provided the room uses old-style event
# IDs.
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint if the room uses old-style event IDs.
# Otherwise consider it a legitmate error and raise.
err = e.to_synapse_error()
if self._is_unknown_endpoint(e, err):
if room_version.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
elif e.code in (403, 429):
raise e.to_synapse_error()
else:
raise
raise err
# Didn't work, try v1 API.
# Note the v1 API returns a tuple of `(200, content)`
@@ -865,9 +873,8 @@ class FederationClient(FederationBase):
pdu: event to be sent
Raises:
SynapseError if the chosen remote server returns a 300/400 code.
RuntimeError if no servers were reachable.
SynapseError: if the chosen remote server returns a 300/400 code, or
no servers successfully handle the request.
"""
async def send_request(destination: str) -> None:
@@ -889,16 +896,11 @@ class FederationClient(FederationBase):
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise consider it a legitmate error
# and raise.
if not self._is_unknown_endpoint(e):
raise
logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")

View File

@@ -14,26 +14,19 @@
import abc
import logging
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Hashable,
Iterable,
List,
Optional,
Set,
Tuple,
)
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender):
if not events and next_token >= self._last_poked_id:
break
async def get_destinations_for_event(
event: EventBase,
) -> Collection[str]:
"""Computes the destinations to which this event must be sent.
This returns an empty tuple when there are no destinations to send to,
or if this event is not from this homeserver and it is not sending
it on behalf of another server.
Will also filter out destinations which this sender is not responsible for,
if multiple federation senders exist.
"""
async def handle_event(event: EventBase) -> None:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return ()
return
if not event.internal_metadata.should_proactively_send():
return ()
return
destinations = None # type: Optional[Set[str]]
if not event.prev_event_ids():
@@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender):
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return ()
return
destinations = {
d
@@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender):
)
}
destinations.discard(self.server_name)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
if destinations:
await self._send_pdu(event, destinations)
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).observe((now - ts) / 1000)
return destinations
return ()
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
for event in events:
await handle_event(event)
async def get_federatable_events_and_destinations(
events: Iterable[EventBase],
) -> List[Tuple[EventBase, Collection[str]]]:
with Measure(self.clock, "get_destinations_for_events"):
# Fetch federation destinations per event,
# skip if get_destinations_for_event returns an empty collection,
# return list of event->destinations pairs.
return [
(event, dests)
for (event, dests) in [
(event, await get_destinations_for_event(event))
for event in events
]
if dests
]
events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
events_and_dests = await get_federatable_events_and_destinations(events)
# Send corresponding events to each destination queue
await self._distribute_events(events_and_dests)
await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
for evs in events_by_room.values()
],
consumeErrors=True,
)
)
await self.store.update_federation_out_pos("events", next_token)
@@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender):
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels("federation_sender").inc(
len({event.room_id for event in events})
len(events_by_room)
)
event_processing_loop_counter.labels("federation_sender").inc()
@@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender):
finally:
self._is_processing = False
async def _distribute_events(
self,
events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
) -> None:
"""Distribute events to the respective per_destination queues.
async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
Also persists last-seen per-room stream_ordering to 'destination_rooms'.
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
Args:
events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
Every event is paired with its intended destinations (in federation).
"""
# Tuples of room_id + destination to their max-seen stream_ordering
room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]
if not destinations:
return
# List of events to send to each destination
events_by_dest = {} # type: Dict[str, List[EventBase]]
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
# For each event-destinations pair...
for event, destinations in events_and_dests:
assert pdu.internal_metadata.stream_ordering
# (we got this from the database, it's filled)
assert event.internal_metadata.stream_ordering
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
# ...iterate over those destinations..
for destination in destinations:
# ...update their stream-ordering...
room_with_dest_stream_ordering[(event.room_id, destination)] = max(
event.internal_metadata.stream_ordering,
room_with_dest_stream_ordering.get((event.room_id, destination), 0),
)
# ...and add the event to each destination queue.
events_by_dest.setdefault(destination, []).append(event)
# Bulk-store destination_rooms stream_ids
await self.store.bulk_store_destination_rooms_entries(
room_with_dest_stream_ordering
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
pdu.internal_metadata.stream_ordering,
)
for destination, pdus in events_by_dest.items():
logger.debug("Sending %d pdus to %s", len(pdus), destination)
self._get_per_destination_queue(destination).send_pdus(pdus)
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room

View File

@@ -154,22 +154,19 @@ class PerDestinationQueue:
+ len(self._pending_edus_keyed)
)
def send_pdus(self, pdus: Iterable[EventBase]) -> None:
"""Add PDUs to the queue, and start the transmission loop if necessary
def send_pdu(self, pdu: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdus: pdus to send
pdu: pdu to send
"""
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None)
self._pending_pdus.extend(pdus)
self._pending_pdus.append(pdu)
else:
self._catchup_last_skipped = max(
pdu.internal_metadata.stream_ordering
for pdu in pdus
if pdu.internal_metadata.stream_ordering is not None
)
assert pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
self.attempt_new_transaction()

View File

@@ -28,6 +28,7 @@ from bisect import bisect
from contextlib import contextmanager
from typing import (
TYPE_CHECKING,
Callable,
Collection,
Dict,
FrozenSet,
@@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC):
"""
async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
):
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
) -> None:
"""Update the syncing users for an external process as a delta.
This is a no-op when presence is handled by a different worker.
Args:
process_id (str): An identifier for the process the users are
process_id: An identifier for the process the users are
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing
user_id: The user who has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
pass
async def update_external_syncs_clear(self, process_id):
async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process
as offline.
@@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]):
class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
@@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline = {}
self.users_going_offline = {} # type: Dict[str, int]
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
@@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler):
self._on_shutdown,
)
def _on_shutdown(self):
def _on_shutdown(self) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
def mark_as_coming_online(self, user_id):
def mark_as_coming_online(self, user_id: str) -> None:
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
@@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
# were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
def mark_as_going_offline(self, user_id):
def mark_as_going_offline(self, user_id: str) -> None:
"""A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
Args:
user_id (str)
"""
self.users_going_offline[user_id] = self.clock.time_msec()
def send_stop_syncing(self):
def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
@@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
return _user_syncing()
async def notify_from_replication(self, states, stream_id):
async def notify_from_replication(
self, states: List[UserPresenceState], stream_id: int
) -> None:
parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
@@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
if count > 0
]
async def set_state(self, target_user, state, ignore_status_msg=False):
async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
) -> None:
"""Set the presence state of the user."""
presence = state["presence"]
@@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
ignore_status_msg=ignore_status_msg,
)
async def bump_presence_active_time(self, user):
async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler):
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[int, int]
self.external_process_to_current_syncs = {} # type: Dict[str, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[str, int]
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
@@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler):
self._event_pos = self.store.get_current_events_token()
self._event_processing = False
async def _on_shutdown(self):
async def _on_shutdown(self) -> None:
"""Gets called when shutting down. This lets us persist any updates that
we haven't yet persisted, e.g. updates that only changes some internal
timers. This allows changes to persist across startup without having to
@@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler):
)
logger.info("Finished _on_shutdown")
async def _persist_unpersisted_changes(self):
async def _persist_unpersisted_changes(self) -> None:
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
@@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler):
states, destinations
)
async def _handle_timeouts(self):
async def _handle_timeouts(self) -> None:
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
@@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler):
return await self._update_states(changes)
async def bump_presence_active_time(self, user):
async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler):
return []
async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
):
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
) -> None:
"""Update the syncing users for an external process as a delta.
Args:
process_id (str): An identifier for the process the users are
process_id: An identifier for the process the users are
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing
user_id: The user who has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
with (await self.external_sync_linearizer.queue(process_id)):
prev_state = await self.current_state_for_user(user_id)
@@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler):
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
async def update_external_syncs_clear(self, process_id):
async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process
as offline.
@@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler):
)
self.external_process_last_updated_ms.pop(process_id, None)
async def current_state_for_user(self, user_id):
async def current_state_for_user(self, user_id: str) -> UserPresenceState:
"""Get the current presence state for a user."""
res = await self.current_state_for_users([user_id])
return res[user_id]
async def _persist_and_notify(self, states):
async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
"""Persist states in the database, poke the notifier and send to
interested remote servers
"""
@@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler):
# stream (which is updated by `store.update_presence`).
await self.maybe_send_presence_to_interested_destinations(states)
async def incoming_presence(self, origin, content):
async def incoming_presence(self, origin: str, content: JsonDict) -> None:
"""Called when we receive a `m.presence` EDU from a remote server."""
if not self._presence_enabled:
return
@@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler):
federation_presence_counter.inc(len(updates))
await self._update_states(updates)
async def set_state(self, target_user, state, ignore_status_msg=False):
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
) -> None:
"""Set the presence state of the user."""
status_msg = state.get("status_msg", None)
presence = state["presence"]
@@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler):
await self._update_states([prev_state.copy_and_replace(**new_fields)])
async def is_visible(self, observed_user, observer_user):
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
"""Returns whether a user can see another user's presence."""
observer_room_ids = await self.store.get_rooms_for_user(
observer_user.to_string()
@@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler):
)
return rows
def notify_new_event(self):
def notify_new_event(self) -> None:
"""Called when new events have happened. Handles users and servers
joining rooms and require being sent presence.
"""
@@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler):
run_as_background_process("presence.notify_new_event", _process_presence)
async def _unsafe_process(self):
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
@@ -1188,7 +1192,7 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
async def _handle_state_delta(self, deltas):
async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
"""Process current state deltas to find new joins that need to be
handled.
"""
@@ -1311,7 +1315,7 @@ class PresenceHandler(BasePresenceHandler):
return [remote_host], states
def should_notify(old_state, new_state):
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
"""Decides if a presence state change should be sent to interested parties."""
if old_state == new_state:
return False
@@ -1347,7 +1351,9 @@ def should_notify(old_state, new_state):
return False
def format_user_presence_state(state, now, include_user_id=True):
def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
@@ -1385,11 +1391,11 @@ class PresenceEventSource:
@log_function
async def get_new_events(
self,
user,
from_key,
room_ids=None,
include_offline=True,
explicit_room_id=None,
user: UserID,
from_key: Optional[int],
room_ids: Optional[List[str]] = None,
include_offline: bool = True,
explicit_room_id: Optional[str] = None,
**kwargs,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
@@ -1594,7 +1600,7 @@ class PresenceEventSource:
if update.state != PresenceState.OFFLINE
]
def get_current_key(self):
def get_current_key(self) -> int:
return self.store.get_current_presence_token()
@cached(num_args=2, cache_context=True)
@@ -1654,15 +1660,20 @@ class PresenceEventSource:
return users_interested_in
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
syncing_user_ids: Set[str],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as
appropriate.
Args:
user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours
syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours
syncing_user_ids: Set of user_ids with active syncs.
now: Current time in ms.
Returns:
List of UserPresenceState updates
@@ -1679,14 +1690,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
return list(changes.values())
def handle_timeout(state, is_mine, syncing_user_ids, now):
def handle_timeout(
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state (UserPresenceState)
is_mine (bool): Whether the user is ours
syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
state
is_mine: Whether the user is ours
syncing_user_ids: Set of user_ids with active syncs.
now: Current time in ms.
Returns:
A UserPresenceState update or None if no update.
@@ -1738,23 +1751,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
return state if changed else None
def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
def handle_update(
prev_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
wheel_timer: WheelTimer,
now: int,
) -> Tuple[UserPresenceState, bool, bool]:
"""Given a presence update:
1. Add any appropriate timers.
2. Check if we should notify anyone.
Args:
prev_state (UserPresenceState)
new_state (UserPresenceState)
is_mine (bool): Whether the user is ours
wheel_timer (WheelTimer)
now (int): Time now in ms
prev_state
new_state
is_mine: Whether the user is ours
wheel_timer
now: Time now in ms
Returns:
3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
- new_state: is the state to actually persist
- persist_and_notify (bool): whether to persist and notify people
- federation_ping (bool): whether we should send a ping over federation
- persist_and_notify: whether to persist and notify people
- federation_ping: whether we should send a ping over federation
"""
user_id = new_state.user_id
@@ -2026,18 +2045,40 @@ class PresenceFederationQueue:
)
return result["updates"], result["upto_token"], result["limited"]
# If the from_token is the current token then there's nothing to return
# and we can trivially no-op.
if from_token == self._next_id - 1:
return [], upto_token, False
# We can find the correct position in the queue by noting that there is
# exactly one entry per stream ID, and that the last entry has an ID of
# `self._next_id - 1`, so we can count backwards from the end.
#
# Since we are returning all states in the range `from_token < stream_id
# <= upto_token` we look for the index with a `stream_id` of `from_token
# + 1`.
#
# Since the start of the queue is periodically truncated we need to
# handle the case where `from_token` stream ID has already been dropped.
start_idx = max(from_token - self._next_id, -len(self._queue))
start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
limited = False
new_id = upto_token
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
if stream_id <= from_token:
# Paranoia check that we are actually only sending states that
# are have stream_id strictly greater than from_token. We should
# never hit this.
logger.warning(
"Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
stream_id,
from_token,
self._next_id,
len(self._queue),
)
continue
if stream_id > upto_token:
break

View File

@@ -85,7 +85,7 @@ REQUIREMENTS = [
"typing-extensions>=3.7.4",
# We enforce that we have a `cryptography` version that bundles an `openssl`
# with the latest security patches.
"cryptography>=3.4.7;python_version>='3.6'",
"cryptography>=3.4.7",
]
CONDITIONAL_REQUIREMENTS = {
@@ -100,14 +100,9 @@ CONDITIONAL_REQUIREMENTS = {
# that use the protocol, such as Let's Encrypt.
"acme": [
"txacme>=0.9.2",
# txacme depends on eliot. Eliot 1.8.0 is incompatible with
# python 3.5.2, as per https://github.com/itamarst/eliot/issues/418
"eliot<1.8.0;python_version<'3.5.3'",
],
"saml2": [
# pysaml2 6.4.0 is incompatible with Python 3.5 (see https://github.com/IdentityPython/pysaml2/issues/749)
"pysaml2>=4.5.0,<6.4.0;python_version<'3.6'",
"pysaml2>=4.5.0;python_version>='3.6'",
"pysaml2>=4.5.0",
],
"oidc": ["authlib>=0.14.0"],
# systemd-python is necessary for logging to the systemd journal via

View File

@@ -14,6 +14,7 @@
import hashlib
import hmac
import logging
import secrets
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
@@ -375,7 +376,7 @@ class UserRegisterServlet(RestServlet):
"""
self._clear_old_nonces()
nonce = self.hs.get_secrets().token_hex(64)
nonce = secrets.token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
return 200, {"nonce": nonce}

View File

@@ -32,14 +32,6 @@ TEMPLATE_LANGUAGE = "en"
logger = logging.getLogger(__name__)
# use hmac.compare_digest if we have it (python 2.7.7), else just use equality
if hasattr(hmac, "compare_digest"):
compare_digest = hmac.compare_digest
else:
def compare_digest(a, b):
return a == b
class ConsentResource(DirectServeHtmlResource):
"""A twisted Resource to display a privacy policy and gather consent to it
@@ -209,5 +201,5 @@ class ConsentResource(DirectServeHtmlResource):
.encode("ascii")
)
if not compare_digest(want_mac, userhmac):
if not hmac.compare_digest(want_mac, userhmac):
raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect")

View File

@@ -21,7 +21,7 @@ from typing import Callable, List
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
def _wrap_in_base_path(func: "Callable[..., str]") -> "Callable[..., str]":
def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]:
"""Takes a function that returns a relative path and turns it into an
absolute path based on the location of the primary media store
"""

View File

@@ -39,6 +39,13 @@ class WellKnownBuilder:
result = {"m.homeserver": {"base_url": self._config.public_baseurl}}
# Point to BigBlueButton widget for calls
result.update({
"io.element.call_behaviour": {
"widget_build_url": "http://localhost:8184/api/v1/dimension/bigbluebutton/widget_state",
}
})
if self._config.default_identity_server:
result["m.identity_server"] = {
"base_url": self._config.default_identity_server

View File

@@ -1,44 +0,0 @@
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Injectable secrets module for Synapse.
See https://docs.python.org/3/library/secrets.html#module-secrets for the API
used in Python 3.6, and the API emulated in Python 2.7.
"""
import sys
# secrets is available since python 3.6
if sys.version_info[0:2] >= (3, 6):
import secrets
class Secrets:
def token_bytes(self, nbytes: int = 32) -> bytes:
return secrets.token_bytes(nbytes)
def token_hex(self, nbytes: int = 32) -> str:
return secrets.token_hex(nbytes)
else:
import binascii
import os
class Secrets:
def token_bytes(self, nbytes: int = 32) -> bytes:
return os.urandom(nbytes)
def token_hex(self, nbytes: int = 32) -> str:
return binascii.hexlify(self.token_bytes(nbytes)).decode("ascii")

View File

@@ -126,7 +126,6 @@ from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
)
from synapse.secrets import Secrets
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import (
@@ -641,10 +640,6 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_groups_attestation_renewer(self) -> GroupAttestionRenewer:
return GroupAttestionRenewer(self)
@cache_in_self
def get_secrets(self) -> Secrets:
return Secrets()
@cache_in_self
def get_stats_handler(self) -> StatsHandler:
return StatsHandler(self)

View File

@@ -114,7 +114,7 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
db_content = db_content.tobytes()
# Decode it to a Unicode string before feeding it to the JSON decoder, since
# Python 3.5 does not support deserializing bytes.
# it only supports handling strings
if isinstance(db_content, (bytes, bytearray)):
db_content = db_content.decode("utf8")

View File

@@ -171,10 +171,7 @@ class LoggingDatabaseConnection:
# The type of entry which goes on our after_callbacks and exception_callbacks lists.
#
# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
# that mypy sees the type but the runtime python doesn't.
_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]]
R = TypeVar("R")
@@ -221,7 +218,7 @@ class LoggingTransaction:
self.after_callbacks = after_callbacks
self.exception_callbacks = exception_callbacks
def call_after(self, callback: "Callable[..., None]", *args: Any, **kwargs: Any):
def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
@@ -233,7 +230,7 @@ class LoggingTransaction:
self.after_callbacks.append((callback, args, kwargs))
def call_on_exception(
self, callback: "Callable[..., None]", *args: Any, **kwargs: Any
self, callback: Callable[..., None], *args: Any, **kwargs: Any
):
# if self.exception_callbacks is None, that means that whatever constructed the
# LoggingTransaction isn't expecting there to be any callbacks; assert that
@@ -485,7 +482,7 @@ class DatabasePool:
desc: str,
after_callbacks: List[_CallbackListEntry],
exception_callbacks: List[_CallbackListEntry],
func: "Callable[..., R]",
func: Callable[..., R],
*args: Any,
**kwargs: Any,
) -> R:
@@ -618,7 +615,7 @@ class DatabasePool:
async def runInteraction(
self,
desc: str,
func: "Callable[..., R]",
func: Callable[..., R],
*args: Any,
db_autocommit: bool = False,
**kwargs: Any,
@@ -678,7 +675,7 @@ class DatabasePool:
async def runWithConnection(
self,
func: "Callable[..., R]",
func: Callable[..., R],
*args: Any,
db_autocommit: bool = False,
**kwargs: Any,
@@ -718,7 +715,9 @@ class DatabasePool:
# pool).
assert not self.engine.in_transaction(conn)
with LoggingContext("runWithConnection", parent_context) as context:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)

View File

@@ -14,7 +14,7 @@
import logging
from collections import namedtuple
from typing import Dict, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple
from canonicaljson import encode_canonical_json
@@ -295,33 +295,37 @@ class TransactionStore(TransactionWorkerStore):
},
)
async def bulk_store_destination_rooms_entries(
self, room_and_destination_to_ordering: Dict[Tuple[str, str], int]
):
async def store_destination_rooms_entries(
self,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:
"""
Updates or creates `destination_rooms` entries for a number of events.
Updates or creates `destination_rooms` entries in batch for a single event.
Args:
room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id
destinations: list of destinations
room_id: the room_id of the event
stream_ordering: the stream_ordering of the event
"""
await self.db_pool.simple_upsert_many(
table="destinations",
key_names=("destination",),
key_values={(d,) for _, d in room_and_destination_to_ordering.keys()},
key_values=[(d,) for d in destinations],
value_names=[],
value_values=[],
desc="store_destination_rooms_entries_dests",
)
rows = [(destination, room_id) for destination in destinations]
await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("room_id", "destination"),
key_values=list(room_and_destination_to_ordering.keys()),
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[
(stream_id,) for stream_id in room_and_destination_to_ordering.values()
],
value_values=[(stream_ordering,)] * len(rows),
desc="store_destination_rooms_entries_rooms",
)

View File

@@ -17,8 +17,10 @@ from functools import wraps
from typing import (
Any,
Callable,
Collection,
Generic,
Iterable,
List,
Optional,
Type,
TypeVar,
@@ -57,13 +59,56 @@ class _Node:
__slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
def __init__(
self, prev_node, next_node, key, value, callbacks: Optional[set] = None
self,
prev_node,
next_node,
key,
value,
callbacks: Collection[Callable[[], None]] = (),
):
self.prev_node = prev_node
self.next_node = next_node
self.key = key
self.value = value
self.callbacks = callbacks or set()
# Set of callbacks to run when the node gets deleted. We store as a list
# rather than a set to keep memory usage down (and since we expect few
# entries per node, the performance of checking for duplication in a
# list vs using a set is negligible).
#
# Note that we store this as an optional list to keep the memory
# footprint down. Storing `None` is free as its a singleton, while empty
# lists are 56 bytes (and empty sets are 216 bytes, if we did the naive
# thing and used sets).
self.callbacks = None # type: Optional[List[Callable[[], None]]]
self.add_callbacks(callbacks)
def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
"""Add to stored list of callbacks, removing duplicates."""
if not callbacks:
return
if not self.callbacks:
self.callbacks = []
for callback in callbacks:
if callback not in self.callbacks:
self.callbacks.append(callback)
def run_and_clear_callbacks(self) -> None:
"""Run all callbacks and clear the stored list of callbacks. Used when
the node is being deleted.
"""
if not self.callbacks:
return
for callback in self.callbacks:
callback()
self.callbacks = None
class LruCache(Generic[KT, VT]):
@@ -177,10 +222,10 @@ class LruCache(Generic[KT, VT]):
self.len = synchronized(cache_len)
def add_node(key, value, callbacks: Optional[set] = None):
def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()):
prev_node = list_root
next_node = prev_node.next_node
node = _Node(prev_node, next_node, key, value, callbacks or set())
node = _Node(prev_node, next_node, key, value, callbacks)
prev_node.next_node = node
next_node.prev_node = node
cache[key] = node
@@ -211,16 +256,15 @@ class LruCache(Generic[KT, VT]):
deleted_len = size_callback(node.value)
cached_cache_len[0] -= deleted_len
for cb in node.callbacks:
cb()
node.callbacks.clear()
node.run_and_clear_callbacks()
return deleted_len
@overload
def cache_get(
key: KT,
default: Literal[None] = None,
callbacks: Iterable[Callable[[], None]] = ...,
callbacks: Collection[Callable[[], None]] = ...,
update_metrics: bool = ...,
) -> Optional[VT]:
...
@@ -229,7 +273,7 @@ class LruCache(Generic[KT, VT]):
def cache_get(
key: KT,
default: T,
callbacks: Iterable[Callable[[], None]] = ...,
callbacks: Collection[Callable[[], None]] = ...,
update_metrics: bool = ...,
) -> Union[T, VT]:
...
@@ -238,13 +282,13 @@ class LruCache(Generic[KT, VT]):
def cache_get(
key: KT,
default: Optional[T] = None,
callbacks: Iterable[Callable[[], None]] = (),
callbacks: Collection[Callable[[], None]] = (),
update_metrics: bool = True,
):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
node.callbacks.update(callbacks)
node.add_callbacks(callbacks)
if update_metrics and metrics:
metrics.inc_hits()
return node.value
@@ -260,10 +304,8 @@ class LruCache(Generic[KT, VT]):
# We sometimes store large objects, e.g. dicts, which cause
# the inequality check to take a long time. So let's only do
# the check if we have some callbacks to call.
if node.callbacks and value != node.value:
for cb in node.callbacks:
cb()
node.callbacks.clear()
if value != node.value:
node.run_and_clear_callbacks()
# We don't bother to protect this by value != node.value as
# generally size_callback will be cheap compared with equality
@@ -273,7 +315,7 @@ class LruCache(Generic[KT, VT]):
cached_cache_len[0] -= size_callback(node.value)
cached_cache_len[0] += size_callback(value)
node.callbacks.update(callbacks)
node.add_callbacks(callbacks)
move_node_to_front(node)
node.value = value
@@ -326,8 +368,7 @@ class LruCache(Generic[KT, VT]):
list_root.next_node = list_root
list_root.prev_node = list_root
for node in cache.values():
for cb in node.callbacks:
cb()
node.run_and_clear_callbacks()
cache.clear()
if size_callback:
cached_cache_len[0] = 0

View File

@@ -110,7 +110,7 @@ class ResponseCache(Generic[T]):
return result.observe()
def wrap(
self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
self, key: T, callback: Callable[..., Any], *args: Any, **kwargs: Any
) -> defer.Deferred:
"""Wrap together a *get* and *set* call, taking care of logcontexts

View File

@@ -509,6 +509,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(rows, expected_rows)
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", upto_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
self.assertCountEqual(rows, [])
def test_send_and_get_split(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
@@ -538,6 +546,20 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(rows, expected_rows)
now_token = self.queue.get_current_token(self.instance_name)
rows, upto_token, limited = self.get_success(
self.queue.get_replication_rows("master", upto_token, now_token, 10)
)
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
expected_rows = [
(2, ("dest3", "@user3:test")),
]
self.assertCountEqual(rows, expected_rows)
def test_clear_queue_all(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")

View File

@@ -18,7 +18,7 @@ import json
import urllib.parse
from binascii import unhexlify
from typing import List, Optional
from unittest.mock import Mock
from unittest.mock import Mock, patch
import synapse.rest.admin
from synapse.api.constants import UserTypes
@@ -54,8 +54,6 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.datastore = Mock(return_value=Mock())
self.datastore.get_current_state_deltas = Mock(return_value=(0, []))
self.secrets = Mock()
self.hs = self.setup_test_homeserver()
self.hs.config.registration_shared_secret = "shared"
@@ -84,14 +82,13 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
Calling GET on the endpoint will return a randomised nonce, using the
homeserver's secrets provider.
"""
secrets = Mock()
secrets.token_hex = Mock(return_value="abcd")
with patch("secrets.token_hex") as token_hex:
# Patch secrets.token_hex for the duration of this context
token_hex.return_value = "abcd"
self.hs.get_secrets = Mock(return_value=secrets)
channel = self.make_request("GET", self.url)
channel = self.make_request("GET", self.url)
self.assertEqual(channel.json_body, {"nonce": "abcd"})
self.assertEqual(channel.json_body, {"nonce": "abcd"})
def test_expired_nonce(self):
"""

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import secrets
from tests import unittest
@@ -21,7 +22,7 @@ class UpsertManyTests(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.storage = hs.get_datastore()
self.table_name = "table_" + hs.get_secrets().token_hex(6)
self.table_name = "table_" + secrets.token_hex(6)
self.get_success(
self.storage.db_pool.runInteraction(
"create",

View File

@@ -18,6 +18,7 @@ import hashlib
import hmac
import inspect
import logging
import secrets
import time
from typing import Callable, Dict, Iterable, Optional, Tuple, Type, TypeVar, Union
from unittest.mock import Mock, patch
@@ -626,7 +627,6 @@ class HomeserverTestCase(TestCase):
str: The new event's ID.
"""
event_creator = self.hs.get_event_creation_handler()
secrets = self.hs.get_secrets()
requester = create_requester(user)
event, context = self.get_success(

View File

@@ -21,13 +21,11 @@ deps =
# installed on that).
#
# anyway, make sure that we have a recent enough setuptools.
setuptools>=18.5 ; python_version >= '3.6'
setuptools>=18.5,<51.0.0 ; python_version < '3.6'
setuptools>=18.5
# we also need a semi-recent version of pip, because old ones fail to
# install the "enum34" dependency of cryptography.
pip>=10 ; python_version >= '3.6'
pip>=10,<21.0 ; python_version < '3.6'
pip>=10
# directories/files we run the linters on.
# if you update this list, make sure to do the same in scripts-dev/lint.sh
@@ -168,8 +166,7 @@ skip_install = true
usedevelop = false
deps =
coverage
pip>=10 ; python_version >= '3.6'
pip>=10,<21.0 ; python_version < '3.6'
pip>=10
commands=
coverage combine
coverage report