1
0

Compare commits

..

4 Commits

Author SHA1 Message Date
David Robertson
4d343db081 Get rid of my home dir, whoops 2021-11-16 16:37:43 +00:00
David Robertson
a1367dcf8c Require networkx 2021-11-16 16:34:33 +00:00
David Robertson
9e361c8550 Changelog 2021-11-16 13:52:59 +00:00
David Robertson
51fec1a534 Commit hacky script to visualise store inheritance
Use e.g. with `scripts-dev/storage_inheritance.py DataStore --show`.
2021-11-16 13:51:50 +00:00
70 changed files with 612 additions and 750 deletions

View File

@@ -1,19 +1,3 @@
Synapse 1.47.0 (2021-11-17)
===========================
No significant changes since 1.47.0rc3.
Synapse 1.47.0rc3 (2021-11-16)
==============================
Bugfixes
--------
- Fix a bug introduced in 1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations. ([\#11346](https://github.com/matrix-org/synapse/issues/11346))
- Fix a bug introduced in 1.47.0rc1 which prevented the 'remove deleted devices from `device_inbox` column' background process from running when updating from a recent Synapse version. ([\#11303](https://github.com/matrix-org/synapse/issues/11303), [\#11353](https://github.com/matrix-org/synapse/issues/11353))
Synapse 1.47.0rc2 (2021-11-10)
==============================
@@ -8694,14 +8678,14 @@ General:
Federation:
- Add key distribution mechanisms for fetching public keys of unavailable remote homeservers. See [Retrieving Server Keys](https://github.com/matrix-org/matrix-doc/blob/6f2698/specification/30_server_server_api.rst#retrieving-server-keys) in the spec.
- Add key distribution mechanisms for fetching public keys of unavailable remote home servers. See [Retrieving Server Keys](https://github.com/matrix-org/matrix-doc/blob/6f2698/specification/30_server_server_api.rst#retrieving-server-keys) in the spec.
Configuration:
- Add support for multiple config files.
- Add support for dictionaries in config files.
- Remove support for specifying config options on the command line, except for:
- `--daemonize` - Daemonize the homeserver.
- `--daemonize` - Daemonize the home server.
- `--manhole` - Turn on the twisted telnet manhole service on the given port.
- `--database-path` - The path to a sqlite database to use.
- `--verbose` - The verbosity level.
@@ -8906,7 +8890,7 @@ This version adds support for using a TURN server. See docs/turn-howto.rst on ho
Homeserver:
- Add support for redaction of messages.
- Fix bug where inviting a user on a remote homeserver could take up to 20-30s.
- Fix bug where inviting a user on a remote home server could take up to 20-30s.
- Implement a get current room state API.
- Add support specifying and retrieving turn server configuration.
@@ -8996,7 +8980,7 @@ Changes in synapse 0.2.3 (2014-09-12)
Homeserver:
- Fix bug where we stopped sending events to remote homeservers if a user from that homeserver left, even if there were some still in the room.
- Fix bug where we stopped sending events to remote home servers if a user from that home server left, even if there were some still in the room.
- Fix bugs in the state conflict resolution where it was incorrectly rejecting events.
Webclient:

View File

@@ -1 +0,0 @@
Add type annotations to `synapse.metrics`.

View File

@@ -1 +0,0 @@
Add support for the `/_matrix/client/v3` APIs from Matrix v1.1.

View File

@@ -1 +0,0 @@
Changed the word 'Home server' as one word 'homeserver' in documentation.

View File

@@ -1 +0,0 @@
Add type hints to `synapse.util`.

View File

@@ -1 +0,0 @@
Improve type annotations in Synapse's test suite.

View File

@@ -1 +0,0 @@
Fix a bug, introduced in Synapse 1.46.0, which caused the `check_3pid_auth` and `on_logged_out` callbacks in legacy password authentication provider modules to not be registered. Modules using the generic module API were not affected.

View File

@@ -1 +0,0 @@
Add admin API to un-shadow-ban a user.

View File

@@ -1 +0,0 @@
Fix a bug introduced in 1.41.0 where space hierarchy responses would be incorrectly reused if multiple users were to make the same request at the same time.

1
changelog.d/11357.misc Normal file
View File

@@ -0,0 +1 @@
Add a development script for visualising the storage class inheritance hierarchy.

View File

@@ -1 +0,0 @@
Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.

View File

@@ -1 +0,0 @@
Fix running `scripts-dev/complement.sh`, which was broken in v1.47.0rc1.

View File

@@ -1 +0,0 @@
Rename `get_access_token_for_user_id` to `create_access_token_for_user_id` to better reflect what it does.

View File

@@ -1 +0,0 @@
Add support for the `/_matrix/media/v3` APIs from Matrix v1.1.

View File

@@ -1 +0,0 @@
Trim redundant DataStore inheritance.

12
debian/changelog vendored
View File

@@ -1,15 +1,3 @@
matrix-synapse-py3 (1.47.0) stable; urgency=medium
* New synapse release 1.47.0.
-- Synapse Packaging team <packages@matrix.org> Wed, 17 Nov 2021 13:09:43 +0000
matrix-synapse-py3 (1.47.0~rc3) stable; urgency=medium
* New synapse release 1.47.0~rc3.
-- Synapse Packaging team <packages@matrix.org> Tue, 16 Nov 2021 14:32:47 +0000
matrix-synapse-py3 (1.47.0~rc2) stable; urgency=medium
[ Dan Callahan ]

View File

@@ -48,7 +48,7 @@ WORKERS_CONFIG = {
"app": "synapse.app.user_dir",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
"^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$"
],
"shared_extra_conf": {"update_user_directory": False},
"worker_extra_conf": "",
@@ -85,10 +85,10 @@ WORKERS_CONFIG = {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
"^/_matrix/client/(v2_alpha|r0)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0)/events$",
"^/_matrix/client/(api/v1|r0)/initialSync$",
"^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
@@ -146,11 +146,11 @@ WORKERS_CONFIG = {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
"^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|unstable)/join/",
"^/_matrix/client/(api/v1|r0|unstable)/profile/",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
@@ -158,7 +158,7 @@ WORKERS_CONFIG = {
"frontend_proxy": {
"app": "synapse.app.frontend_proxy",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": (
"worker_main_http_uri: http://127.0.0.1:%d"

View File

@@ -948,7 +948,7 @@ The following fields are returned in the JSON response body:
See also the
[Client-Server API Spec on pushers](https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-pushers).
## Controlling whether a user is shadow-banned
## Shadow-banning users
Shadow-banning is a useful tool for moderating malicious or egregiously abusive users.
A shadow-banned users receives successful responses to their client-server API requests,
@@ -961,22 +961,16 @@ or broken behaviour for the client. A shadow-banned user will not receive any
notification and it is generally more appropriate to ban or kick abusive users.
A shadow-banned user will be unable to contact anyone on the server.
To shadow-ban a user the API is:
The API is:
```
POST /_synapse/admin/v1/users/<user_id>/shadow_ban
```
To un-shadow-ban a user the API is:
```
DELETE /_synapse/admin/v1/users/<user_id>/shadow_ban
```
To use it, you will need to authenticate by providing an `access_token` for a
server admin: [Admin API](../usage/administration/admin_api)
An empty JSON dict is returned in both cases.
An empty JSON dict is returned.
**Parameters**

View File

@@ -7,7 +7,7 @@
## Server to Server Stack
To use the server to server stack, homeservers should only need to
To use the server to server stack, home servers should only need to
interact with the Messaging layer.
The server to server side of things is designed into 4 distinct layers:
@@ -23,7 +23,7 @@ Server with a domain specific API.
1. **Messaging Layer**
This is what the rest of the homeserver hits to send messages, join rooms,
This is what the rest of the Home Server hits to send messages, join rooms,
etc. It also allows you to register callbacks for when it get's notified by
lower levels that e.g. a new message has been received.
@@ -45,7 +45,7 @@ Server with a domain specific API.
For incoming PDUs, it has to check the PDUs it references to see
if we have missed any. If we have go and ask someone (another
homeserver) for it.
home server) for it.
3. **Transaction Layer**

View File

@@ -1,7 +1,7 @@
<h2 style="color:red">
This page of the Synapse documentation is now deprecated. For up to date
documentation on setting up or writing a password auth provider module, please see
<a href="modules/index.md">this page</a>.
<a href="modules.md">this page</a>.
</h2>
# Password auth provider modules

View File

@@ -1,12 +1,12 @@
# Overview
This document explains how to enable VoIP relaying on your homeserver with
This document explains how to enable VoIP relaying on your Home Server with
TURN.
The synapse Matrix homeserver supports integration with TURN server via the
The synapse Matrix Home Server supports integration with TURN server via the
[TURN server REST API](<https://tools.ietf.org/html/draft-uberti-behave-turn-rest-00>). This
allows the homeserver to generate credentials that are valid for use on the
TURN server through the use of a secret shared between the homeserver and the
allows the Home Server to generate credentials that are valid for use on the
TURN server through the use of a secret shared between the Home Server and the
TURN server.
The following sections describe how to install [coturn](<https://github.com/coturn/coturn>) (which implements the TURN REST API) and integrate it with synapse.
@@ -165,18 +165,18 @@ This will install and start a systemd service called `coturn`.
## Synapse setup
Your homeserver configuration file needs the following extra keys:
Your home server configuration file needs the following extra keys:
1. "`turn_uris`": This needs to be a yaml list of public-facing URIs
for your TURN server to be given out to your clients. Add separate
entries for each transport your TURN server supports.
2. "`turn_shared_secret`": This is the secret shared between your
homeserver and your TURN server, so you should set it to the same
Home server and your TURN server, so you should set it to the same
string you used in turnserver.conf.
3. "`turn_user_lifetime`": This is the amount of time credentials
generated by your homeserver are valid for (in milliseconds).
generated by your Home Server are valid for (in milliseconds).
Shorter times offer less potential for abuse at the expense of
increased traffic between web clients and your homeserver to
increased traffic between web clients and your home server to
refresh credentials. The TURN REST API specification recommends
one day (86400000).
4. "`turn_allow_guests`": Whether to allow guest users to use the
@@ -220,7 +220,7 @@ Here are a few things to try:
anyone who has successfully set this up.
* Check that you have opened your firewall to allow TCP and UDP traffic to the
TURN ports (normally 3478 and 5349).
TURN ports (normally 3478 and 5479).
* Check that you have opened your firewall to allow UDP traffic to the UDP
relay ports (49152-65535 by default).

View File

@@ -182,10 +182,10 @@ This worker can handle API requests matching the following regular
expressions:
# Sync requests
^/_matrix/client/(v2_alpha|r0|v3)/sync$
^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$
^/_matrix/client/(api/v1|r0|v3)/initialSync$
^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$
^/_matrix/client/(v2_alpha|r0)/sync$
^/_matrix/client/(api/v1|v2_alpha|r0)/events$
^/_matrix/client/(api/v1|r0)/initialSync$
^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$
# Federation requests
^/_matrix/federation/v1/event/
@@ -216,40 +216,40 @@ expressions:
^/_matrix/federation/v1/send/
# Client API requests
^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$
^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$
^/_matrix/client/(api/v1|r0|unstable)/createRoom$
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/spaces$
^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/hierarchy$
^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$
^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$
^/_matrix/client/(api/v1|r0|v3|unstable)/devices$
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/query$
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/changes$
^/_matrix/client/(api/v1|r0|unstable)/account/3pid$
^/_matrix/client/(api/v1|r0|unstable)/devices$
^/_matrix/client/(api/v1|r0|unstable)/keys/query$
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|v3|unstable)/publicised_groups$
^/_matrix/client/(api/v1|r0|v3|unstable)/publicised_groups/
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/event/
^/_matrix/client/(api/v1|r0|unstable)/joined_rooms$
^/_matrix/client/(api/v1|r0|unstable)/search$
# Registration/login requests
^/_matrix/client/(api/v1|r0|v3|unstable)/login$
^/_matrix/client/(r0|v3|unstable)/register$
^/_matrix/client/(api/v1|r0|unstable)/login$
^/_matrix/client/(r0|unstable)/register$
^/_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity$
# Event sending requests
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state/
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|v3|unstable)/join/
^/_matrix/client/(api/v1|r0|v3|unstable)/profile/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
Additionally, the following REST endpoints can be handled for GET requests:
@@ -261,14 +261,14 @@ room must be routed to the same instance. Additionally, care must be taken to
ensure that the purge history admin API is not used while pagination requests
for the room are in flight:
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$
Additionally, the following endpoints should be included if Synapse is configured
to use SSO (you only need to include the ones for whichever SSO provider you're
using):
# for all SSO providers
^/_matrix/client/(api/v1|r0|v3|unstable)/login/sso/redirect
^/_matrix/client/(api/v1|r0|unstable)/login/sso/redirect
^/_synapse/client/pick_idp$
^/_synapse/client/pick_username
^/_synapse/client/new_user_consent$
@@ -281,7 +281,7 @@ using):
^/_synapse/client/saml2/authn_response$
# CAS requests.
^/_matrix/client/(api/v1|r0|v3|unstable)/login/cas/ticket$
^/_matrix/client/(api/v1|r0|unstable)/login/cas/ticket$
Ensure that all SSO logins go to a single process.
For multiple workers not handling the SSO endpoints properly, see
@@ -465,7 +465,7 @@ Note that if a reverse proxy is used , then `/_matrix/media/` must be routed for
Handles searches in the user directory. It can handle REST endpoints matching
the following regular expressions:
^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$
^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$
When using this worker you must also set `update_user_directory: False` in the
shared configuration file to stop the main synapse running background
@@ -477,12 +477,12 @@ Proxies some frequently-requested client endpoints to add caching and remove
load from the main synapse. It can handle REST endpoints matching the following
regular expressions:
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
If `use_presence` is False in the homeserver config, it can also handle REST
endpoints matching the following regular expressions:
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/[^/]+/status
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status
This "stub" presence handler will pass through `GET` request but make the
`PUT` effectively a no-op.

View File

@@ -160,9 +160,6 @@ disallow_untyped_defs = True
[mypy-synapse.handlers.*]
disallow_untyped_defs = True
[mypy-synapse.metrics.*]
disallow_untyped_defs = True
[mypy-synapse.push.*]
disallow_untyped_defs = True
@@ -199,11 +196,92 @@ disallow_untyped_defs = True
[mypy-synapse.streams.*]
disallow_untyped_defs = True
[mypy-synapse.util.*]
[mypy-synapse.util.batching_queue]
disallow_untyped_defs = True
[mypy-synapse.util.caches.treecache]
disallow_untyped_defs = False
[mypy-synapse.util.caches.cached_call]
disallow_untyped_defs = True
[mypy-synapse.util.caches.dictionary_cache]
disallow_untyped_defs = True
[mypy-synapse.util.caches.lrucache]
disallow_untyped_defs = True
[mypy-synapse.util.caches.response_cache]
disallow_untyped_defs = True
[mypy-synapse.util.caches.stream_change_cache]
disallow_untyped_defs = True
[mypy-synapse.util.caches.ttl_cache]
disallow_untyped_defs = True
[mypy-synapse.util.daemonize]
disallow_untyped_defs = True
[mypy-synapse.util.file_consumer]
disallow_untyped_defs = True
[mypy-synapse.util.frozenutils]
disallow_untyped_defs = True
[mypy-synapse.util.hash]
disallow_untyped_defs = True
[mypy-synapse.util.httpresourcetree]
disallow_untyped_defs = True
[mypy-synapse.util.iterutils]
disallow_untyped_defs = True
[mypy-synapse.util.linked_list]
disallow_untyped_defs = True
[mypy-synapse.util.logcontext]
disallow_untyped_defs = True
[mypy-synapse.util.logformatter]
disallow_untyped_defs = True
[mypy-synapse.util.macaroons]
disallow_untyped_defs = True
[mypy-synapse.util.manhole]
disallow_untyped_defs = True
[mypy-synapse.util.module_loader]
disallow_untyped_defs = True
[mypy-synapse.util.msisdn]
disallow_untyped_defs = True
[mypy-synapse.util.patch_inline_callbacks]
disallow_untyped_defs = True
[mypy-synapse.util.ratelimitutils]
disallow_untyped_defs = True
[mypy-synapse.util.retryutils]
disallow_untyped_defs = True
[mypy-synapse.util.rlimit]
disallow_untyped_defs = True
[mypy-synapse.util.stringutils]
disallow_untyped_defs = True
[mypy-synapse.util.templates]
disallow_untyped_defs = True
[mypy-synapse.util.threepids]
disallow_untyped_defs = True
[mypy-synapse.util.wheel_timer]
disallow_untyped_defs = True
[mypy-synapse.util.versionstring]
disallow_untyped_defs = True
[mypy-tests.handlers.test_user_directory]
disallow_untyped_defs = True

View File

@@ -24,7 +24,7 @@
set -e
# Change to the repository root
cd "$(dirname $0)/.."
cd "$(dirname "$0")/.."
# Check for a user-specified Complement checkout
if [[ -z "$COMPLEMENT_DIR" ]]; then
@@ -61,8 +61,8 @@ cd "$COMPLEMENT_DIR"
EXTRA_COMPLEMENT_ARGS=""
if [[ -n "$1" ]]; then
# A test name regex has been set, supply it to Complement
EXTRA_COMPLEMENT_ARGS+="-run $1 "
EXTRA_COMPLEMENT_ARGS=(-run "$1")
fi
# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 "${EXTRA_COMPLEMENT_ARGS[@]}" ./tests/...

View File

@@ -0,0 +1,179 @@
#! /usr/bin/env python3
import argparse
import os
import re
import subprocess
import sys
import tempfile
from typing import Iterable, Optional, Set
import networkx
def scrape_storage_classes() -> str:
"""Grep the for classes ending with "Store" and extract their list of parents.
Returns the stdout from `rg` as a single string."""
# TODO: this is a big hack which assumes that each Store class has a unique name.
# That assumption is wrong: there are two DirectoryStores, one in
# synapse/replication/slave/storage/directory.py and the other in
# synapse/storage/databases/main/directory.py
# Would be nice to have a way to account for this.
return subprocess.check_output(
[
"rg",
"-o",
"--no-line-number",
"--no-filename",
"--multiline",
r"class .*Store\((.|\n)*?\):$",
"synapse",
"tests",
],
).decode()
oneline_class_pattern = re.compile(r"^class (.*)\((.*)\):$")
opening_class_pattern = re.compile(r"^class (.*)\($")
def load_graph(lines: Iterable[str]) -> networkx.DiGraph:
"""Process the output of scrape_storage_classes to build an inheritance graph.
Every time a class C is created that explicitly inherits from a parent P, we add an
edge C -> P.
"""
G = networkx.DiGraph()
child: Optional[str] = None
for line in lines:
line = line.strip()
if not line or line.startswith("#"):
continue
if (match := oneline_class_pattern.match(line)) is not None:
child, parents = match.groups()
for parent in parents.split(", "):
if "metaclass" not in parent:
G.add_edge(child, parent)
child = None
elif (match := opening_class_pattern.match(line)) is not None:
(child,) = match.groups()
elif line == "):":
child = None
else:
assert child is not None, repr(line)
parent = line.strip(",")
if "metaclass" not in parent:
G.add_edge(child, parent)
return G
def select_vertices_of_interest(G: networkx.DiGraph, target: Optional[str]) -> Set[str]:
"""Find all nodes we want to visualise.
If no TARGET is given, we visualise all of G. Otherwise we visualise a given
TARGET, its parents, and all of their parents recursively.
Requires that G is a DAG.
If not None, the TARGET must belong to G.
"""
assert networkx.is_directed_acyclic_graph(G)
if target is not None:
component: Set[str] = networkx.descendants(G, target)
component.add(target)
else:
component = set(G.nodes)
return component
def generate_dot_source(G: networkx.DiGraph, nodes: Set[str]) -> str:
output = """\
strict digraph {
rankdir="LR";
node [shape=box];
"""
for (child, parent) in G.edges:
if child in nodes and parent in nodes:
output += f" {child} -> {parent};\n"
output += "}\n"
return output
def render_png(dot_source: str, destination: Optional[str]) -> str:
if destination is None:
handle, destination = tempfile.mkstemp()
os.close(handle)
print("Warning: writing to", destination, "which will persist", file=sys.stderr)
subprocess.run(
[
"dot",
"-o",
destination,
"-Tpng",
],
input=dot_source,
encoding="utf-8",
check=True,
)
return destination
def show_graph(location: str) -> None:
subprocess.run(
["xdg-open", location],
check=True,
)
def main(parser: argparse.ArgumentParser, args: argparse.Namespace) -> int:
if not (args.output or args.show):
parser.print_help(file=sys.stderr)
print("Must either --output or --show, or both.", file=sys.stderr)
return os.EX_USAGE
lines = scrape_storage_classes().split("\n")
G = load_graph(lines)
nodes = select_vertices_of_interest(G, args.target)
dot_source = generate_dot_source(G, nodes)
output_location = render_png(dot_source, args.output)
if args.show:
show_graph(output_location)
return os.EX_OK
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Visualise the inheritance of Synapse's storage classes. Requires "
"ripgrep (https://github.com/BurntSushi/ripgrep) as 'rg'; graphviz "
"(https://graphviz.org/) for the 'dot' program; and networkx "
"(https://networkx.org/). Requires Python 3.8+ for the walrus"
"operator."
)
parser.add_argument(
"target",
nargs="?",
help="Show only TARGET and its ancestors. Otherwise, show the entire hierarchy.",
)
parser.add_argument(
"--output",
nargs=1,
help="Render inheritance graph to a png file.",
)
parser.add_argument(
"--show",
action="store_true",
help="Open the inheritance graph in an image viewer.",
)
return parser
if __name__ == "__main__":
parser = build_parser()
args = parser.parse_args()
sys.exit(main(parser, args))

View File

@@ -135,6 +135,8 @@ CONDITIONAL_REQUIREMENTS["dev"] = (
# The following are executed as commands by the release script.
"twine",
"towncrier",
# For storage_inheritance script
"networkx==2.6.3",
]
)

View File

@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.47.0"
__version__ = "1.47.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -30,8 +30,7 @@ FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_R0_PREFIX = "/_matrix/media/r0"
MEDIA_V3_PREFIX = "/_matrix/media/v3"
MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"

View File

@@ -402,7 +402,7 @@ async def start(hs: "HomeServer") -> None:
if hasattr(signal, "SIGHUP"):
@wrap_as_background_process("sighup")
async def handle_sighup(*args: Any, **kwargs: Any) -> None:
def handle_sighup(*args: Any, **kwargs: Any) -> None:
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"RELOADING=1")

View File

@@ -28,11 +28,13 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.handlers.admin import ExfiltrationWriter
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
@@ -57,7 +59,9 @@ class AdminCmdSlavedStore(
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
BaseSlavedStore,
RoomWorkerStore,
):
pass

View File

@@ -26,8 +26,7 @@ from synapse.api.urls import (
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
MEDIA_R0_PREFIX,
MEDIA_V3_PREFIX,
MEDIA_PREFIX,
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
@@ -48,12 +47,14 @@ from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
@@ -235,6 +236,7 @@ class GenericWorkerSlavedStore(
SlavedPusherStore,
CensorEventsStore,
ClientIpWorkerStore,
SlavedEventStore,
SlavedKeyStore,
RoomWorkerStore,
DirectoryStore,
@@ -250,6 +252,7 @@ class GenericWorkerSlavedStore(
TransactionWorkerStore,
LockStore,
SessionStore,
BaseSlavedStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
@@ -335,8 +338,7 @@ class GenericWorkerServer(HomeServer):
resources.update(
{
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
"/_synapse/admin": admin_resource,
}

View File

@@ -29,8 +29,7 @@ from synapse import events
from synapse.api.urls import (
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
MEDIA_R0_PREFIX,
MEDIA_V3_PREFIX,
MEDIA_PREFIX,
SERVER_KEY_V2_PREFIX,
STATIC_PREFIX,
WEB_CLIENT_PREFIX,
@@ -194,7 +193,6 @@ class SynapseHomeServer(HomeServer):
{
"/_matrix/client/api/v1": client_resource,
"/_matrix/client/r0": client_resource,
"/_matrix/client/v3": client_resource,
"/_matrix/client/unstable": client_resource,
"/_matrix/client/v2_alpha": client_resource,
"/_matrix/client/versions": client_resource,
@@ -246,11 +244,7 @@ class SynapseHomeServer(HomeServer):
if self.config.server.enable_media_repo:
media_repo = self.get_media_repository_resource()
resources.update(
{
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
}
{MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo}
)
elif name == "media":
raise ConfigError(

View File

@@ -40,8 +40,6 @@ from typing import TYPE_CHECKING, Optional, Tuple
from signedjson.sign import sign_json
from twisted.internet.defer import Deferred
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, get_domain_from_id
@@ -168,7 +166,7 @@ class GroupAttestionRenewer:
return {}
def _start_renew_attestations(self) -> "Deferred[None]":
def _start_renew_attestations(self) -> None:
return run_as_background_process("renew_attestations", self._renew_attestations)
async def _renew_attestations(self) -> None:

View File

@@ -793,7 +793,7 @@ class AuthHandler:
) = await self.get_refresh_token_for_user_id(
user_id=existing_token.user_id, device_id=existing_token.device_id
)
access_token = await self.create_access_token_for_user_id(
access_token = await self.get_access_token_for_user_id(
user_id=existing_token.user_id,
device_id=existing_token.device_id,
valid_until_ms=valid_until_ms,
@@ -855,7 +855,7 @@ class AuthHandler:
)
return refresh_token, refresh_token_id
async def create_access_token_for_user_id(
async def get_access_token_for_user_id(
self,
user_id: str,
device_id: Optional[str],
@@ -1828,6 +1828,13 @@ def load_single_legacy_password_auth_provider(
logger.error("Error while initializing %r: %s", module, e)
raise
# The known hooks. If a module implements a method who's name appears in this set
# we'll want to register it
password_auth_provider_methods = {
"check_3pid_auth",
"on_logged_out",
}
# All methods that the module provides should be async, but this wasn't enforced
# in the old module system, so we wrap them if needed
def async_wrapper(f: Optional[Callable]) -> Optional[Callable[..., Awaitable]]:
@@ -1912,14 +1919,11 @@ def load_single_legacy_password_auth_provider(
return run
# If the module has these methods implemented, then we pull them out
# and register them as hooks.
check_3pid_auth_hook: Optional[CHECK_3PID_AUTH_CALLBACK] = async_wrapper(
getattr(provider, "check_3pid_auth", None)
)
on_logged_out_hook: Optional[ON_LOGGED_OUT_CALLBACK] = async_wrapper(
getattr(provider, "on_logged_out", None)
)
# populate hooks with the implemented methods, wrapped with async_wrapper
hooks = {
hook: async_wrapper(getattr(provider, hook, None))
for hook in password_auth_provider_methods
}
supported_login_types = {}
# call get_supported_login_types and add that to the dict
@@ -1946,11 +1950,7 @@ def load_single_legacy_password_auth_provider(
# need to use a tuple here for ("password",) not a list since lists aren't hashable
auth_checkers[(LoginType.PASSWORD, ("password",))] = check_password
api.register_password_auth_provider_callbacks(
check_3pid_auth=check_3pid_auth_hook,
on_logged_out=on_logged_out_hook,
auth_checkers=auth_checkers,
)
api.register_password_auth_provider_callbacks(hooks, auth_checkers=auth_checkers)
CHECK_3PID_AUTH_CALLBACK = Callable[

View File

@@ -819,7 +819,7 @@ class RegistrationHandler:
)
valid_until_ms = self.clock.time_msec() + self.access_token_lifetime
access_token = await self._auth_handler.create_access_token_for_user_id(
access_token = await self._auth_handler.get_access_token_for_user_id(
user_id,
device_id=registered_device_id,
valid_until_ms=valid_until_ms,

View File

@@ -97,7 +97,7 @@ class RoomSummaryHandler:
# If a user tries to fetch the same page multiple times in quick succession,
# only process the first attempt and return its result to subsequent requests.
self._pagination_response_cache: ResponseCache[
Tuple[str, str, bool, Optional[int], Optional[int], Optional[str]]
Tuple[str, bool, Optional[int], Optional[int], Optional[str]]
] = ResponseCache(
hs.get_clock(),
"get_room_hierarchy",
@@ -282,14 +282,7 @@ class RoomSummaryHandler:
# This is due to the pagination process mutating internal state, attempting
# to process multiple requests for the same page will result in errors.
return await self._pagination_response_cache.wrap(
(
requester,
requested_room_id,
suggested_only,
max_depth,
limit,
from_token,
),
(requested_room_id, suggested_only, max_depth, limit, from_token),
self._get_room_hierarchy,
requester,
requested_room_id,

View File

@@ -90,7 +90,7 @@ class FollowerTypingHandler:
self.wheel_timer = WheelTimer(bucket_size=5000)
@wrap_as_background_process("typing._handle_timeouts")
async def _handle_timeouts(self) -> None:
def _handle_timeouts(self) -> None:
logger.debug("Checking for typing timeouts")
now = self.clock.time_msec()

View File

@@ -20,25 +20,10 @@ import os
import platform
import threading
import time
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)
from typing import Callable, Dict, Iterable, Mapping, Optional, Tuple, Union
import attr
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
@@ -47,7 +32,6 @@ from prometheus_client.core import (
)
from twisted.internet import reactor
from twisted.internet.base import ReactorBase
from twisted.python.threadpool import ThreadPool
import synapse
@@ -70,7 +54,7 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy:
@staticmethod
def collect() -> Iterable[Metric]:
def collect():
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
@@ -90,7 +74,7 @@ class LaterGauge:
]
)
def collect(self) -> Iterable[Metric]:
def collect(self):
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
@@ -109,10 +93,10 @@ class LaterGauge:
yield g
def __attrs_post_init__(self) -> None:
def __attrs_post_init__(self):
self._register()
def _register(self) -> None:
def _register(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
@@ -121,12 +105,7 @@ class LaterGauge:
all_gauges[self.name] = self
# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")
class InFlightGauge(Generic[MetricsEntry]):
class InFlightGauge:
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
@@ -136,19 +115,14 @@ class InFlightGauge(Generic[MetricsEntry]):
callbacks.
Args:
name
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
"""
def __init__(
self,
name: str,
desc: str,
labels: Sequence[str],
sub_metrics: Sequence[str],
):
def __init__(self, name, desc, labels, sub_metrics):
self.name = name
self.desc = desc
self.labels = labels
@@ -156,25 +130,19 @@ class InFlightGauge(Generic[MetricsEntry]):
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class: Type[MetricsEntry] = attr.make_class(
self._metrics_class = attr.make_class(
"_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)
# Counts number of in flight blocks for a given set of label values
self._registrations: Dict[
Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
] = {}
self._registrations: Dict = {}
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
def register(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
def register(self, key, callback):
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
@@ -190,17 +158,13 @@ class InFlightGauge(Generic[MetricsEntry]):
with self._lock:
self._registrations.setdefault(key, set()).add(callback)
def unregister(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
def unregister(self, key, callback):
"""Registers that we've exited a block with labels `key`."""
with self._lock:
self._registrations.setdefault(key, set()).discard(callback)
def collect(self) -> Iterable[Metric]:
def collect(self):
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
@@ -236,7 +200,7 @@ class InFlightGauge(Generic[MetricsEntry]):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
def _register_with_collector(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
@@ -266,7 +230,7 @@ class GaugeBucketCollector:
name: str,
documentation: str,
buckets: Iterable[float],
registry: CollectorRegistry = REGISTRY,
registry=REGISTRY,
):
"""
Args:
@@ -293,12 +257,12 @@ class GaugeBucketCollector:
registry.register(self)
def collect(self) -> Iterable[Metric]:
def collect(self):
# Don't report metrics unless we've already collected some data
if self._metric is not None:
yield self._metric
def update_data(self, values: Iterable[float]) -> None:
def update_data(self, values: Iterable[float]):
"""Update the data to be reported by the metric
The existing data is cleared, and each measurement in the input is assigned
@@ -340,7 +304,7 @@ class GaugeBucketCollector:
class CPUMetrics:
def __init__(self) -> None:
def __init__(self):
ticks_per_sec = 100
try:
# Try and get the system config
@@ -350,7 +314,7 @@ class CPUMetrics:
self.ticks_per_sec = ticks_per_sec
def collect(self) -> Iterable[Metric]:
def collect(self):
if not HAVE_PROC_SELF_STAT:
return
@@ -400,7 +364,7 @@ gc_time = Histogram(
class GCCounts:
def collect(self) -> Iterable[Metric]:
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
@@ -418,7 +382,7 @@ if not running_on_pypy:
class PyPyGCStats:
def collect(self) -> Iterable[Metric]:
def collect(self):
# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
@@ -601,7 +565,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None:
class ReactorLastSeenMetric:
def collect(self) -> Iterable[Metric]:
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
@@ -620,12 +584,9 @@ MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
_last_gc = [0.0, 0.0, 0.0]
F = TypeVar("F", bound=Callable[..., Any])
def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
def runUntilCurrentTimer(reactor, func):
@functools.wraps(func)
def f(*args: Any, **kwargs: Any) -> Any:
def f(*args, **kwargs):
now = reactor.seconds()
num_pending = 0
@@ -688,7 +649,7 @@ def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
return ret
return cast(F, f)
return f
try:
@@ -716,5 +677,5 @@ __all__ = [
"start_http_server",
"LaterGauge",
"InFlightGauge",
"GaugeBucketCollector",
"BucketCollector",
]

View File

@@ -25,25 +25,27 @@ import math
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Any, Dict, List, Type, Union
from typing import Dict, List
from urllib.parse import parse_qs, urlparse
from prometheus_client import REGISTRY, CollectorRegistry
from prometheus_client.core import Sample
from prometheus_client import REGISTRY
from twisted.web.resource import Resource
from twisted.web.server import Request
from synapse.util import caches
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
def floatToGoString(d: Union[int, float]) -> str:
INF = float("inf")
MINUS_INF = float("-inf")
def floatToGoString(d):
d = float(d)
if d == math.inf:
if d == INF:
return "+Inf"
elif d == -math.inf:
elif d == MINUS_INF:
return "-Inf"
elif math.isnan(d):
return "NaN"
@@ -58,7 +60,7 @@ def floatToGoString(d: Union[int, float]) -> str:
return s
def sample_line(line: Sample, name: str) -> str:
def sample_line(line, name):
if line.labels:
labelstr = "{{{0}}}".format(
",".join(
@@ -80,7 +82,7 @@ def sample_line(line: Sample, name: str) -> str:
return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp)
def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> bytes:
def generate_latest(registry, emit_help=False):
# Trigger the cache metrics to be rescraped, which updates the common
# metrics but do not produce metrics themselves
@@ -185,7 +187,7 @@ class MetricsHandler(BaseHTTPRequestHandler):
registry = REGISTRY
def do_GET(self) -> None:
def do_GET(self):
registry = self.registry
params = parse_qs(urlparse(self.path).query)
@@ -205,11 +207,11 @@ class MetricsHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(output)
def log_message(self, format: str, *args: Any) -> None:
def log_message(self, format, *args):
"""Log nothing."""
@classmethod
def factory(cls, registry: CollectorRegistry) -> Type:
def factory(cls, registry):
"""Returns a dynamic MetricsHandler class tied
to the passed registry.
"""
@@ -234,9 +236,7 @@ class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
def start_http_server(
port: int, addr: str = "", registry: CollectorRegistry = REGISTRY
) -> None:
def start_http_server(port, addr="", registry=REGISTRY):
"""Starts an HTTP server for prometheus metrics as a daemon thread"""
CustomMetricsHandler = MetricsHandler.factory(registry)
httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler)
@@ -252,10 +252,10 @@ class MetricsResource(Resource):
isLeaf = True
def __init__(self, registry: CollectorRegistry = REGISTRY):
def __init__(self, registry=REGISTRY):
self.registry = registry
def render_GET(self, request: Request) -> bytes:
def render_GET(self, request):
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
response = generate_latest(self.registry)
request.setHeader(b"Content-Length", str(len(response)))

View File

@@ -15,37 +15,19 @@
import logging
import threading
from functools import wraps
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Iterable,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
)
from typing import TYPE_CHECKING, Dict, Optional, Set, Union
from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
PreserveLoggingContext,
)
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.logging.opentracing import (
SynapseTags,
noop_context_manager,
start_active_span,
)
from synapse.util.async_helpers import maybe_awaitable
if TYPE_CHECKING:
import resource
@@ -134,7 +116,7 @@ class _Collector:
before they are returned.
"""
def collect(self) -> Iterable[Metric]:
def collect(self):
global _background_processes_active_since_last_scrape
# We swap out the _background_processes set with an empty one so that
@@ -162,12 +144,12 @@ REGISTRY.register(_Collector())
class _BackgroundProcess:
def __init__(self, desc: str, ctx: LoggingContext):
def __init__(self, desc, ctx):
self.desc = desc
self._context = ctx
self._reported_stats: Optional[ContextResourceUsage] = None
self._reported_stats = None
def update_metrics(self) -> None:
def update_metrics(self):
"""Updates the metrics with values from this process."""
new_stats = self._context.get_resource_usage()
if self._reported_stats is None:
@@ -187,16 +169,7 @@ class _BackgroundProcess:
)
R = TypeVar("R")
def run_as_background_process(
desc: str,
func: Callable[..., Awaitable[Optional[R]]],
*args: Any,
bg_start_span: bool = True,
**kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -216,13 +189,11 @@ def run_as_background_process(
args: positional args for func
kwargs: keyword args for func
Returns:
Deferred which returns the result of func, or `None` if func raises.
Note that the returned Deferred does not follow the synapse logcontext
rules.
Returns: Deferred which returns the result of func, but note that it does not
follow the synapse logcontext rules.
"""
async def run() -> Optional[R]:
async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
@@ -239,13 +210,12 @@ def run_as_background_process(
else:
ctx = noop_context_manager()
with ctx:
return await func(*args, **kwargs)
return await maybe_awaitable(func(*args, **kwargs))
except Exception:
logger.exception(
"Background process '%s' threw an exception",
desc,
)
return None
finally:
_background_process_in_flight_count.labels(desc).dec()
@@ -255,24 +225,19 @@ def run_as_background_process(
return defer.ensureDeferred(run())
F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]])
def wrap_as_background_process(desc: str) -> Callable[[F], F]:
def wrap_as_background_process(desc):
"""Decorator that wraps a function that gets called as a background
process.
Equivalent to calling the function with `run_as_background_process`
Equivalent of calling the function with `run_as_background_process`
"""
def wrap_as_background_process_inner(func: F) -> F:
def wrap_as_background_process_inner(func):
@wraps(func)
def wrap_as_background_process_inner_2(
*args: Any, **kwargs: Any
) -> "defer.Deferred[Optional[R]]":
def wrap_as_background_process_inner_2(*args, **kwargs):
return run_as_background_process(desc, func, *args, **kwargs)
return cast(F, wrap_as_background_process_inner_2)
return wrap_as_background_process_inner_2
return wrap_as_background_process_inner
@@ -300,7 +265,7 @@ class BackgroundProcessLoggingContext(LoggingContext):
super().__init__("%s-%s" % (name, instance_id))
self._proc = _BackgroundProcess(name, self)
def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
def start(self, rusage: "Optional[resource.struct_rusage]"):
"""Log context has started running (again)."""
super().start(rusage)
@@ -311,12 +276,7 @@ class BackgroundProcessLoggingContext(LoggingContext):
with _bg_metrics_lock:
_background_processes_active_since_last_scrape.add(self._proc)
def __exit__(
self,
type: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
def __exit__(self, type, value, traceback) -> None:
"""Log context has finished."""
super().__exit__(type, value, traceback)

View File

@@ -16,16 +16,14 @@ import ctypes
import logging
import os
import re
from typing import Iterable, Optional
from prometheus_client import Metric
from typing import Optional
from synapse.metrics import REGISTRY, GaugeMetricFamily
logger = logging.getLogger(__name__)
def _setup_jemalloc_stats() -> None:
def _setup_jemalloc_stats():
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""
@@ -137,7 +135,7 @@ def _setup_jemalloc_stats() -> None:
class JemallocCollector:
"""Metrics for internal jemalloc stats."""
def collect(self) -> Iterable[Metric]:
def collect(self):
_jemalloc_refresh_stats()
g = GaugeMetricFamily(
@@ -187,7 +185,7 @@ def _setup_jemalloc_stats() -> None:
logger.debug("Added jemalloc stats")
def setup_jemalloc_stats() -> None:
def setup_jemalloc_stats():
"""Try to setup jemalloc stats, if jemalloc is loaded."""
try:

View File

@@ -898,7 +898,7 @@ class UserTokenRestServlet(RestServlet):
if auth_user.to_string() == user_id:
raise SynapseError(400, "Cannot use admin API to login as self")
token = await self.auth_handler.create_access_token_for_user_id(
token = await self.auth_handler.get_access_token_for_user_id(
user_id=auth_user.to_string(),
device_id=None,
valid_until_ms=valid_until_ms,
@@ -909,7 +909,7 @@ class UserTokenRestServlet(RestServlet):
class ShadowBanRestServlet(RestServlet):
"""An admin API for controlling whether a user is shadow-banned.
"""An admin API for shadow-banning a user.
A shadow-banned users receives successful responses to their client-server
API requests, but the events are not propagated into rooms.
@@ -917,19 +917,11 @@ class ShadowBanRestServlet(RestServlet):
Shadow-banning a user should be used as a tool of last resort and may lead
to confusing or broken behaviour for the client.
Example of shadow-banning a user:
Example:
POST /_synapse/admin/v1/users/@test:example.com/shadow_ban
{}
200 OK
{}
Example of removing a user from being shadow-banned:
DELETE /_synapse/admin/v1/users/@test:example.com/shadow_ban
{}
200 OK
{}
"""
@@ -953,18 +945,6 @@ class ShadowBanRestServlet(RestServlet):
return 200, {}
async def on_DELETE(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
if not self.hs.is_mine_id(user_id):
raise SynapseError(400, "Only local users can be shadow-banned")
await self.store.set_shadow_banned(UserID.from_string(user_id), False)
return 200, {}
class RateLimitRestServlet(RestServlet):
"""An admin API to override ratelimiting for an user.

View File

@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
def client_patterns(
path_regex: str,
releases: Iterable[str] = ("r0", "v3"),
releases: Iterable[int] = (0,),
unstable: bool = True,
v1: bool = False,
) -> Iterable[Pattern]:
@@ -52,7 +52,7 @@ def client_patterns(
v1_prefix = CLIENT_API_PREFIX + "/api/v1"
patterns.append(re.compile("^" + v1_prefix + path_regex))
for release in releases:
new_prefix = CLIENT_API_PREFIX + f"/{release}"
new_prefix = CLIENT_API_PREFIX + "/r%d" % (release,)
patterns.append(re.compile("^" + new_prefix + path_regex))
return patterns

View File

@@ -262,7 +262,7 @@ class SigningKeyUploadServlet(RestServlet):
}
"""
PATTERNS = client_patterns("/keys/device_signing/upload$", releases=("v3",))
PATTERNS = client_patterns("/keys/device_signing/upload$", releases=())
def __init__(self, hs: "HomeServer"):
super().__init__()

View File

@@ -188,7 +188,7 @@ class LoggingDatabaseConnection:
# The type of entry which goes on our after_callbacks and exception_callbacks lists.
_CallbackListEntry = Tuple[Callable[..., object], Iterable[Any], Dict[str, Any]]
_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]]
R = TypeVar("R")
@@ -235,7 +235,7 @@ class LoggingTransaction:
self.after_callbacks = after_callbacks
self.exception_callbacks = exception_callbacks
def call_after(self, callback: Callable[..., object], *args: Any, **kwargs: Any):
def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
@@ -247,7 +247,7 @@ class LoggingTransaction:
self.after_callbacks.append((callback, args, kwargs))
def call_on_exception(
self, callback: Callable[..., object], *args: Any, **kwargs: Any
self, callback: Callable[..., None], *args: Any, **kwargs: Any
):
# if self.exception_callbacks is None, that means that whatever constructed the
# LoggingTransaction isn't expecting there to be any callbacks; assert that

View File

@@ -31,6 +31,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
from .account_data import AccountDataStore
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
from .cache import CacheInvalidationWorkerStore
from .censor_events import CensorEventsStore
from .client_ips import ClientIpStore
from .deviceinbox import DeviceInboxStore
@@ -48,6 +49,7 @@ from .keys import KeyStore
from .lock import LockStore
from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .openid import OpenIdStore
from .presence import PresenceStore
from .profile import ProfileStore
@@ -61,9 +63,11 @@ from .relations import RelationsStore
from .room import RoomStore
from .room_batch import RoomBatchStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .session import SessionStore
from .signatures import SignatureStore
from .state import StateStore
from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionWorkerStore
@@ -103,6 +107,7 @@ class DataStore(
ReceiptsStore,
EndToEndKeyStore,
EndToEndRoomKeyStore,
SearchStore,
TagsStore,
AccountDataStore,
EventPushActionsStore,
@@ -113,10 +118,13 @@ class DataStore(
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
StatsStore,
RelationsStore,
CensorEventsStore,
UIAuthStore,
EventForwardExtremitiesStore,
CacheInvalidationWorkerStore,
ServerMetricsStore,
LockStore,
SessionStore,

View File

@@ -17,7 +17,7 @@ from typing import Iterable, List, Optional, Tuple
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.types import RoomAlias
from synapse.util.caches.descriptors import cached

View File

@@ -17,7 +17,7 @@ from typing import Any, Dict, List
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
logger = logging.getLogger(__name__)

View File

@@ -16,7 +16,7 @@ import logging
from typing import Any, List, Set, Tuple
from synapse.api.errors import SynapseError
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken

View File

@@ -476,7 +476,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
shadow_banned: true iff the user is to be shadow-banned, false otherwise.
"""
def set_shadow_banned_txn(txn: LoggingTransaction) -> None:
def set_shadow_banned_txn(txn):
user_id = user.to_string()
self.db_pool.simple_update_one_txn(
txn,

View File

@@ -15,7 +15,7 @@
from typing import Dict, Iterable
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.util.caches.descriptors import cached, cachedList

View File

@@ -131,16 +131,24 @@ def prepare_database(
"config==None in prepare_database, but database is not empty"
)
# This should be run on all processes, master or worker. The master will
# apply the deltas, while workers will check if any outstanding deltas
# exist and raise an PrepareDatabaseException if they do.
_upgrade_existing_database(
cur,
version_info,
database_engine,
config,
databases=databases,
)
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker.worker_app is None:
_upgrade_existing_database(
cur,
version_info,
database_engine,
config,
databases=databases,
)
elif version_info.current_version < SCHEMA_VERSION:
# If the DB is on an older version than we expect then we refuse
# to start the worker (as the main process needs to run first to
# update the schema).
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR
% (SCHEMA_VERSION, version_info.current_version)
)
else:
logger.info("%r: Initialising new database", databases)
@@ -350,18 +358,6 @@ def _upgrade_existing_database(
is_worker = config and config.worker.worker_app is not None
# If the schema version needs to be updated, and we are on a worker, we immediately
# know to bail out as workers cannot update the database schema. Only one process
# must update the database at the time, therefore we delegate this task to the master.
if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
# If the DB is on an older version than we expect then we refuse
# to start the worker (as the main process needs to run first to
# update the schema).
raise UpgradeDatabaseException(
OUTDATED_SCHEMA_ON_WORKER_ERROR
% (SCHEMA_VERSION, current_schema_state.current_version)
)
if (
current_schema_state.compat_version is not None
and current_schema_state.compat_version > SCHEMA_VERSION

View File

@@ -18,17 +18,5 @@
-- when a device was deleted using Synapse earlier than 1.47.0.
-- This runs as background task, but may take a bit to finish.
-- Remove any existing instances of this job running. It's OK to stop and restart this job,
-- as it's just deleting entries from a table - no progress will be lost.
--
-- This is necessary due a similar migration running the job accidentally
-- being included in schema version 64 during v1.47.0rc1,rc2. If a
-- homeserver had updated from Synapse <=v1.45.0 (schema version <=64),
-- then they would have started running this background update already.
-- If that update was still running, then simply inserting it again would
-- cause an SQL failure. So we effectively do an "upsert" here instead.
DELETE FROM background_updates WHERE update_name = 'remove_deleted_devices_from_device_inbox';
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6506, 'remove_deleted_devices_from_device_inbox', '{}');
(6505, 'remove_deleted_devices_from_device_inbox', '{}');

View File

@@ -27,7 +27,6 @@ from typing import (
Generic,
Hashable,
Iterable,
Iterator,
Optional,
Set,
TypeVar,
@@ -41,6 +40,7 @@ from typing_extensions import ContextManager
from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.python import failure
from twisted.python.failure import Failure
from synapse.logging.context import (
@@ -78,7 +78,7 @@ class ObservableDeferred(Generic[_T]):
object.__setattr__(self, "_result", None)
object.__setattr__(self, "_observers", [])
def callback(r: _T) -> _T:
def callback(r):
object.__setattr__(self, "_result", (True, r))
# once we have set _result, no more entries will be added to _observers,
@@ -98,7 +98,7 @@ class ObservableDeferred(Generic[_T]):
)
return r
def errback(f: Failure) -> Optional[Failure]:
def errback(f):
object.__setattr__(self, "_result", (False, f))
# once we have set _result, no more entries will be added to _observers,
@@ -109,7 +109,7 @@ class ObservableDeferred(Generic[_T]):
for observer in observers:
# This is a little bit of magic to correctly propagate stack
# traces when we `await` on one of the observer deferreds.
f.value.__failure__ = f # type: ignore[union-attr]
f.value.__failure__ = f
try:
observer.errback(f)
except Exception as e:
@@ -314,7 +314,7 @@ class Linearizer:
# will release the lock.
@contextmanager
def _ctx_manager(_: None) -> Iterator[None]:
def _ctx_manager(_):
try:
yield
finally:
@@ -355,7 +355,7 @@ class Linearizer:
new_defer = make_deferred_yieldable(defer.Deferred())
entry.deferreds[new_defer] = 1
def cb(_r: None) -> "defer.Deferred[None]":
def cb(_r):
logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
entry.count += 1
@@ -371,7 +371,7 @@ class Linearizer:
# code must be synchronous, so this is the only sensible place.)
return self._clock.sleep(0)
def eb(e: Failure) -> Failure:
def eb(e):
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.debug(
@@ -435,7 +435,7 @@ class ReadWriteLock:
await make_deferred_yieldable(curr_writer)
@contextmanager
def _ctx_manager() -> Iterator[None]:
def _ctx_manager():
try:
yield
finally:
@@ -464,7 +464,7 @@ class ReadWriteLock:
await make_deferred_yieldable(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager() -> Iterator[None]:
def _ctx_manager():
try:
yield
finally:
@@ -524,7 +524,7 @@ def timeout_deferred(
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value: Failure) -> Failure:
def convert_cancelled(value: failure.Failure):
# if the original deferred was cancelled, and our timeout has fired, then
# the reason it was cancelled was due to our timeout. Turn the CancelledError
# into a TimeoutError.
@@ -534,7 +534,7 @@ def timeout_deferred(
deferred.addErrback(convert_cancelled)
def cancel_timeout(result: _T) -> _T:
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
@@ -542,11 +542,11 @@ def timeout_deferred(
deferred.addBoth(cancel_timeout)
def success_cb(val: _T) -> None:
def success_cb(val):
if not new_d.called:
new_d.callback(val)
def failure_cb(val: Failure) -> None:
def failure_cb(val):
if not new_d.called:
new_d.errback(val)
@@ -557,13 +557,13 @@ def timeout_deferred(
# This class can't be generic because it uses slots with attrs.
# See: https://github.com/python-attrs/attrs/issues/313
@attr.s(slots=True, frozen=True, auto_attribs=True)
@attr.s(slots=True, frozen=True)
class DoneAwaitable: # should be: Generic[R]
"""Simple awaitable that returns the provided value."""
value: Any # should be: R
value = attr.ib(type=Any) # should be: R
def __await__(self) -> Any:
def __await__(self):
return self
def __iter__(self) -> "DoneAwaitable":

View File

@@ -17,7 +17,7 @@ import logging
import typing
from enum import Enum, auto
from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized
from typing import Callable, Dict, Optional, Sized
import attr
from prometheus_client.core import Gauge
@@ -58,20 +58,20 @@ class EvictionReason(Enum):
time = auto()
@attr.s(slots=True, auto_attribs=True)
@attr.s(slots=True)
class CacheMetric:
_cache: Sized
_cache_type: str
_cache_name: str
_collect_callback: Optional[Callable]
_cache = attr.ib()
_cache_type = attr.ib(type=str)
_cache_name = attr.ib(type=str)
_collect_callback = attr.ib(type=Optional[Callable])
hits: int = 0
misses: int = 0
hits = attr.ib(default=0)
misses = attr.ib(default=0)
eviction_size_by_reason: typing.Counter[EvictionReason] = attr.ib(
factory=collections.Counter
)
memory_usage: Optional[int] = None
memory_usage = attr.ib(default=None)
def inc_hits(self) -> None:
self.hits += 1
@@ -89,14 +89,13 @@ class CacheMetric:
self.memory_usage += memory
def dec_memory_usage(self, memory: int) -> None:
assert self.memory_usage is not None
self.memory_usage -= memory
def clear_memory_usage(self) -> None:
if self.memory_usage is not None:
self.memory_usage = 0
def describe(self) -> List[str]:
def describe(self):
return []
def collect(self) -> None:
@@ -119,9 +118,8 @@ class CacheMetric:
self.eviction_size_by_reason[reason]
)
cache_total.labels(self._cache_name).set(self.hits + self.misses)
max_size = getattr(self._cache, "max_size", None)
if max_size:
cache_max_size.labels(self._cache_name).set(max_size)
if getattr(self._cache, "max_size", None):
cache_max_size.labels(self._cache_name).set(self._cache.max_size)
if TRACK_MEMORY_USAGE:
# self.memory_usage can be None if nothing has been inserted
@@ -195,7 +193,7 @@ KNOWN_KEYS = {
}
def intern_string(string: Optional[str]) -> Optional[str]:
def intern_string(string):
"""Takes a (potentially) unicode string and interns it if it's ascii"""
if string is None:
return None
@@ -206,7 +204,7 @@ def intern_string(string: Optional[str]) -> Optional[str]:
return string
def intern_dict(dictionary: Dict[str, Any]) -> Dict[str, Any]:
def intern_dict(dictionary):
"""Takes a dictionary and interns well known keys and their values"""
return {
KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
@@ -214,7 +212,7 @@ def intern_dict(dictionary: Dict[str, Any]) -> Dict[str, Any]:
}
def _intern_known_values(key: str, value: Any) -> Any:
def _intern_known_values(key, value):
intern_keys = ("event_id", "room_id", "sender", "user_id", "type", "state_key")
if key in intern_keys:

View File

@@ -289,7 +289,7 @@ class DeferredCache(Generic[KT, VT]):
callbacks = [callback] if callback else []
self.cache.set(key, value, callbacks=callbacks)
def invalidate(self, key: KT) -> None:
def invalidate(self, key) -> None:
"""Delete a key, or tree of entries
If the cache is backed by a regular dict, then "key" must be of

View File

@@ -19,15 +19,12 @@ import logging
from typing import (
Any,
Callable,
Dict,
Generic,
Hashable,
Iterable,
Mapping,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
cast,
@@ -35,7 +32,6 @@ from typing import (
from weakref import WeakValueDictionary
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.util import unwrapFirstError
@@ -64,12 +60,7 @@ class _CachedFunction(Generic[F]):
class _CacheDescriptorBase:
def __init__(
self,
orig: Callable[..., Any],
num_args: Optional[int],
cache_context: bool = False,
):
def __init__(self, orig: Callable[..., Any], num_args, cache_context=False):
self.orig = orig
arg_spec = inspect.getfullargspec(orig)
@@ -181,14 +172,14 @@ class LruCacheDescriptor(_CacheDescriptorBase):
def __init__(
self,
orig: Callable[..., Any],
orig,
max_entries: int = 1000,
cache_context: bool = False,
):
super().__init__(orig, num_args=None, cache_context=cache_context)
self.max_entries = max_entries
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
def __get__(self, obj, owner):
cache: LruCache[CacheKey, Any] = LruCache(
cache_name=self.orig.__name__,
max_size=self.max_entries,
@@ -198,7 +189,7 @@ class LruCacheDescriptor(_CacheDescriptorBase):
sentinel = LruCacheDescriptor._Sentinel.sentinel
@functools.wraps(self.orig)
def _wrapped(*args: Any, **kwargs: Any) -> Any:
def _wrapped(*args, **kwargs):
invalidate_callback = kwargs.pop("on_invalidate", None)
callbacks = (invalidate_callback,) if invalidate_callback else ()
@@ -254,19 +245,19 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
return r1 + r2
Args:
num_args: number of positional arguments (excluding ``self`` and
num_args (int): number of positional arguments (excluding ``self`` and
``cache_context``) to use as cache keys. Defaults to all named
args of the function.
"""
def __init__(
self,
orig: Callable[..., Any],
max_entries: int = 1000,
num_args: Optional[int] = None,
tree: bool = False,
cache_context: bool = False,
iterable: bool = False,
orig,
max_entries=1000,
num_args=None,
tree=False,
cache_context=False,
iterable=False,
prune_unread_entries: bool = True,
):
super().__init__(orig, num_args=num_args, cache_context=cache_context)
@@ -281,7 +272,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
self.iterable = iterable
self.prune_unread_entries = prune_unread_entries
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
def __get__(self, obj, owner):
cache: DeferredCache[CacheKey, Any] = DeferredCache(
name=self.orig.__name__,
max_entries=self.max_entries,
@@ -293,7 +284,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
get_cache_key = self.cache_key_builder
@functools.wraps(self.orig)
def _wrapped(*args: Any, **kwargs: Any) -> Any:
def _wrapped(*args, **kwargs):
# If we're passed a cache_context then we'll want to call its invalidate()
# whenever we are invalidated
invalidate_callback = kwargs.pop("on_invalidate", None)
@@ -344,19 +335,13 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
of results.
"""
def __init__(
self,
orig: Callable[..., Any],
cached_method_name: str,
list_name: str,
num_args: Optional[int] = None,
):
def __init__(self, orig, cached_method_name, list_name, num_args=None):
"""
Args:
orig
cached_method_name: The name of the cached method.
list_name: Name of the argument which is the bulk lookup list
num_args: number of positional arguments (excluding ``self``,
orig (function)
cached_method_name (str): The name of the cached method.
list_name (str): Name of the argument which is the bulk lookup list
num_args (int): number of positional arguments (excluding ``self``,
but including list_name) to use as cache keys. Defaults to all
named args of the function.
"""
@@ -375,15 +360,13 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
% (self.list_name, cached_method_name)
)
def __get__(
self, obj: Optional[Any], objtype: Optional[Type] = None
) -> Callable[..., Any]:
def __get__(self, obj, objtype=None):
cached_method = getattr(obj, self.cached_method_name)
cache: DeferredCache[CacheKey, Any] = cached_method.cache
num_args = cached_method.num_args
@functools.wraps(self.orig)
def wrapped(*args: Any, **kwargs: Any) -> Any:
def wrapped(*args, **kwargs):
# If we're passed a cache_context then we'll want to call its
# invalidate() whenever we are invalidated
invalidate_callback = kwargs.pop("on_invalidate", None)
@@ -394,7 +377,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
results = {}
def update_results_dict(res: Any, arg: Hashable) -> None:
def update_results_dict(res, arg):
results[arg] = res
# list of deferreds to wait for
@@ -406,13 +389,13 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
# otherwise a tuple is used.
if num_args == 1:
def arg_to_cache_key(arg: Hashable) -> Hashable:
def arg_to_cache_key(arg):
return arg
else:
keylist = list(keyargs)
def arg_to_cache_key(arg: Hashable) -> Hashable:
def arg_to_cache_key(arg):
keylist[self.list_pos] = arg
return tuple(keylist)
@@ -438,7 +421,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
key = arg_to_cache_key(arg)
cache.set(key, deferred, callback=invalidate_callback)
def complete_all(res: Dict[Hashable, Any]) -> None:
def complete_all(res):
# the wrapped function has completed. It returns a
# a dict. We can now resolve the observable deferreds in
# the cache and update our own result map.
@@ -447,7 +430,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
deferreds_map[e].callback(val)
results[e] = val
def errback(f: Failure) -> Failure:
def errback(f):
# the wrapped function has failed. Invalidate any cache
# entries we're supposed to be populating, and fail
# their deferreds.

View File

@@ -19,8 +19,6 @@ from typing import Any, Generic, Optional, TypeVar, Union, overload
import attr
from typing_extensions import Literal
from twisted.internet import defer
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
@@ -83,7 +81,7 @@ class ExpiringCache(Generic[KT, VT]):
# Don't bother starting the loop if things never expire
return
def f() -> "defer.Deferred[None]":
def f():
return run_as_background_process(
"prune_cache_%s" % self._cache_name, self._prune_cache
)
@@ -159,7 +157,7 @@ class ExpiringCache(Generic[KT, VT]):
self[key] = value
return value
async def _prune_cache(self) -> None:
def _prune_cache(self) -> None:
if not self._expiry_ms:
# zero expiry time means don't expire. This should never get called
# since we have this check in start too.
@@ -212,7 +210,7 @@ class ExpiringCache(Generic[KT, VT]):
return False
@attr.s(slots=True, auto_attribs=True)
@attr.s(slots=True)
class _CacheEntry:
time: int
value: Any
time = attr.ib(type=int)
value = attr.ib()

View File

@@ -18,13 +18,12 @@ from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
def user_left_room(distributor: "Distributor", user: UserID, room_id: str) -> None:
def user_left_room(distributor, user, room_id):
distributor.fire("user_left_room", user=user, room_id=room_id)
@@ -64,7 +63,7 @@ class Distributor:
self.pre_registration[name] = []
self.pre_registration[name].append(observer)
def fire(self, name: str, *args: Any, **kwargs: Any) -> None:
def fire(self, name: str, *args, **kwargs) -> None:
"""Dispatches the given signal to the registered observers.
Runs the observers as a background process. Does not return a deferred.
@@ -96,7 +95,7 @@ class Signal:
Each observer callable may return a Deferred."""
self.observers.append(observer)
def fire(self, *args: Any, **kwargs: Any) -> "defer.Deferred[List[Any]]":
def fire(self, *args, **kwargs) -> "defer.Deferred[List[Any]]":
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
not an error to fire a signal with no observers.
@@ -104,7 +103,7 @@ class Signal:
Returns a Deferred that will complete when all the observers have
completed."""
async def do(observer: Callable[..., Any]) -> Any:
async def do(observer):
try:
return await maybe_awaitable(observer(*args, **kwargs))
except Exception as e:
@@ -121,5 +120,5 @@ class Signal:
defer.gatherResults(deferreds, consumeErrors=True)
)
def __repr__(self) -> str:
def __repr__(self):
return "<Signal name=%r>" % (self.name,)

View File

@@ -3,52 +3,23 @@
# We copy it here as we need to instantiate `GAIResolver` manually, but it is a
# private class.
from socket import (
AF_INET,
AF_INET6,
AF_UNSPEC,
SOCK_DGRAM,
SOCK_STREAM,
AddressFamily,
SocketKind,
gaierror,
getaddrinfo,
)
from typing import (
TYPE_CHECKING,
Callable,
List,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
Union,
)
from zope.interface import implementer
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.interfaces import (
IAddress,
IHostnameResolver,
IHostResolution,
IReactorThreads,
IResolutionReceiver,
)
from twisted.internet.interfaces import IHostnameResolver, IHostResolution
from twisted.internet.threads import deferToThreadPool
if TYPE_CHECKING:
# The types below are copied from
# https://github.com/twisted/twisted/blob/release-21.2.0-10091/src/twisted/internet/interfaces.py
# so that the type hints can match the interfaces.
from twisted.python.runtime import platform
if platform.supportsThreads():
from twisted.python.threadpool import ThreadPool
else:
ThreadPool = object # type: ignore[misc, assignment]
@implementer(IHostResolution)
class HostResolution:
@@ -56,13 +27,13 @@ class HostResolution:
The in-progress resolution of a given hostname.
"""
def __init__(self, name: str):
def __init__(self, name):
"""
Create a L{HostResolution} with the given name.
"""
self.name = name
def cancel(self) -> NoReturn:
def cancel(self):
# IHostResolution.cancel
raise NotImplementedError()
@@ -91,17 +62,6 @@ _socktypeToType = {
}
_GETADDRINFO_RESULT = List[
Tuple[
AddressFamily,
SocketKind,
int,
str,
Union[Tuple[str, int], Tuple[str, int, int, int]],
]
]
@implementer(IHostnameResolver)
class GAIResolver:
"""
@@ -109,12 +69,7 @@ class GAIResolver:
L{getaddrinfo} in a thread.
"""
def __init__(
self,
reactor: IReactorThreads,
getThreadPool: Optional[Callable[[], "ThreadPool"]] = None,
getaddrinfo: Callable[[str, int, int, int], _GETADDRINFO_RESULT] = getaddrinfo,
):
def __init__(self, reactor, getThreadPool=None, getaddrinfo=getaddrinfo):
"""
Create a L{GAIResolver}.
@param reactor: the reactor to schedule result-delivery on
@@ -134,16 +89,14 @@ class GAIResolver:
)
self._getaddrinfo = getaddrinfo
# The types on IHostnameResolver is incorrect in Twisted, see
# https://twistedmatrix.com/trac/ticket/10276
def resolveHostName( # type: ignore[override]
def resolveHostName(
self,
resolutionReceiver: IResolutionReceiver,
hostName: str,
portNumber: int = 0,
addressTypes: Optional[Sequence[Type[IAddress]]] = None,
transportSemantics: str = "TCP",
) -> IHostResolution:
resolutionReceiver,
hostName,
portNumber=0,
addressTypes=None,
transportSemantics="TCP",
):
"""
See L{IHostnameResolver.resolveHostName}
@param resolutionReceiver: see interface
@@ -159,7 +112,7 @@ class GAIResolver:
]
socketType = _transportToSocket[transportSemantics]
def get() -> _GETADDRINFO_RESULT:
def get():
try:
return self._getaddrinfo(
hostName, portNumber, addressFamily, socketType
@@ -172,7 +125,7 @@ class GAIResolver:
resolutionReceiver.resolutionBegan(resolution)
@d.addCallback
def deliverResults(result: _GETADDRINFO_RESULT) -> None:
def deliverResults(result):
for family, socktype, _proto, _cannoname, sockaddr in result:
addrType = _afToType[family]
resolutionReceiver.addressResolved(

View File

@@ -56,22 +56,14 @@ block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]
)
# This is dynamically created in InFlightGauge.__init__.
class _InFlightMetric(Protocol):
real_time_max: float
real_time_sum: float
# Tracks the number of blocks currently active
in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
in_flight = InFlightGauge(
"synapse_util_metrics_block_in_flight",
"",
labels=["block_name"],
sub_metrics=["real_time_max", "real_time_sum"],
)
T = TypeVar("T", bound=Callable[..., Any])
@@ -188,7 +180,7 @@ class Measure:
"""
return self._logging_context.get_resource_usage()
def _update_in_flight(self, metrics: _InFlightMetric) -> None:
def _update_in_flight(self, metrics) -> None:
"""Gets called when processing in flight metrics"""
assert self.start is not None
duration = self.clock.time() - self.start

View File

@@ -116,7 +116,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
self.auth_blocking._limit_usage_by_mau = False
# Ensure does not throw exception
self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user1, device_id=None, valid_until_ms=None
)
)
@@ -134,7 +134,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
)
self.get_failure(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user1, device_id=None, valid_until_ms=None
),
ResourceLimitError,
@@ -162,7 +162,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
# If not in monthly active cohort
self.get_failure(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user1, device_id=None, valid_until_ms=None
),
ResourceLimitError,
@@ -179,7 +179,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
return_value=make_awaitable(self.clock.time_msec())
)
self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user1, device_id=None, valid_until_ms=None
)
)
@@ -197,7 +197,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
)
# Ensure does not raise exception
self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user1, device_id=None, valid_until_ms=None
)
)

View File

@@ -193,8 +193,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
@override_config({"limit_usage_by_mau": True})
def test_get_or_create_user_mau_not_blocked(self):
# Type ignore: mypy doesn't like us assigning to methods.
self.store.count_monthly_users = Mock( # type: ignore[assignment]
self.store.count_monthly_users = Mock(
return_value=make_awaitable(self.hs.config.server.max_mau_value - 1)
)
# Ensure does not throw exception
@@ -202,8 +201,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
@override_config({"limit_usage_by_mau": True})
def test_get_or_create_user_mau_blocked(self):
# Type ignore: mypy doesn't like us assigning to methods.
self.store.get_monthly_active_count = Mock( # type: ignore[assignment]
self.store.get_monthly_active_count = Mock(
return_value=make_awaitable(self.lots_of_users)
)
self.get_failure(
@@ -211,8 +209,7 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
ResourceLimitError,
)
# Type ignore: mypy doesn't like us assigning to methods.
self.store.get_monthly_active_count = Mock( # type: ignore[assignment]
self.store.get_monthly_active_count = Mock(
return_value=make_awaitable(self.hs.config.server.max_mau_value)
)
self.get_failure(

View File

@@ -14,8 +14,6 @@
from typing import Any, Iterable, List, Optional, Tuple
from unittest import mock
from twisted.internet.defer import ensureDeferred
from synapse.api.constants import (
EventContentFields,
EventTypes,
@@ -318,59 +316,6 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase):
AuthError,
)
def test_room_hierarchy_cache(self) -> None:
"""In-flight room hierarchy requests are deduplicated."""
# Run two `get_room_hierarchy` calls up until they block.
deferred1 = ensureDeferred(
self.handler.get_room_hierarchy(self.user, self.space)
)
deferred2 = ensureDeferred(
self.handler.get_room_hierarchy(self.user, self.space)
)
# Complete the two calls.
result1 = self.get_success(deferred1)
result2 = self.get_success(deferred2)
# Both `get_room_hierarchy` calls should return the same result.
expected = [(self.space, [self.room]), (self.room, ())]
self._assert_hierarchy(result1, expected)
self._assert_hierarchy(result2, expected)
self.assertIs(result1, result2)
# A subsequent `get_room_hierarchy` call should not reuse the result.
result3 = self.get_success(
self.handler.get_room_hierarchy(self.user, self.space)
)
self._assert_hierarchy(result3, expected)
self.assertIsNot(result1, result3)
def test_room_hierarchy_cache_sharing(self) -> None:
"""Room hierarchy responses for different users are not shared."""
user2 = self.register_user("user2", "pass")
# Make the room within the space invite-only.
self.helper.send_state(
self.room,
event_type=EventTypes.JoinRules,
body={"join_rule": JoinRules.INVITE},
tok=self.token,
)
# Run two `get_room_hierarchy` calls for different users up until they block.
deferred1 = ensureDeferred(
self.handler.get_room_hierarchy(self.user, self.space)
)
deferred2 = ensureDeferred(self.handler.get_room_hierarchy(user2, self.space))
# Complete the two calls.
result1 = self.get_success(deferred1)
result2 = self.get_success(deferred2)
# The `get_room_hierarchy` calls should return different results.
self._assert_hierarchy(result1, [(self.space, [self.room]), (self.room, ())])
self._assert_hierarchy(result2, [(self.space, [self.room])])
def _create_room_with_join_rule(
self, join_rule: str, room_version: Optional[str] = None, **extra_content
) -> str:

View File

@@ -1169,14 +1169,14 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# regardless of whether password login or SSO is allowed
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.admin_user, device_id=None, valid_until_ms=None
)
)
self.other_user = self.register_user("user", "pass", displayname="User")
self.other_user_token = self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.other_user, device_id=None, valid_until_ms=None
)
)
@@ -3592,34 +3592,31 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
@parameterized.expand(["POST", "DELETE"])
def test_no_auth(self, method: str):
def test_no_auth(self):
"""
Try to get information of an user without authentication.
"""
channel = self.make_request(method, self.url)
channel = self.make_request("POST", self.url)
self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
def test_requester_is_not_admin(self, method: str):
def test_requester_is_not_admin(self):
"""
If the user is not a server admin, an error is returned.
"""
other_user_token = self.login("user", "pass")
channel = self.make_request(method, self.url, access_token=other_user_token)
channel = self.make_request("POST", self.url, access_token=other_user_token)
self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
def test_user_is_not_local(self, method: str):
def test_user_is_not_local(self):
"""
Tests that shadow-banning for a user that is not a local returns a 400
"""
url = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain"
channel = self.make_request(method, url, access_token=self.admin_user_tok)
channel = self.make_request("POST", url, access_token=self.admin_user_tok)
self.assertEqual(400, channel.code, msg=channel.json_body)
def test_success(self):
@@ -3639,17 +3636,6 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
result = self.get_success(self.store.get_user_by_access_token(other_user_token))
self.assertTrue(result.shadow_banned)
# Un-shadow-ban the user.
channel = self.make_request(
"DELETE", self.url, access_token=self.admin_user_tok
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual({}, channel.json_body)
# Ensure the user is no longer shadow-banned (and the cache was cleared).
result = self.get_success(self.store.get_user_by_access_token(other_user_token))
self.assertFalse(result.shadow_banned)
class RateLimitTestCase(unittest.HomeserverTestCase):

View File

@@ -71,7 +71,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
@override_config({"password_config": {"localdb_enabled": False}})
def test_get_change_password_capabilities_localdb_disabled(self):
access_token = self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user, device_id=None, valid_until_ms=None
)
)
@@ -85,7 +85,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
@override_config({"password_config": {"enabled": False}})
def test_get_change_password_capabilities_password_disabled(self):
access_token = self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user, device_id=None, valid_until_ms=None
)
)
@@ -174,7 +174,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
@override_config({"experimental_features": {"msc3244_enabled": False}})
def test_get_does_not_include_msc3244_fields_when_disabled(self):
access_token = self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user, device_id=None, valid_until_ms=None
)
)
@@ -189,7 +189,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
def test_get_does_include_msc3244_fields_when_enabled(self):
access_token = self.get_success(
self.auth_handler.create_access_token_for_user_id(
self.auth_handler.get_access_token_for_user_id(
self.user, device_id=None, valid_until_ms=None
)
)

View File

@@ -28,12 +28,11 @@ from typing import (
MutableMapping,
Optional,
Tuple,
overload,
Union,
)
from unittest.mock import patch
import attr
from typing_extensions import Literal
from twisted.web.resource import Resource
from twisted.web.server import Site
@@ -56,32 +55,6 @@ class RestHelper:
site = attr.ib(type=Site)
auth_user_id = attr.ib()
@overload
def create_room_as(
self,
room_creator: Optional[str] = ...,
is_public: Optional[bool] = ...,
room_version: Optional[str] = ...,
tok: Optional[str] = ...,
expect_code: Literal[200] = ...,
extra_content: Optional[Dict] = ...,
custom_headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = ...,
) -> str:
...
@overload
def create_room_as(
self,
room_creator: Optional[str] = ...,
is_public: Optional[bool] = ...,
room_version: Optional[str] = ...,
tok: Optional[str] = ...,
expect_code: int = ...,
extra_content: Optional[Dict] = ...,
custom_headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = ...,
) -> Optional[str]:
...
def create_room_as(
self,
room_creator: Optional[str] = None,
@@ -91,7 +64,7 @@ class RestHelper:
expect_code: int = 200,
extra_content: Optional[Dict] = None,
custom_headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = None,
) -> Optional[str]:
) -> str:
"""
Create a room.
@@ -134,8 +107,6 @@ class RestHelper:
if expect_code == 200:
return channel.json_body["room_id"]
else:
return None
def invite(self, room=None, src=None, targ=None, expect_code=200, tok=None):
self.change_membership(
@@ -205,7 +176,7 @@ class RestHelper:
extra_data: Optional[dict] = None,
tok: Optional[str] = None,
expect_code: int = 200,
expect_errcode: Optional[str] = None,
expect_errcode: str = None,
) -> None:
"""
Send a membership state event into a room.
@@ -289,7 +260,9 @@ class RestHelper:
txn_id=None,
tok=None,
expect_code=200,
custom_headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = None,
custom_headers: Optional[
Iterable[Tuple[Union[bytes, str], Union[bytes, str]]]
] = None,
):
if txn_id is None:
txn_id = "m%s" % (str(time.time()))
@@ -536,7 +509,7 @@ class RestHelper:
went.
"""
cookies: Dict[str, str] = {}
cookies = {}
# if we're doing a ui auth, hit the ui auth redirect endpoint
if ui_auth_session_id:
@@ -658,13 +631,7 @@ class RestHelper:
# hit the redirect url again with the right Host header, which should now issue
# a cookie and redirect to the SSO provider.
def get_location(channel: FakeChannel) -> str:
location_values = channel.headers.getRawHeaders("Location")
# Keep mypy happy by asserting that location_values is nonempty
assert location_values
return location_values[0]
location = get_location(channel)
location = channel.headers.getRawHeaders("Location")[0]
parts = urllib.parse.urlsplit(location)
channel = make_request(
self.hs.get_reactor(),
@@ -678,7 +645,7 @@ class RestHelper:
assert channel.code == 302
channel.extract_cookies(cookies)
return get_location(channel)
return channel.headers.getRawHeaders("Location")[0]
def initiate_sso_ui_auth(
self, ui_auth_session_id: str, cookies: MutableMapping[str, str]

View File

@@ -24,7 +24,6 @@ from typing import (
MutableMapping,
Optional,
Tuple,
Type,
Union,
)
@@ -227,7 +226,7 @@ def make_request(
path: Union[bytes, str],
content: Union[bytes, str, JsonDict] = b"",
access_token: Optional[str] = None,
request: Type[Request] = SynapseRequest,
request: Request = SynapseRequest,
shorthand: bool = True,
federation_auth_origin: Optional[bytes] = None,
content_is_form: bool = False,

View File

@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from unittest import mock
from synapse.app.generic_worker import GenericWorkerServer
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
@@ -22,22 +19,6 @@ from synapse.storage.schema import SCHEMA_VERSION
from tests.unittest import HomeserverTestCase
def fake_listdir(filepath: str) -> List[str]:
"""
A fake implementation of os.listdir which we can use to mock out the filesystem.
Args:
filepath: The directory to list files for.
Returns:
A list of files and folders in the directory.
"""
if filepath.endswith("full_schemas"):
return [str(SCHEMA_VERSION)]
return ["99_add_unicorn_to_database.sql"]
class WorkerSchemaTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(
@@ -70,7 +51,7 @@ class WorkerSchemaTests(HomeserverTestCase):
prepare_database(db_conn, db_pool.engine, self.hs.config)
def test_not_upgraded_old_schema_version(self):
def test_not_upgraded(self):
"""Test that workers don't start if the DB has an older schema version"""
db_pool = self.hs.get_datastore().db_pool
db_conn = LoggingDatabaseConnection(
@@ -86,34 +67,3 @@ class WorkerSchemaTests(HomeserverTestCase):
with self.assertRaises(PrepareDatabaseException):
prepare_database(db_conn, db_pool.engine, self.hs.config)
def test_not_upgraded_current_schema_version_with_outstanding_deltas(self):
"""
Test that workers don't start if the DB is on the current schema version,
but there are still outstanding delta migrations to run.
"""
db_pool = self.hs.get_datastore().db_pool
db_conn = LoggingDatabaseConnection(
db_pool._db_pool.connect(),
db_pool.engine,
"tests",
)
# Set the schema version of the database to the current version
cur = db_conn.cursor()
cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION,))
db_conn.commit()
# Path `os.listdir` here to make synapse think that there is a migration
# file ready to be run.
# Note that we can't patch this function for the whole method, else Synapse
# will try to find the file when building the database initially.
with mock.patch("os.listdir", mock.Mock(side_effect=fake_listdir)):
with self.assertRaises(PrepareDatabaseException):
# Synapse should think that there is an outstanding migration file due to
# patching 'os.listdir' in the function decorator.
#
# We expect Synapse to raise an exception to indicate the master process
# needs to apply this migration file.
prepare_database(db_conn, db_pool.engine, self.hs.config)

View File

@@ -44,7 +44,6 @@ from twisted.python.threadpool import ThreadPool
from twisted.test.proto_helpers import MemoryReactor
from twisted.trial import unittest
from twisted.web.resource import Resource
from twisted.web.server import Request
from synapse import events
from synapse.api.constants import EventTypes, Membership
@@ -96,13 +95,16 @@ def around(target):
return _around
T = TypeVar("T")
class TestCase(unittest.TestCase):
"""A subclass of twisted.trial's TestCase which looks for 'loglevel'
attributes on both itself and its individual test methods, to override the
root logger's logging level while that test (case|method) runs."""
def __init__(self, methodName: str):
super().__init__(methodName)
def __init__(self, methodName, *args, **kwargs):
super().__init__(methodName, *args, **kwargs)
method = getattr(self, methodName)
@@ -218,16 +220,16 @@ class HomeserverTestCase(TestCase):
Attributes:
servlets: List of servlet registration function.
user_id (str): The user ID to assume if auth is hijacked.
hijack_auth: Whether to hijack auth to return the user specified
hijack_auth (bool): Whether to hijack auth to return the user specified
in user_id.
"""
hijack_auth: ClassVar[bool] = True
needs_threadpool: ClassVar[bool] = False
hijack_auth = True
needs_threadpool = False
servlets: ClassVar[List[RegisterServletsFunc]] = []
def __init__(self, methodName: str):
super().__init__(methodName)
def __init__(self, methodName, *args, **kwargs):
super().__init__(methodName, *args, **kwargs)
# see if we have any additional config for this test
method = getattr(self, methodName)
@@ -299,10 +301,9 @@ class HomeserverTestCase(TestCase):
None,
)
# Type ignore: mypy doesn't like us assigning to methods.
self.hs.get_auth().get_user_by_req = get_user_by_req # type: ignore[assignment]
self.hs.get_auth().get_user_by_access_token = get_user_by_access_token # type: ignore[assignment]
self.hs.get_auth().get_access_token_from_request = Mock( # type: ignore[assignment]
self.hs.get_auth().get_user_by_req = get_user_by_req
self.hs.get_auth().get_user_by_access_token = get_user_by_access_token
self.hs.get_auth().get_access_token_from_request = Mock(
return_value="1234"
)
@@ -416,7 +417,7 @@ class HomeserverTestCase(TestCase):
path: Union[bytes, str],
content: Union[bytes, str, JsonDict] = b"",
access_token: Optional[str] = None,
request: Type[Request] = SynapseRequest,
request: Type[T] = SynapseRequest,
shorthand: bool = True,
federation_auth_origin: Optional[bytes] = None,
content_is_form: bool = False,
@@ -595,7 +596,7 @@ class HomeserverTestCase(TestCase):
nonce_str += b"\x00notadmin"
want_mac.update(nonce.encode("ascii") + b"\x00" + nonce_str)
want_mac_digest = want_mac.hexdigest()
want_mac = want_mac.hexdigest()
body = json.dumps(
{
@@ -604,7 +605,7 @@ class HomeserverTestCase(TestCase):
"displayname": displayname,
"password": password,
"admin": admin,
"mac": want_mac_digest,
"mac": want_mac,
"inhibit_login": True,
}
)