1
0

Compare commits

...

28 Commits

Author SHA1 Message Date
David Robertson
bdb00ee73b Wrap metadata check in try-except
Not sure if this is a good idea, but here is a proposed change
2022-04-27 15:37:59 +01:00
Dirk Klimpel
b76f1a4d5f Add some type hints to datastore (#12485) 2022-04-27 13:05:00 +01:00
Nick Mills-Barrett
63ba9ba38b Bound ephemeral events by key (#12544)
Co-authored-by: Brad Murray <bradtgmurray@gmail.com>
Co-authored-by: Andrew Morgan <andrewm@element.io>
2022-04-26 20:14:21 +01:00
David Robertson
9986621bc8 Merge tag 'v1.58.0rc2' into develop
Synapse 1.58.0rc2 (2022-04-26)
==============================

This release candidate fixes bugs related to Synapse 1.58.0rc1's logic for handling device list updates.

Bugfixes
--------

- Fix a bug introduced in Synapse 1.58.0rc1 where the main process could consume excessive amounts of CPU and memory while handling sentry logging failures. ([\#12554](https://github.com/matrix-org/synapse/issues/12554))
- Fix a bug introduced in Synapse 1.58.0rc1 where opentracing contexts were not correctly sent to whitelisted remote servers with device lists updates. ([\#12555](https://github.com/matrix-org/synapse/issues/12555))

Internal Changes
----------------

- Reduce unnecessary work when handling remote device list updates. ([\#12557](https://github.com/matrix-org/synapse/issues/12557))
2022-04-26 18:07:15 +01:00
David Robertson
9cfecd2dc0 Adjust changelog 2022-04-26 17:22:12 +01:00
David Robertson
56c9c6c465 Credit Tulir's contribution in 1.58.0rc1 to Beeper, too 2022-04-26 17:17:56 +01:00
David Robertson
6b64ee9ec7 1.58.0rc2 2022-04-26 17:16:43 +01:00
Erik Johnston
f59e3f4c90 Mark remote device list updates as already handled (#12557) 2022-04-26 17:07:21 +01:00
David Robertson
6d89f1239c Comment out dodgy log-kv (#12554) 2022-04-26 15:53:06 +01:00
Erik Johnston
c48ab3734e Fix sending opentracing contexts to remote servers (#12555) 2022-04-26 14:48:16 +00:00
Jason Robinson
706456de1f Mark Dockerfile as requiring BuildKit (#12541)
Co-authored-by: David Robertson <davidr@element.io>
2022-04-26 15:31:52 +01:00
David Robertson
ee1601e59d Unbold deprecation: it is mentioned at the top 2022-04-26 11:59:10 +01:00
David Robertson
6b9e95015b Lint the release script 2022-04-26 11:53:37 +01:00
David Robertson
416604e3bc Another set of changelog updates 2022-04-26 11:51:47 +01:00
David Robertson
a54d9b0508 We don't require redbaron in the release script 2022-04-26 11:37:21 +01:00
David Robertson
f987cdd80b Changelog update 2022-04-26 11:32:57 +01:00
David Robertson
30db7fdb91 1.58.0rc1 2022-04-26 11:15:33 +01:00
David Robertson
7c063da25c Temporarily lower debian changelog version number
This seems to make dch happy when we prepare the release.
2022-04-26 11:14:41 +01:00
David Robertson
730fcda546 Update release script to be poetry-aware
Poetry now manages the project version in pyproject.toml.
2022-04-26 11:14:27 +01:00
Shay
99ab45423a build debian package for jammy jellyfish (#12543) 2022-04-26 10:34:59 +01:00
Richard van der Hoff
17d99f758a Optimise backfill calculation (#12522)
Try to avoid an OOM by checking fewer extremities.

Generally this is a big rewrite of _maybe_backfill, to try and fix some of the TODOs and other problems in it. It's best reviewed commit-by-commit.
2022-04-26 10:27:11 +01:00
Shay
e75c7e3b6d Add a table of contents to config manual (#12527)
* Update config_documentation.md
2022-04-25 11:43:59 -07:00
Sean Quah
8a87b4435a Handle cancellation in EventsWorkerStore._get_events_from_cache_or_db (#12529)
Multiple calls to `EventsWorkerStore._get_events_from_cache_or_db` can
reuse the same database fetch, which is initiated by the first call.
Ensure that cancelling the first call doesn't cancel the other calls
sharing the same database fetch.

Signed-off-by: Sean Quah <seanq@element.io>
2022-04-25 19:39:17 +01:00
Sami Olmari
813d728d09 Correct typo in user_admin_api.md device deletion JSON (#12533)
Signed-off-by: Sami Olmari <sami@olmari.fi>
2022-04-25 12:39:15 +00:00
David Robertson
8bac3e0435 disallow-untyped-defs in docker and stubs directories (#12528) 2022-04-25 12:32:35 +00:00
Patrick Cloke
185da8f0f2 Misc. clean-ups to the relations code (#12519)
* Corrects some typos / copy & paste errors in tests.
* Clarifies docstrings.
* Removes an unnecessary method.
2022-04-25 08:25:56 -04:00
villepeh
d9b71410c2 Add HAProxy delegation example to docs (#12501)
Signed-off-by: Ville Petteri Huh
2022-04-25 13:18:18 +01:00
Dirk Klimpel
a36a38b1ca Add some example configurations for worker (#12492)
Signed-off-by: Dirk Klimpel <dirk@klimpel.org>
2022-04-25 13:17:03 +01:00
91 changed files with 1005 additions and 562 deletions

View File

@@ -1,3 +1,93 @@
Synapse 1.58.0rc2 (2022-04-26)
==============================
This release candidate fixes bugs related to Synapse 1.58.0rc1's logic for handling device list updates.
Bugfixes
--------
- Fix a bug introduced in Synapse 1.58.0rc1 where the main process could consume excessive amounts of CPU and memory while handling sentry logging failures. ([\#12554](https://github.com/matrix-org/synapse/issues/12554))
- Fix a bug introduced in Synapse 1.58.0rc1 where opentracing contexts were not correctly sent to whitelisted remote servers with device lists updates. ([\#12555](https://github.com/matrix-org/synapse/issues/12555))
Internal Changes
----------------
- Reduce unnecessary work when handling remote device list updates. ([\#12557](https://github.com/matrix-org/synapse/issues/12557))
Synapse 1.58.0rc1 (2022-04-26)
==============================
As of this release, the groups/communities feature in Synapse is now disabled by default. See [\#11584](https://github.com/matrix-org/synapse/issues/11584) for details. As mentioned in [the upgrade notes](https://github.com/matrix-org/synapse/blob/develop/docs/upgrade.md#upgrading-to-v1580), this feature will be removed in Synapse 1.61.
Features
--------
- Implement [MSC3383](https://github.com/matrix-org/matrix-spec-proposals/pull/3383) for including the destination in server-to-server authentication headers. Contributed by @Bubu and @jcgruenhage for Famedly. ([\#11398](https://github.com/matrix-org/synapse/issues/11398))
- Docker images and Debian packages from matrix.org now contain a locked set of Python dependencies, greatly improving build reproducibility. ([Board](https://github.com/orgs/matrix-org/projects/54), [\#11537](https://github.com/matrix-org/synapse/issues/11537))
- Enable processing of device list updates asynchronously. ([\#12365](https://github.com/matrix-org/synapse/issues/12365), [\#12465](https://github.com/matrix-org/synapse/issues/12465))
- Implement [MSC2815](https://github.com/matrix-org/matrix-spec-proposals/pull/2815) to allow room moderators to view redacted event content. Contributed by @tulir @ Beeper. ([\#12427](https://github.com/matrix-org/synapse/issues/12427))
- Build Debian packages for Ubuntu 22.04 "Jammy Jellyfish". ([\#12543](https://github.com/matrix-org/synapse/issues/12543))
Bugfixes
--------
- Prevent a sync request from removing a user's busy presence status. ([\#12213](https://github.com/matrix-org/synapse/issues/12213))
- Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper. ([\#12319](https://github.com/matrix-org/synapse/issues/12319))
- Fix a long-standing bug which incorrectly caused `GET /_matrix/client/v3/rooms/{roomId}/event/{eventId}` to return edited events rather than the original. ([\#12476](https://github.com/matrix-org/synapse/issues/12476))
- Fix a bug introduced in Synapse 1.27.0 where the admin API for [deleting forward extremities](https://github.com/matrix-org/synapse/blob/erikj/fix_delete_event_response_count/docs/admin_api/rooms.md#deleting-forward-extremities) would always return a count of 1, no matter how many extremities were deleted. ([\#12496](https://github.com/matrix-org/synapse/issues/12496))
- Fix a long-standing bug where the image thumbnails embedded into email notifications were broken. ([\#12510](https://github.com/matrix-org/synapse/issues/12510))
- Fix a bug in the implementation of [MSC3202](https://github.com/matrix-org/matrix-spec-proposals/pull/3202) where Synapse would use the field name `device_unused_fallback_keys`, rather than `device_unused_fallback_key_types`. ([\#12520](https://github.com/matrix-org/synapse/issues/12520))
- Fix a bug introduced in Synapse 0.99.3 which could cause Synapse to consume large amounts of RAM when back-paginating in a large room. ([\#12522](https://github.com/matrix-org/synapse/issues/12522))
Improved Documentation
----------------------
- Fix rendering of the documentation site when using the 'print' feature. ([\#12340](https://github.com/matrix-org/synapse/issues/12340))
- Add a manual documenting config file options. ([\#12368](https://github.com/matrix-org/synapse/issues/12368), [\#12527](https://github.com/matrix-org/synapse/issues/12527))
- Update documentation to reflect that both the `run_background_tasks_on` option and the options for moving stream writers off of the main process are no longer experimental. ([\#12451](https://github.com/matrix-org/synapse/issues/12451))
- Update worker documentation and replace old `federation_reader` with `generic_worker`. ([\#12457](https://github.com/matrix-org/synapse/issues/12457))
- Strongly recommend [Poetry](https://python-poetry.org/) for development. ([\#12475](https://github.com/matrix-org/synapse/issues/12475))
- Add some example configurations for workers and update architectural diagram. ([\#12492](https://github.com/matrix-org/synapse/issues/12492))
- Fix a broken link in `README.rst`. ([\#12495](https://github.com/matrix-org/synapse/issues/12495))
- Add HAProxy delegation example with CORS headers to docs. ([\#12501](https://github.com/matrix-org/synapse/issues/12501))
- Remove extraneous comma in User Admin API's device deletion section so that the example JSON is actually valid and works. Contributed by @olmari. ([\#12533](https://github.com/matrix-org/synapse/issues/12533))
Deprecations and Removals
-------------------------
- The groups/communities feature in Synapse is now disabled by default. ([\#12344](https://github.com/matrix-org/synapse/issues/12344))
- Remove unstable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). ([\#12382](https://github.com/matrix-org/synapse/issues/12382))
Internal Changes
----------------
- Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join. ([\#12394](https://github.com/matrix-org/synapse/issues/12394))
- Preparation for faster-room-join work: Implement a tracking mechanism to allow functions to wait for full room state to arrive. ([\#12399](https://github.com/matrix-org/synapse/issues/12399))
- Remove an unstable identifier from [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083). ([\#12395](https://github.com/matrix-org/synapse/issues/12395))
- Run CI in the locked [Poetry](https://python-poetry.org/) environment, and remove corresponding `tox` jobs. ([\#12425](https://github.com/matrix-org/synapse/issues/12425), [\#12434](https://github.com/matrix-org/synapse/issues/12434), [\#12438](https://github.com/matrix-org/synapse/issues/12438), [\#12441](https://github.com/matrix-org/synapse/issues/12441), [\#12449](https://github.com/matrix-org/synapse/issues/12449), [\#12478](https://github.com/matrix-org/synapse/issues/12478), [\#12514](https://github.com/matrix-org/synapse/issues/12514), [\#12472](https://github.com/matrix-org/synapse/issues/12472))
- Change Mutual Rooms' `unstable_features` flag to `uk.half-shot.msc2666.mutual_rooms` which matches the current iteration of [MSC2666](https://github.com/matrix-org/matrix-spec-proposals/pull/2666). ([\#12445](https://github.com/matrix-org/synapse/issues/12445))
- Fix typo in the release script help string. ([\#12450](https://github.com/matrix-org/synapse/issues/12450))
- Fix a minor typo in the Debian changelogs generated by the release script. ([\#12497](https://github.com/matrix-org/synapse/issues/12497))
- Reintroduce the list of targets to the linter script, to avoid linting unwanted local-only directories during development. ([\#12455](https://github.com/matrix-org/synapse/issues/12455))
- Limit length of `device_id` to less than 512 characters. ([\#12454](https://github.com/matrix-org/synapse/issues/12454))
- Dockerfile-workers: reduce the amount we install in the image. ([\#12464](https://github.com/matrix-org/synapse/issues/12464))
- Dockerfile-workers: give the master its own log config. ([\#12466](https://github.com/matrix-org/synapse/issues/12466))
- complement-synapse-workers: factor out separate entry point script. ([\#12467](https://github.com/matrix-org/synapse/issues/12467))
- Back out experimental implementation of [MSC2314](https://github.com/matrix-org/matrix-spec-proposals/pull/2314). ([\#12474](https://github.com/matrix-org/synapse/issues/12474))
- Fix grammatical error in federation error response when the room version of a room is unknown. ([\#12483](https://github.com/matrix-org/synapse/issues/12483))
- Remove unnecessary configuration overrides in tests. ([\#12511](https://github.com/matrix-org/synapse/issues/12511))
- Refactor the relations code for clarity. ([\#12519](https://github.com/matrix-org/synapse/issues/12519))
- Add type hints so `docker` and `stubs` directories pass `mypy --disallow-untyped-defs`. ([\#12528](https://github.com/matrix-org/synapse/issues/12528))
- Update `delay_cancellation` to accept any awaitable, rather than just `Deferred`s. ([\#12468](https://github.com/matrix-org/synapse/issues/12468))
- Handle cancellation in `EventsWorkerStore._get_events_from_cache_or_db`. ([\#12529](https://github.com/matrix-org/synapse/issues/12529))
Synapse 1.57.1 (2022-04-20)
===========================

View File

@@ -1 +0,0 @@
Implement [MSC3383](https://github.com/matrix-org/matrix-spec-proposals/pull/3383) for including the destination in server-to-server authentication headers. Contributed by @Bubu and @jcgruenhage for Famedly GmbH.

View File

@@ -1 +0,0 @@
Prevent a sync request from removing a user's busy presence status.

View File

@@ -1 +0,0 @@
Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.

View File

@@ -1 +0,0 @@
Use poetry to manage Synapse's dependencies.

View File

@@ -1 +0,0 @@
Fix rendering of the documentation site when using the 'print' feature.

View File

@@ -1 +0,0 @@
The groups/communities feature in Synapse has been disabled by default.

View File

@@ -1 +0,0 @@
Enable processing of device list updates asynchronously.

View File

@@ -1 +0,0 @@
Add a manual documenting config file options.

View File

@@ -1 +0,0 @@
Remove unstable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).

View File

@@ -1 +0,0 @@
Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join.

View File

@@ -1 +0,0 @@
Remove an unstable identifier from [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083).

View File

@@ -1 +0,0 @@
Preparation for faster-room-join work: Implement a tracking mechanism to allow functions to wait for full room state to arrive.

View File

@@ -1 +0,0 @@
Run twisted trunk CI job in the locked poetry environment.

View File

@@ -1 +0,0 @@
Implement [MSC2815](https://github.com/matrix-org/matrix-spec-proposals/pull/2815) to allow room moderators to view redacted event content. Contributed by @tulir.

View File

@@ -1 +0,0 @@
Run lints under poetry in CI, and remove corresponding tox lint jobs.

View File

@@ -1 +0,0 @@
Run "main" trial tests under `poetry`.

View File

@@ -1 +0,0 @@
Bump twisted version in `poetry.lock` to work around [pip bug #9644](https://github.com/pypa/pip/issues/9644).

View File

@@ -1 +0,0 @@
Change Mutual Rooms' `unstable_features` flag to `uk.half-shot.msc2666.mutual_rooms` which matches the current MSC iteration.

View File

@@ -1 +0,0 @@
Use `poetry` to manage the virtualenv in debian packages.

View File

@@ -1 +0,0 @@
Fix typo in the release script help string.

View File

@@ -1 +0,0 @@
Update documentation to reflect that both the `run_background_tasks_on` option and the options for moving stream writers off of the main process are no longer experimental.

View File

@@ -1 +0,0 @@
Limit length of device_id to less than 512 characters.

View File

@@ -1 +0,0 @@
Reintroduce the list of targets to the linter script, to avoid linting unwanted local-only directories during development.

View File

@@ -1 +0,0 @@
Update worker documentation and replace old `federation_reader` with `generic_worker`.

View File

@@ -1 +0,0 @@
Dockerfile-workers: reduce the amount we install in the image.

View File

@@ -1 +0,0 @@
Enable processing of device list updates asynchronously.

View File

@@ -1 +0,0 @@
Dockerfile-workers: give the master its own log config.

View File

@@ -1 +0,0 @@
complement-synapse-workers: factor out separate entry point script.

View File

@@ -1 +0,0 @@
Update `delay_cancellation` to accept any awaitable, rather than just `Deferred`s.

View File

@@ -1 +0,0 @@
Add a CI job which tests Synapse against the latest version of all dependencies.

View File

@@ -1 +0,0 @@
Back out experimental implementation of [MSC2314](https://github.com/matrix-org/matrix-spec-proposals/pull/2314).

View File

@@ -1 +0,0 @@
Strongly recommend `poetry` for development.

View File

@@ -1 +0,0 @@
Fix a long-standing bug which incorrectly caused `GET /_matrix/client/r3/rooms/{roomId}/event/{eventId}` to return edited events rather than the original.

View File

@@ -1 +0,0 @@
Use poetry-core instead of setuptools to build wheels.

View File

@@ -1 +0,0 @@
Fix grammatical error in federation error response when the room version of a room is unknown.

1
changelog.d/12485.misc Normal file
View File

@@ -0,0 +1 @@
Add some type hints to datastore.

View File

@@ -1 +0,0 @@
Fix a broken link in `README.rst`.

View File

@@ -1 +0,0 @@
Fix bug where the admin API for [deleting forward extremities](https://github.com/matrix-org/synapse/blob/erikj/fix_delete_event_response_count/docs/admin_api/rooms.md#deleting-forward-extremities) would always return a count of 1 no matter how many extremities were deleted. Broke in v1.27.0.

View File

@@ -1 +0,0 @@
Fix a minor typo in the Debian changelogs generated by the release script.

View File

@@ -1 +0,0 @@
Fix a long-standing bug where the image thumbanils embedded into email notifications were broken.

View File

@@ -1 +0,0 @@
Remove unnecessary configuration overrides in tests.

View File

@@ -1 +0,0 @@
Use poetry-core instead of setuptools to build wheels.

View File

@@ -1 +0,0 @@
Fix a bug in the implementation of MSC3202 where Synapse would use the field name `device_unused_fallback_keys`, rather than `device_unused_fallback_key_types`.

1
changelog.d/12541.docker Normal file
View File

@@ -0,0 +1 @@
Explicitly opt-in to using [BuildKit-specific features](https://github.com/moby/buildkit/blob/master/frontend/dockerfile/docs/syntax.md) in the Dockerfile. This fixes issues with building images in some GitLab CI environments.

1
changelog.d/12544.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug where attempting to send a large amount of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.

11
debian/changelog vendored
View File

@@ -1,8 +1,15 @@
matrix-synapse-py3 (1.58.0+nmu1) UNRELEASED; urgency=medium
matrix-synapse-py3 (1.58.0~rc2) stable; urgency=medium
* New Synapse release 1.58.0rc2.
-- Synapse Packaging team <packages@matrix.org> Tue, 26 Apr 2022 17:14:56 +0100
matrix-synapse-py3 (1.58.0~rc1) stable; urgency=medium
* Use poetry to manage the bundled virtualenv included with this package.
* New Synapse release 1.58.0rc1.
-- Synapse Packaging team <packages@matrix.org> Wed, 30 Mar 2022 12:21:43 +0100
-- Synapse Packaging team <packages@matrix.org> Tue, 26 Apr 2022 11:15:20 +0100
matrix-synapse-py3 (1.57.1) stable; urgency=medium

View File

@@ -1,3 +1,4 @@
# syntax=docker/dockerfile:1
# Dockerfile to build the matrixdotorg/synapse docker images.
#
# Note that it uses features which are only available in BuildKit - see

View File

@@ -29,7 +29,7 @@
import os
import subprocess
import sys
from typing import Any, Dict, Mapping, Set
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Set
import jinja2
import yaml
@@ -201,7 +201,7 @@ upstream {upstream_worker_type} {{
# Utility functions
def log(txt: str):
def log(txt: str) -> None:
"""Log something to the stdout.
Args:
@@ -210,7 +210,7 @@ def log(txt: str):
print(txt)
def error(txt: str):
def error(txt: str) -> NoReturn:
"""Log something and exit with an error code.
Args:
@@ -220,7 +220,7 @@ def error(txt: str):
sys.exit(2)
def convert(src: str, dst: str, **template_vars):
def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template
Args:
@@ -290,7 +290,7 @@ def add_sharding_to_shared_config(
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
def generate_base_homeserver_config():
def generate_base_homeserver_config() -> None:
"""Starts Synapse and generates a basic homeserver config, which will later be
modified for worker support.
@@ -302,12 +302,14 @@ def generate_base_homeserver_config():
subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])
def generate_worker_files(environ, config_path: str, data_dir: str):
def generate_worker_files(
environ: Mapping[str, str], config_path: str, data_dir: str
) -> None:
"""Read the desired list of workers from environment variables and generate
shared homeserver, nginx and supervisord configs.
Args:
environ: _Environ[str]
environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
@@ -369,13 +371,13 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
nginx_locations = {}
# Read the desired worker configuration from the environment
worker_types = environ.get("SYNAPSE_WORKER_TYPES")
if worker_types is None:
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES")
if worker_types_env is None:
# No workers, just the main process
worker_types = []
else:
# Split type names by comma
worker_types = worker_types.split(",")
worker_types = worker_types_env.split(",")
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
@@ -547,7 +549,7 @@ def generate_worker_log_config(
return log_config_filepath
def main(args, environ):
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")

View File

@@ -6,27 +6,28 @@ import os
import platform
import subprocess
import sys
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional
import jinja2
# Utility functions
def log(txt):
def log(txt: str) -> None:
print(txt, file=sys.stderr)
def error(txt):
def error(txt: str) -> NoReturn:
log(txt)
sys.exit(2)
def convert(src, dst, environ):
def convert(src: str, dst: str, environ: Mapping[str, object]) -> None:
"""Generate a file from a template
Args:
src (str): path to input file
dst (str): path to file to write
environ (dict): environment dictionary, for replacement mappings.
src: path to input file
dst: path to file to write
environ: environment dictionary, for replacement mappings.
"""
with open(src) as infile:
template = infile.read()
@@ -35,25 +36,30 @@ def convert(src, dst, environ):
outfile.write(rendered)
def generate_config_from_template(config_dir, config_path, environ, ownership):
def generate_config_from_template(
config_dir: str,
config_path: str,
os_environ: Mapping[str, str],
ownership: Optional[str],
) -> None:
"""Generate a homeserver.yaml from environment variables
Args:
config_dir (str): where to put generated config files
config_path (str): where to put the main config file
environ (dict): environment dictionary
ownership (str|None): "<user>:<group>" string which will be used to set
config_dir: where to put generated config files
config_path: where to put the main config file
os_environ: environment mapping
ownership: "<user>:<group>" string which will be used to set
ownership of the generated configs. If None, ownership will not change.
"""
for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
if v not in environ:
if v not in os_environ:
error(
"Environment variable '%s' is mandatory when generating a config file."
% (v,)
)
# populate some params from data files (if they exist, else create new ones)
environ = environ.copy()
environ: Dict[str, Any] = dict(os_environ)
secrets = {
"registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
"macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
@@ -127,12 +133,12 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
subprocess.check_output(args)
def run_generate_config(environ, ownership):
def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) -> None:
"""Run synapse with a --generate-config param to generate a template config file
Args:
environ (dict): env var dict
ownership (str|None): "userid:groupid" arg for chmod. If None, ownership will not change.
environ: env vars from `os.enrivon`.
ownership: "userid:groupid" arg for chmod. If None, ownership will not change.
Never returns.
"""
@@ -178,7 +184,7 @@ def run_generate_config(environ, ownership):
os.execv(sys.executable, args)
def main(args, environ):
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
mode = args[1] if len(args) > 1 else "run"
# if we were given an explicit user to switch to, do so

View File

@@ -804,7 +804,7 @@ POST /_synapse/admin/v2/users/<user_id>/delete_devices
"devices": [
"QBUAZIFURK",
"AUIECTSRND"
],
]
}
```

View File

@@ -206,6 +206,28 @@ backend matrix
server matrix 127.0.0.1:8008
```
[Delegation](delegate.md) example:
```
frontend https
acl matrix-well-known-client-path path /.well-known/matrix/client
acl matrix-well-known-server-path path /.well-known/matrix/server
use_backend matrix-well-known-client if matrix-well-known-client-path
use_backend matrix-well-known-server if matrix-well-known-server-path
backend matrix-well-known-client
http-after-response set-header Access-Control-Allow-Origin "*"
http-after-response set-header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
http-after-response set-header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
http-request return status 200 content-type application/json string '{"m.homeserver":{"base_url":"https://matrix.example.com"},"m.identity_server":{"base_url":"https://identity.example.com"}}'
backend matrix-well-known-server
http-after-response set-header Access-Control-Allow-Origin "*"
http-after-response set-header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
http-after-response set-header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
http-request return status 200 content-type application/json string '{"m.server":"matrix.example.com:443"}'
```
### Relayd
```

View File

@@ -0,0 +1,8 @@
worker_app: synapse.app.generic_worker
worker_name: background_worker
# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093
worker_log_config: /etc/matrix-synapse/background-worker-log.yaml

View File

@@ -0,0 +1,23 @@
worker_app: synapse.app.generic_worker
worker_name: event_persister1
# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093
worker_listeners:
- type: http
port: 8034
resources:
- names: [replication]
# Enable listener if this stream writer handles endpoints for the `typing` or
# `to_device` streams. Uses a different port to the `replication` listener to
# avoid exposing the `replication` listener publicly.
#
#- type: http
# port: 8035
# resources:
# - names: [client]
worker_log_config: /etc/matrix-synapse/event-persister-log.yaml

View File

@@ -1,12 +1,13 @@
worker_app: synapse.app.generic_worker
worker_name: generic_worker1
# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093
worker_listeners:
- type: http
port: 8011
port: 8083
resources:
- names: [client, federation]

View File

@@ -64,7 +64,49 @@ apply if you want your config file to be read properly. A few helpful things to
In addition, each setting has an example of its usage, with the proper indentation
shown.
## Contents
[Modules](#modules)
[Server](#server)
[Homeserver Blocking](#homeserver-blocking)
[TLS](#tls)
[Federation](#federation)
[Caching](#caching)
[Database](#database)
[Logging](#logging)
[Ratelimiting](#ratelimiting)
[Media Store](#media-store)
[Captcha](#captcha)
[TURN](#turn)
[Registration](#registration)
[API Configuration](#api-configuration)
[Signing Keys](#signing-keys)
[Single Sign On Integration](#single-sign-on-integration)
[Push](#push)
[Rooms](#rooms)
[Opentracing](#opentracing)
[Workers](#workers)
[Background Updates](#background-updates)
## Modules
Server admins can expand Synapse's functionality with external modules.
@@ -3409,4 +3451,4 @@ background_updates:
sleep_duration_ms: 300
min_batch_size: 10
default_batch_size: 50
```
```

View File

@@ -138,20 +138,7 @@ as the `listeners` option in the shared config.
For example:
```yaml
worker_app: synapse.app.generic_worker
worker_name: worker1
# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093
worker_listeners:
- type: http
port: 8083
resources:
- names: [client, federation]
worker_log_config: /home/matrix/synapse/config/worker1_log_config.yaml
{{#include systemd-with-workers/workers/generic_worker.yaml}}
```
...is a full configuration for a generic worker instance, which will expose a
@@ -363,6 +350,12 @@ stream_writers:
events: event_persister1
```
An example for a stream writer instance:
```yaml
{{#include systemd-with-workers/workers/event_persister.yaml}}
```
Some of the streams have associated endpoints which, for maximum efficiency, should
be routed to the workers handling that stream. See below for the currently supported
streams and the endpoints associated with them:
@@ -436,6 +429,12 @@ run_background_tasks_on: background_worker
You might also wish to investigate the `update_user_directory` and
`media_instance_running_background_jobs` settings.
An example for a dedicated background worker instance:
```yaml
{{#include systemd-with-workers/workers/background_worker.yaml}}
```
### `synapse.app.pusher`
Handles sending push notifications to sygnal and email. Doesn't handle any
@@ -615,14 +614,14 @@ The following shows an example setup using Redis and a reverse proxy:
| Main | | Generic | | Generic | | Event |
| Process | | Worker 1 | | Worker 2 | | Persister |
+--------------+ +--------------+ +--------------+ +--------------+
^ ^ | ^ | | ^ | ^ ^
| | | | | | | | | |
| | | | | HTTP | | | | |
| +----------+<--|---|---------+ | | | |
| | +-------------|-->+----------+ |
| | | |
| | | |
v v v v
====================================================================
^ ^ | ^ | | ^ | | ^ ^
| | | | | | | | | | |
| | | | | HTTP | | | | | |
| +----------+<--|---|---------+<--|---|---------+ | |
| | +-------------|-->+-------------+ |
| | | |
| | | |
v v v v
======================================================================
Redis pub/sub channel
```

62
poetry.lock generated
View File

@@ -1,11 +1,3 @@
[[package]]
name = "appdirs"
version = "1.4.4"
description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "attrs"
version = "21.4.0"
@@ -49,17 +41,6 @@ six = "*"
[package.extras]
visualize = ["graphviz (>0.5.1)", "Twisted (>=16.1.1)"]
[[package]]
name = "baron"
version = "0.10.1"
description = "Full Syntax Tree for python to make writing refactoring code a realist task"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
rply = "*"
[[package]]
name = "bcrypt"
version = "3.2.0"
@@ -984,20 +965,6 @@ Pygments = ">=2.5.1"
[package.extras]
md = ["cmarkgfm (>=0.8.0)"]
[[package]]
name = "redbaron"
version = "0.9.2"
description = "Abstraction on top of baron, a FST for python to make writing refactoring code a realistic task"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
baron = ">=0.7"
[package.extras]
notebook = ["pygments"]
[[package]]
name = "requests"
version = "2.27.1"
@@ -1038,17 +1005,6 @@ python-versions = ">=3.7"
[package.extras]
idna2008 = ["idna"]
[[package]]
name = "rply"
version = "0.7.8"
description = "A pure Python Lex/Yacc that works with RPython"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
appdirs = "*"
[[package]]
name = "secretstorage"
version = "3.3.1"
@@ -1597,13 +1553,9 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7"
content-hash = "964ad29eaf7fd02749a4e735818f3bc0ba729c2f4b9e3213f0daa02643508b16"
content-hash = "f482a4f594a165dfe01ce253a22510d5faf38647ab0dcebc35789350cafd9bf0"
[metadata.files]
appdirs = [
{file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"},
{file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"},
]
attrs = [
{file = "attrs-21.4.0-py2.py3-none-any.whl", hash = "sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"},
{file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"},
@@ -1616,10 +1568,6 @@ automat = [
{file = "Automat-20.2.0-py2.py3-none-any.whl", hash = "sha256:b6feb6455337df834f6c9962d6ccf771515b7d939bca142b29c20c2376bc6111"},
{file = "Automat-20.2.0.tar.gz", hash = "sha256:7979803c74610e11ef0c0d68a2942b152df52da55336e0c9d58daf1831cbdf33"},
]
baron = [
{file = "baron-0.10.1-py2.py3-none-any.whl", hash = "sha256:befb33f4b9e832c7cd1e3cf0eafa6dd3cb6ed4cb2544245147c019936f4e0a8a"},
{file = "baron-0.10.1.tar.gz", hash = "sha256:af822ad44d4eb425c8516df4239ac4fdba9fdb398ef77e4924cd7c9b4045bc2f"},
]
bcrypt = [
{file = "bcrypt-3.2.0-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:b589229207630484aefe5899122fb938a5b017b0f4349f769b8c13e78d99a8fd"},
{file = "bcrypt-3.2.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c95d4cbebffafcdd28bd28bb4e25b31c50f6da605c81ffd9ad8a3d1b2ab7b1b6"},
@@ -2412,10 +2360,6 @@ readme-renderer = [
{file = "readme_renderer-33.0-py3-none-any.whl", hash = "sha256:f02cee0c4de9636b5a62b6be50c9742427ba1b956aad1d938bfb087d0d72ccdf"},
{file = "readme_renderer-33.0.tar.gz", hash = "sha256:e3b53bc84bd6af054e4cc1fe3567dc1ae19f554134221043a3f8c674e22209db"},
]
redbaron = [
{file = "redbaron-0.9.2-py2.py3-none-any.whl", hash = "sha256:d01032b6a848b5521a8d6ef72486315c2880f420956870cdd742e2b5a09b9bab"},
{file = "redbaron-0.9.2.tar.gz", hash = "sha256:472d0739ca6b2240bb2278ae428604a75472c9c12e86c6321e8c016139c0132f"},
]
requests = [
{file = "requests-2.27.1-py2.py3-none-any.whl", hash = "sha256:f22fa1e554c9ddfd16e6e41ac79759e17be9e492b3587efa038054674760e72d"},
{file = "requests-2.27.1.tar.gz", hash = "sha256:68d7c56fd5a8999887728ef304a6d12edc7be74f1cfa47714fc8b414525c9a61"},
@@ -2428,10 +2372,6 @@ rfc3986 = [
{file = "rfc3986-2.0.0-py2.py3-none-any.whl", hash = "sha256:50b1502b60e289cb37883f3dfd34532b8873c7de9f49bb546641ce9cbd256ebd"},
{file = "rfc3986-2.0.0.tar.gz", hash = "sha256:97aacf9dbd4bfd829baad6e6309fa6573aaf1be3f6fa735c8ab05e46cecb261c"},
]
rply = [
{file = "rply-0.7.8-py2.py3-none-any.whl", hash = "sha256:28ffd11d656c48aeb8c508eb382acd6a0bd906662624b34388751732a27807e7"},
{file = "rply-0.7.8.tar.gz", hash = "sha256:2a808ac25a4580a9991fc304d64434e299a8fc75760574492f242cbb5bb301c9"},
]
secretstorage = [
{file = "SecretStorage-3.3.1-py3-none-any.whl", hash = "sha256:422d82c36172d88d6a0ed5afdec956514b189ddbfb72fefab0c8a1cee4eaf71f"},
{file = "SecretStorage-3.3.1.tar.gz", hash = "sha256:fd666c51a6bf200643495a04abb261f83229dcb6fd8472ec393df7ffc8b6f195"},

View File

@@ -54,7 +54,7 @@ skip_gitignore = true
[tool.poetry]
name = "matrix-synapse"
version = "1.57.1"
version = "1.58.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
@@ -270,7 +270,6 @@ idna = ">=2.5"
# The following are used by the release script
click = "==8.1.0"
redbaron = "==0.9.2"
GitPython = "==3.1.14"
commonmark = "==0.9.1"
pygithub = "==1.55"

View File

@@ -26,6 +26,7 @@ DISTS = (
"debian:sid",
"ubuntu:focal", # 20.04 LTS (our EOL forced by Py38 on 2024-10-14)
"ubuntu:impish", # 21.10 (EOL 2022-07)
"ubuntu:jammy", # 22.04 LTS (EOL 2027-04)
)
DESC = """\

View File

@@ -25,13 +25,12 @@ import sys
import urllib.request
from os import path
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple
from typing import List, Optional
import attr
import click
import commonmark
import git
import redbaron
from click.exceptions import ClickException
from github import Github
from packaging import version
@@ -100,7 +99,7 @@ def prepare():
repo.remote().fetch()
# Get the current version and AST from root Synapse module.
current_version, parsed_synapse_ast, version_node = parse_version_from_module()
current_version = get_package_version()
# Figure out what sort of release we're doing and calcuate the new version.
rc = click.confirm("RC", default=True)
@@ -162,7 +161,7 @@ def prepare():
click.get_current_context().abort()
# Switch to the release branch.
parsed_new_version = version.parse(new_version)
parsed_new_version: version.Version = version.parse(new_version)
# We assume for debian changelogs that we only do RCs or full releases.
assert not parsed_new_version.is_devrelease
@@ -207,17 +206,15 @@ def prepare():
# Create the new release branch
release_branch = repo.create_head(release_branch_name, commit=base_branch)
# Switch to the release branch and ensure its up to date.
# Switch to the release branch and ensure it's up to date.
repo.git.checkout(release_branch_name)
update_branch(repo)
# Update the `__version__` variable and write it back to the file.
version_node.value = '"' + new_version + '"'
with open("synapse/__init__.py", "w") as f:
f.write(parsed_synapse_ast.dumps())
# Update the version specified in pyproject.toml.
subprocess.check_output(["poetry", "version", new_version])
# Generate changelogs.
generate_and_write_changelog(current_version)
generate_and_write_changelog(current_version, new_version)
# Generate debian changelogs
if parsed_new_version.pre is not None:
@@ -284,7 +281,7 @@ def tag(gh_token: Optional[str]):
repo.remote().fetch()
# Find out the version and tag name.
current_version, _, _ = parse_version_from_module()
current_version = get_package_version()
tag_name = f"v{current_version}"
# Check we haven't released this version.
@@ -362,7 +359,7 @@ def publish(gh_token: str):
if repo.is_dirty():
raise click.ClickException("Uncommitted changes exist.")
current_version, _, _ = parse_version_from_module()
current_version = get_package_version()
tag_name = f"v{current_version}"
if not click.confirm(f"Publish {tag_name}?", default=True):
@@ -396,7 +393,7 @@ def publish(gh_token: str):
def upload():
"""Upload release to pypi."""
current_version, _, _ = parse_version_from_module()
current_version = get_package_version()
tag_name = f"v{current_version}"
pypi_asset_names = [
@@ -424,7 +421,7 @@ def upload():
def announce():
"""Generate markdown to announce the release."""
current_version, _, _ = parse_version_from_module()
current_version = get_package_version()
tag_name = f"v{current_version}"
click.echo(
@@ -455,37 +452,11 @@ Announce the release in
)
def parse_version_from_module() -> Tuple[
version.Version, redbaron.RedBaron, redbaron.Node
]:
# Parse the AST and load the `__version__` node so that we can edit it
# later.
with open("synapse/__init__.py") as f:
red = redbaron.RedBaron(f.read())
version_node = None
for node in red:
if node.type != "assignment":
continue
if node.target.type != "name":
continue
if node.target.value != "__version__":
continue
version_node = node
break
if not version_node:
print("Failed to find '__version__' definition in synapse/__init__.py")
sys.exit(1)
# Parse the current version.
current_version = version.parse(version_node.value.value.strip('"'))
assert isinstance(current_version, version.Version)
return current_version, red, version_node
def get_package_version() -> version.Version:
version_string = subprocess.check_output(["poetry", "version", "--short"]).decode(
"utf-8"
)
return version.Version(version_string)
def find_ref(repo: git.Repo, ref_name: str) -> Optional[git.HEAD]:
@@ -565,11 +536,13 @@ def get_changes_for_version(wanted_version: version.Version) -> str:
return "\n".join(version_changelog)
def generate_and_write_changelog(current_version: version.Version):
def generate_and_write_changelog(current_version: version.Version, new_version: str):
# We do this by getting a draft so that we can edit it before writing to the
# changelog.
result = run_until_successful(
"python3 -m towncrier --draft", shell=True, capture_output=True
f"python3 -m towncrier build --draft --version {new_version}",
shell=True,
capture_output=True,
)
new_changes = result.stdout.decode("utf-8")
new_changes = new_changes.replace(

View File

@@ -103,7 +103,7 @@ class SortedDict(Dict[_KT, _VT]):
self,
start: Optional[int] = ...,
stop: Optional[int] = ...,
reverse=bool,
reverse: bool = ...,
) -> Iterator[_KT]: ...
def bisect_left(self, value: _KT) -> int: ...
def bisect_right(self, value: _KT) -> int: ...

View File

@@ -81,7 +81,7 @@ class SortedList(MutableSequence[_T]):
self,
start: Optional[int] = ...,
stop: Optional[int] = ...,
reverse=bool,
reverse: bool = ...,
) -> Iterator[_T]: ...
def _islice(
self,
@@ -153,14 +153,14 @@ class SortedKeyList(SortedList[_T]):
maximum: Optional[int] = ...,
inclusive: Tuple[bool, bool] = ...,
reverse: bool = ...,
): ...
) -> Iterator[_T]: ...
def irange_key(
self,
min_key: Optional[Any] = ...,
max_key: Optional[Any] = ...,
inclusive: Tuple[bool, bool] = ...,
reserve: bool = ...,
): ...
) -> Iterator[_T]: ...
def bisect_left(self, value: _T) -> int: ...
def bisect_right(self, value: _T) -> int: ...
def bisect(self, value: _T) -> int: ...

View File

@@ -103,7 +103,7 @@ class SortedSet(MutableSet[_T], Sequence[_T]):
self,
start: Optional[int] = ...,
stop: Optional[int] = ...,
reverse=bool,
reverse: bool = ...,
) -> Iterator[_T]: ...
def irange(
self,

View File

@@ -18,6 +18,8 @@ from typing import Any, List, Optional, Type, Union
from twisted.internet import protocol
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IAddress
from twisted.python.failure import Failure
class RedisProtocol(protocol.Protocol):
def publish(self, channel: str, message: bytes) -> "Deferred[None]": ...
@@ -34,11 +36,14 @@ class RedisProtocol(protocol.Protocol):
def get(self, key: str) -> "Deferred[Any]": ...
class SubscriberProtocol(RedisProtocol):
def __init__(self, *args, **kwargs): ...
def __init__(self, *args: object, **kwargs: object): ...
password: Optional[str]
def subscribe(self, channels: Union[str, List[str]]): ...
def connectionMade(self): ...
def connectionLost(self, reason): ...
def subscribe(self, channels: Union[str, List[str]]) -> "Deferred[None]": ...
def connectionMade(self) -> None: ...
# type-ignore: twisted.internet.protocol.Protocol provides a default argument for
# `reason`. txredisapi's LineReceiver Protocol doesn't. But that's fine: it's what's
# actually specified in twisted.internet.interfaces.IProtocol.
def connectionLost(self, reason: Failure) -> None: ... # type: ignore[override]
def lazyConnection(
host: str = ...,
@@ -74,7 +79,7 @@ class RedisFactory(protocol.ReconnectingClientFactory):
replyTimeout: Optional[int] = None,
convertNumbers: Optional[int] = True,
): ...
def buildProtocol(self, addr) -> RedisProtocol: ...
def buildProtocol(self, addr: IAddress) -> RedisProtocol: ...
class SubscriberFactory(RedisFactory):
def __init__(self) -> None: ...

View File

@@ -479,9 +479,9 @@ class EventClientSerializer:
Args:
event: The event being serialized.
time_now: The current time in milliseconds
config: Event serialization config
aggregations: The bundled aggregation to serialize.
serialized_event: The serialized event which may be modified.
config: Event serialization config
apply_edits: Whether the content of the event should be modified to reflect
any replacement in `aggregations.replace`.
"""

View File

@@ -416,7 +416,7 @@ class ApplicationServicesHandler:
return typing
async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
@@ -447,7 +447,7 @@ class ApplicationServicesHandler:
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
service=service, from_key=from_key, to_key=new_token
)
return receipts

View File

@@ -505,8 +505,9 @@ class DeviceHandler(DeviceWorkerHandler):
"device_list_key", position, users={user_id}, rooms=room_ids
)
# We may need to do some processing asynchronously.
self._handle_new_device_update_async()
# We may need to do some processing asynchronously for local user IDs.
if self.hs.is_mine_id(user_id):
self._handle_new_device_update_async()
async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
@@ -683,9 +684,12 @@ class DeviceHandler(DeviceWorkerHandler):
self.federation_sender.send_device_messages(
host, immediate=False
)
log_kv(
{"message": "sent device update to host", "host": host}
)
# TODO: when called, this isn't in a logging context.
# This leads to log spam, sentry event spam, and massive
# memory usage. See #12552.
# log_kv(
# {"message": "sent device update to host", "host": host}
# )
if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're

View File

@@ -1,4 +1,4 @@
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
# Copyright 2020 Sorunome
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,10 +15,14 @@
"""Contains handlers for federation events."""
import enum
import itertools
import logging
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
import attr
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -92,6 +96,24 @@ def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
return sorted(joined_domains.items(), key=lambda d: d[1])
class _BackfillPointType(Enum):
# a regular backwards extremity (ie, an event which we don't yet have, but which
# is referred to by other events in the DAG)
BACKWARDS_EXTREMITY = enum.auto()
# an MSC2716 "insertion event"
INSERTION_PONT = enum.auto()
@attr.s(slots=True, auto_attribs=True, frozen=True)
class _BackfillPoint:
"""A potential point we might backfill from"""
event_id: str
depth: int
type: _BackfillPointType
class FederationHandler:
"""Handles general incoming federation requests
@@ -157,89 +179,51 @@ class FederationHandler:
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
room_id
)
]
insertion_events_to_be_backfilled: Dict[str, int] = {}
insertion_events_to_be_backfilled: List[_BackfillPoint] = []
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backward_extremities_in_room(
insertion_events_to_be_backfilled = [
_BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
room_id
)
)
]
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
"_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
backwards_extremities,
insertion_events_to_be_backfilled,
)
if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
if not backwards_extremities and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
return False
# We only want to paginate if we can actually see the events we'll get,
# as otherwise we'll just spend a lot of resources to get redacted
# events.
#
# We do this by filtering all the backwards extremities and seeing if
# any remain. Given we don't have the extremity events themselves, we
# need to actually check the events that reference them.
#
# *Note*: the spec wants us to keep backfilling until we reach the start
# of the room in case we are allowed to see some of the history. However
# in practice that causes more issues than its worth, as a) its
# relatively rare for there to be any visible history and b) even when
# there is its often sufficiently long ago that clients would stop
# attempting to paginate before backfill reached the visible history.
#
# TODO: If we do do a backfill then we should filter the backwards
# extremities to only include those that point to visible portions of
# history.
#
# TODO: Correctly handle the case where we are allowed to see the
# forward event but not the backward extremity, e.g. in the case of
# initial join of the server where we are allowed to see the join
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.
forward_event_ids = await self.store.get_successor_events(
list(oldest_events_with_depth)
# we now have a list of potential places to backpaginate from. We prefer to
# start with the most recent (ie, max depth), so let's sort the list.
sorted_backfill_points: List[_BackfillPoint] = sorted(
itertools.chain(
backwards_extremities,
insertion_events_to_be_backfilled,
),
key=lambda e: -int(e.depth),
)
extremities_events = await self.store.get_events(
forward_event_ids,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self.storage,
self.server_name,
list(extremities_events.values()),
redact=False,
check_history_visibility_only=True,
)
logger.debug(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
"_maybe_backfill_inner: room_id: %s: current_depth: %s, limit: %s, "
"backfill points (%d): %s",
room_id,
current_depth,
limit,
len(sorted_backfill_points),
sorted_backfill_points,
)
if not filtered_extremities and not insertion_events_to_be_backfilled:
return False
extremities = {
**oldest_events_with_depth,
# TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
**insertion_events_to_be_backfilled,
}
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]
# If we're approaching an extremity we trigger a backfill, otherwise we
# no-op.
#
@@ -249,6 +233,11 @@ class FederationHandler:
# chose more than one times the limit in case of failure, but choosing a
# much larger factor will result in triggering a backfill request much
# earlier than necessary.
#
# XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
# care about events that have happened after our current position.
#
max_depth = sorted_backfill_points[0].depth
if current_depth - 2 * limit > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
@@ -265,31 +254,98 @@ class FederationHandler:
# 2. we have likely previously tried and failed to backfill from that
# extremity, so to avoid getting "stuck" requesting the same
# backfill repeatedly we drop those extremities.
filtered_sorted_extremeties_tuple = [
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]
logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)
#
# However, we need to check that the filtered extremities are non-empty.
# If they are empty then either we can a) bail or b) still attempt to
# backfill. We opt to try backfilling anyway just in case we do get
# relevant events.
if filtered_sorted_extremeties_tuple:
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
#
filtered_sorted_backfill_points = [
t for t in sorted_backfill_points if t.depth <= current_depth
]
if filtered_sorted_backfill_points:
logger.debug(
"_maybe_backfill_inner: backfill points before current depth: %s",
filtered_sorted_backfill_points,
)
sorted_backfill_points = filtered_sorted_backfill_points
else:
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
)
# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])
# For performance's sake, we only want to paginate from a particular extremity
# if we can actually see the events we'll get. Otherwise, we'd just spend a lot
# of resources to get redacted events. We check each extremity in turn and
# ignore those which users on our server wouldn't be able to see.
#
# Additionally, we limit ourselves to backfilling from at most 5 extremities,
# for two reasons:
#
# - The check which determines if we can see an extremity's events can be
# expensive (we load the full state for the room at each of the backfill
# points, or (worse) their successors)
# - We want to avoid the server-server API request URI becoming too long.
#
# *Note*: the spec wants us to keep backfilling until we reach the start
# of the room in case we are allowed to see some of the history. However,
# in practice that causes more issues than its worth, as (a) it's
# relatively rare for there to be any visible history and (b) even when
# there is it's often sufficiently long ago that clients would stop
# attempting to paginate before backfill reached the visible history.
extremities_to_request: List[str] = []
for bp in sorted_backfill_points:
if len(extremities_to_request) >= 5:
break
# For regular backwards extremities, we don't have the extremity events
# themselves, so we need to actually check the events that reference them -
# their "successor" events.
#
# TODO: Correctly handle the case where we are allowed to see the
# successor event but not the backward extremity, e.g. in the case of
# initial join of the server where we are allowed to see the join
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.
if bp.type == _BackfillPointType.INSERTION_PONT:
event_ids_to_check = [bp.event_id]
else:
event_ids_to_check = await self.store.get_successor_events(bp.event_id)
events_to_check = await self.store.get_events_as_list(
event_ids_to_check,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self.storage,
self.server_name,
events_to_check,
redact=False,
check_history_visibility_only=True,
)
if filtered_extremities:
extremities_to_request.append(bp.event_id)
else:
logger.debug(
"_maybe_backfill_inner: skipping extremity %s as it would not be visible",
bp,
)
if not extremities_to_request:
logger.debug(
"_maybe_backfill_inner: found no extremities which would be visible"
)
return False
logger.debug(
"_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
)
# Now we need to decide which hosts to hit first.
@@ -309,7 +365,7 @@ class FederationHandler:
for dom in domains:
try:
await self._federation_event_handler.backfill(
dom, room_id, limit=100, extremities=extremities
dom, room_id, limit=100, extremities=extremities_to_request
)
# If this succeeded then we probably already have the
# appropriate stuff.

View File

@@ -239,13 +239,14 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
return events, to_key
async def get_new_events_as(
self, from_key: int, service: ApplicationService
self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new read receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
to_key: the stream position up to which events should be fetched to
service: The appservice which may be interested
Returns:
@@ -255,7 +256,6 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()
if from_key == to_key:
return [], to_key

View File

@@ -256,64 +256,6 @@ class RelationsHandler:
return filtered_results
async def _get_bundled_aggregation_for_event(
self, event: EventBase, ignored_users: FrozenSet[str]
) -> Optional[BundledAggregations]:
"""Generate bundled aggregations for an event.
Note that this does not use a cache, but depends on cached methods.
Args:
event: The event to calculate bundled aggregations for.
ignored_users: The users ignored by the requesting user.
Returns:
The bundled aggregations for an event, if bundled aggregations are
enabled and the event can have bundled aggregations.
"""
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
return None
event_id = event.event_id
room_id = event.room_id
# The bundled aggregations to include, a mapping of relation type to a
# type-specific value. Some types include the direct return type here
# while others need more processing during serialization.
aggregations = BundledAggregations()
annotations = await self.get_annotations_for_event(
event_id, room_id, ignored_users=ignored_users
)
if annotations:
aggregations.annotations = {"chunk": annotations}
references, next_token = await self.get_relations_for_event(
event_id,
event,
room_id,
RelationTypes.REFERENCE,
ignored_users=ignored_users,
)
if references:
aggregations.references = {
"chunk": [{"event_id": event.event_id} for event in references]
}
if next_token:
aggregations.references["next_batch"] = await next_token.to_string(
self._main_store
)
# Store the bundled aggregations in the event metadata for later use.
return aggregations
async def get_threads_for_events(
self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str]
) -> Dict[str, _ThreadAggregation]:
@@ -435,11 +377,39 @@ class RelationsHandler:
# Fetch other relations per event.
for event in events_by_id.values():
event_result = await self._get_bundled_aggregation_for_event(
event, ignored_users
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
continue
annotations = await self.get_annotations_for_event(
event.event_id, event.room_id, ignored_users=ignored_users
)
if event_result:
results[event.event_id] = event_result
if annotations:
results.setdefault(
event.event_id, BundledAggregations()
).annotations = {"chunk": annotations}
references, next_token = await self.get_relations_for_event(
event.event_id,
event,
event.room_id,
RelationTypes.REFERENCE,
ignored_users=ignored_users,
)
if references:
aggregations = results.setdefault(event.event_id, BundledAggregations())
aggregations.references = {
"chunk": [{"event_id": ev.event_id} for ev in references]
}
if next_token:
aggregations.references["next_batch"] = await next_token.to_string(
self._main_store
)
# Fetch any edits (but not for redacted events).
#

View File

@@ -54,7 +54,7 @@ class RoomBatchHandler:
# it has a larger `depth` but before the successor event because the `stream_ordering`
# is negative before the successor event.
successor_event_ids = await self.store.get_successor_events(
[most_recent_prev_event_id]
most_recent_prev_event_id
)
# If we can't find any successor events, then it's a forward extremity of

View File

@@ -15,12 +15,17 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
IdGenerator,
MultiWriterIdGenerator,
@@ -266,7 +271,9 @@ class DataStore(
A tuple of a list of mappings from user to information and a count of total users.
"""
def get_users_paginate_txn(txn):
def get_users_paginate_txn(
txn: LoggingTransaction,
) -> Tuple[List[JsonDict], int]:
filters = []
args = [self.hs.config.server.server_name]
@@ -301,7 +308,7 @@ class DataStore(
"""
sql = "SELECT COUNT(*) as total_users " + sql_base
txn.execute(sql, args)
count = txn.fetchone()[0]
count = cast(Tuple[int], txn.fetchone())[0]
sql = f"""
SELECT name, user_type, is_guest, admin, deactivated, shadow_banned,
@@ -338,7 +345,9 @@ class DataStore(
)
def check_database_before_upgrade(cur, database_engine, config: HomeServerConfig):
def check_database_before_upgrade(
cur: Cursor, database_engine: BaseDatabaseEngine, config: HomeServerConfig
) -> None:
"""Called before upgrading an existing database to check that it is broadly sane
compared with the configuration.
"""

View File

@@ -14,7 +14,7 @@
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Pattern, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Pattern, Tuple, cast
from synapse.appservice import (
ApplicationService,
@@ -83,7 +83,7 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore):
txn.execute(
"SELECT COALESCE(max(txn_id), 0) FROM application_services_txns"
)
return txn.fetchone()[0] # type: ignore
return cast(Tuple[int], txn.fetchone())[0]
self._as_txn_seq_gen = build_sequence_generator(
db_conn,

View File

@@ -14,7 +14,17 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple, cast
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
cast,
)
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
@@ -118,7 +128,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
prefilled_cache=device_outbox_prefill,
)
def process_replication_rows(self, stream_name, instance_name, token, rows):
def process_replication_rows(
self,
stream_name: str,
instance_name: str,
token: int,
rows: Iterable[ToDeviceStream.ToDeviceStreamRow],
) -> None:
if stream_name == ToDeviceStream.NAME:
# If replication is happening than postgres must be being used.
assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
@@ -134,7 +150,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
return super().process_replication_rows(stream_name, instance_name, token, rows)
def get_to_device_stream_token(self):
def get_to_device_stream_token(self) -> int:
return self._device_inbox_id_gen.get_current_token()
async def get_messages_for_user_devices(
@@ -301,7 +317,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
if not user_ids_to_query:
return {}, to_stream_id
def get_device_messages_txn(txn: LoggingTransaction):
def get_device_messages_txn(
txn: LoggingTransaction,
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
# Build a query to select messages from any of the given devices that
# are between the given stream id bounds.
@@ -428,7 +446,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
def delete_messages_for_device_txn(txn):
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
sql = (
"DELETE FROM device_inbox"
" WHERE user_id = ? AND device_id = ?"
@@ -455,15 +473,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
) -> Tuple[List[dict], int]:
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
) -> Tuple[List[JsonDict], int]:
"""
Args:
destination(str): The name of the remote server.
last_stream_id(int|long): The last position of the device message stream
destination: The name of the remote server.
last_stream_id: The last position of the device message stream
that the server sent up to.
current_stream_id(int|long): The current position of the device
message stream.
current_stream_id: The current position of the device message stream.
Returns:
A list of messages for the device and where in the stream the messages got to.
"""
@@ -485,7 +502,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return [], last_stream_id
@trace
def get_new_messages_for_remote_destination_txn(txn):
def get_new_messages_for_remote_destination_txn(
txn: LoggingTransaction,
) -> Tuple[List[JsonDict], int]:
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
" WHERE destination = ?"
@@ -527,7 +546,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
up_to_stream_id: Where to delete messages up to.
"""
def delete_messages_for_remote_destination_txn(txn):
def delete_messages_for_remote_destination_txn(txn: LoggingTransaction) -> None:
sql = (
"DELETE FROM device_federation_outbox"
" WHERE destination = ?"
@@ -566,7 +585,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
if last_id == current_id:
return [], current_id, False
def get_all_new_device_messages_txn(txn):
def get_all_new_device_messages_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
# We limit like this as we might have multiple rows per stream_id, and
# we want to make sure we always get all entries for any stream_id
# we return.
@@ -607,8 +628,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def add_messages_to_device_inbox(
self,
local_messages_by_user_then_device: dict,
remote_messages_by_destination: dict,
local_messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
remote_messages_by_destination: Dict[str, JsonDict],
) -> int:
"""Used to send messages from this server.
@@ -624,7 +645,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
assert self._can_write_to_device
def add_messages_txn(txn, now_ms, stream_id):
def add_messages_txn(
txn: LoggingTransaction, now_ms: int, stream_id: int
) -> None:
# Add the local messages directly to the local inbox.
self._add_messages_to_local_device_inbox_txn(
txn, stream_id, local_messages_by_user_then_device
@@ -677,11 +700,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return self._device_inbox_id_gen.get_current_token()
async def add_messages_from_remote_to_device_inbox(
self, origin: str, message_id: str, local_messages_by_user_then_device: dict
self,
origin: str,
message_id: str,
local_messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
) -> int:
assert self._can_write_to_device
def add_messages_txn(txn, now_ms, stream_id):
def add_messages_txn(
txn: LoggingTransaction, now_ms: int, stream_id: int
) -> None:
# Check if we've already inserted a matching message_id for that
# origin. This can happen if the origin doesn't receive our
# acknowledgement from the first time we received the message.
@@ -727,8 +755,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return stream_id
def _add_messages_to_local_device_inbox_txn(
self, txn, stream_id, messages_by_user_then_device
):
self,
txn: LoggingTransaction,
stream_id: int,
messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
) -> None:
assert self._can_write_to_device
local_by_user_then_device = {}
@@ -840,8 +871,10 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
self._remove_dead_devices_from_device_inbox,
)
async def _background_drop_index_device_inbox(self, progress, batch_size):
def reindex_txn(conn):
async def _background_drop_index_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
def reindex_txn(conn: LoggingDatabaseConnection) -> None:
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
txn.close()

View File

@@ -25,6 +25,7 @@ from typing import (
Optional,
Set,
Tuple,
cast,
)
from synapse.api.errors import Codes, StoreError
@@ -136,7 +137,9 @@ class DeviceWorkerStore(SQLBaseStore):
Number of devices of this users.
"""
def count_devices_by_users_txn(txn, user_ids):
def count_devices_by_users_txn(
txn: LoggingTransaction, user_ids: List[str]
) -> int:
sql = """
SELECT count(*)
FROM devices
@@ -149,7 +152,7 @@ class DeviceWorkerStore(SQLBaseStore):
)
txn.execute(sql + clause, args)
return txn.fetchone()[0]
return cast(Tuple[int], txn.fetchone())[0]
if not user_ids:
return 0
@@ -468,7 +471,7 @@ class DeviceWorkerStore(SQLBaseStore):
"""
txn.execute(sql, (destination, from_stream_id, now_stream_id, limit))
return list(txn)
return cast(List[Tuple[str, str, int, Optional[str]]], txn.fetchall())
async def _get_device_update_edus_by_remote(
self,
@@ -549,7 +552,7 @@ class DeviceWorkerStore(SQLBaseStore):
async def _get_last_device_update_for_remote_user(
self, destination: str, user_id: str, from_stream_id: int
) -> int:
def f(txn):
def f(txn: LoggingTransaction) -> int:
prev_sent_id_sql = """
SELECT coalesce(max(stream_id), 0) as stream_id
FROM device_lists_outbound_last_success
@@ -767,7 +770,7 @@ class DeviceWorkerStore(SQLBaseStore):
if not user_ids_to_check:
return set()
def _get_users_whose_devices_changed_txn(txn):
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
changes = set()
stream_id_where_clause = "stream_id > ?"
@@ -966,7 +969,9 @@ class DeviceWorkerStore(SQLBaseStore):
async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
"""Mark that we no longer track device lists for remote user."""
def _mark_remote_user_device_list_as_unsubscribed_txn(txn):
def _mark_remote_user_device_list_as_unsubscribed_txn(
txn: LoggingTransaction,
) -> None:
self.db_pool.simple_delete_txn(
txn,
table="device_lists_remote_extremeties",
@@ -1004,7 +1009,7 @@ class DeviceWorkerStore(SQLBaseStore):
)
def _store_dehydrated_device_txn(
self, txn, user_id: str, device_id: str, device_data: str
self, txn: LoggingTransaction, user_id: str, device_id: str, device_data: str
) -> Optional[str]:
old_device_id = self.db_pool.simple_select_one_onecol_txn(
txn,
@@ -1081,7 +1086,7 @@ class DeviceWorkerStore(SQLBaseStore):
"""
yesterday = self._clock.time_msec() - prune_age
def _prune_txn(txn):
def _prune_txn(txn: LoggingTransaction) -> None:
# look for (user, destination) pairs which have an update older than
# the cutoff.
#
@@ -1204,8 +1209,10 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
"drop_device_lists_outbound_last_success_non_unique_idx",
)
async def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn):
async def _drop_device_list_streams_non_unique_indexes(
self, progress: JsonDict, batch_size: int
) -> int:
def f(conn: LoggingDatabaseConnection) -> None:
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
@@ -1217,7 +1224,9 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
)
return 1
async def _remove_duplicate_outbound_pokes(self, progress, batch_size):
async def _remove_duplicate_outbound_pokes(
self, progress: JsonDict, batch_size: int
) -> int:
# for some reason, we have accumulated duplicate entries in
# device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
# efficient.
@@ -1230,7 +1239,7 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
{"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
)
def _txn(txn):
def _txn(txn: LoggingTransaction) -> int:
clause, args = make_tuple_comparison_clause(
[(x, last_row[x]) for x in KEY_COLS]
)
@@ -1602,7 +1611,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
context = get_active_span_text_map()
def add_device_changes_txn(txn, stream_ids):
def add_device_changes_txn(
txn: LoggingTransaction, stream_ids: List[int]
) -> None:
self._add_device_change_to_stream_txn(
txn,
user_id,
@@ -1635,8 +1646,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
txn: LoggingTransaction,
user_id: str,
device_ids: Collection[str],
stream_ids: List[str],
):
stream_ids: List[int],
) -> None:
txn.call_after(
self._device_list_stream_cache.entity_has_changed,
user_id,
@@ -1720,7 +1731,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
user_id: str,
device_ids: Iterable[str],
room_ids: Collection[str],
stream_ids: List[str],
stream_ids: List[int],
context: Dict[str, str],
) -> None:
"""Record the user in the room has updated their device."""
@@ -1748,7 +1759,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
device_id,
room_id,
stream_id,
False,
# We only need to calculate outbound pokes for local users
not self.hs.is_mine_id(user_id),
encoded_context,
)
for room_id in room_ids
@@ -1774,9 +1786,21 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
LIMIT ?
"""
def get_uncoverted_outbound_room_pokes_txn(txn):
def get_uncoverted_outbound_room_pokes_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
txn.execute(sql, (limit,))
return txn.fetchall()
return [
(
user_id,
device_id,
room_id,
stream_id,
db_to_json(opentracing_context),
)
for user_id, device_id, room_id, stream_id, opentracing_context in txn
]
return await self.db_pool.runInteraction(
"get_uncoverted_outbound_room_pokes", get_uncoverted_outbound_room_pokes_txn
@@ -1797,7 +1821,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
Marks the associated row in `device_lists_changes_in_room` as handled.
"""
def add_device_list_outbound_pokes_txn(txn, stream_ids: List[int]):
def add_device_list_outbound_pokes_txn(
txn: LoggingTransaction, stream_ids: List[int]
) -> None:
if hosts:
self._add_device_outbound_poke_to_stream_txn(
txn,

View File

@@ -695,7 +695,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}
async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
async def get_oldest_event_ids_with_depth_in_room(
self, room_id
) -> List[Tuple[str, int]]:
"""Gets the oldest events(backwards extremities) in the room along with the
aproximate depth.
@@ -708,7 +710,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id: Room where we want to find the oldest events
Returns:
Map from event_id to depth
List of (event_id, depth) tuples
"""
def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
@@ -741,7 +743,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
txn.execute(sql, (room_id, False))
return dict(txn)
return txn.fetchall()
return await self.db_pool.runInteraction(
"get_oldest_event_ids_with_depth_in_room",
@@ -751,7 +753,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
async def get_insertion_event_backward_extremities_in_room(
self, room_id
) -> Dict[str, int]:
) -> List[Tuple[str, int]]:
"""Get the insertion events we know about that we haven't backfilled yet.
We use this function so that we can compare and see if someones current
@@ -763,7 +765,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id: Room where we want to find the oldest events
Returns:
Map from event_id to depth
List of (event_id, depth) tuples
"""
def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
@@ -778,8 +780,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"""
txn.execute(sql, (room_id,))
return dict(txn)
return txn.fetchall()
return await self.db_pool.runInteraction(
"get_insertion_event_backward_extremities_in_room",
@@ -1295,22 +1296,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_results.reverse()
return event_results
async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:
"""Fetch all events that have the given events as a prev event
async def get_successor_events(self, event_id: str) -> List[str]:
"""Fetch all events that have the given event as a prev event
Args:
event_ids: The events to use as the previous events.
event_id: The event to search for as a prev_event.
"""
rows = await self.db_pool.simple_select_many_batch(
return await self.db_pool.simple_select_onecol(
table="event_edges",
column="prev_event_id",
iterable=event_ids,
retcols=("event_id",),
keyvalues={"prev_event_id": event_id},
retcol="event_id",
desc="get_successor_events",
)
return [row["event_id"] for row in rows]
@wrap_as_background_process("delete_old_forward_extrem_cache")
async def _delete_old_forward_extrem_cache(self) -> None:
def _delete_old_forward_extrem_cache_txn(txn):

View File

@@ -75,7 +75,7 @@ from synapse.storage.util.id_generators import (
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -640,42 +640,57 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids.difference_update(already_fetching_ids)
if missing_events_ids:
log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
# Add entries to `self._current_event_fetches` for each event we're
# going to pull from the DB. We use a single deferred that resolves
# to all the events we pulled from the DB (this will result in this
# function returning more events than requested, but that can happen
# already due to `_get_events_from_db`).
fetching_deferred: ObservableDeferred[
Dict[str, EventCacheEntry]
] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
for event_id in missing_events_ids:
self._current_event_fetches[event_id] = fetching_deferred
async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
"""Fetches the events in `missing_event_ids` from the database.
# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)
Also creates entries in `self._current_event_fetches` to allow
concurrent `_get_events_from_cache_or_db` calls to reuse the same fetch.
"""
log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
event_entry_map.update(missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
raise e
finally:
# Ensure that we mark these events as no longer being fetched.
# Add entries to `self._current_event_fetches` for each event we're
# going to pull from the DB. We use a single deferred that resolves
# to all the events we pulled from the DB (this will result in this
# function returning more events than requested, but that can happen
# already due to `_get_events_from_db`).
fetching_deferred: ObservableDeferred[
Dict[str, EventCacheEntry]
] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
for event_id in missing_events_ids:
self._current_event_fetches.pop(event_id, None)
self._current_event_fetches[event_id] = fetching_deferred
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)
# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event
# out of the database to check it.
#
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
raise e
finally:
# Ensure that we mark these events as no longer being fetched.
for event_id in missing_events_ids:
self._current_event_fetches.pop(event_id, None)
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)
return missing_events
# We must allow the database fetch to complete in the presence of
# cancellations, since multiple `_get_events_from_cache_or_db` calls can
# reuse the same fetch.
missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
get_missing_events_from_db()
)
event_entry_map.update(missing_events)
if already_fetching_deferreds:
# Wait for the other event requests to finish and add their results

View File

@@ -522,7 +522,9 @@ class GroupServerWorkerStore(SQLBaseStore):
desc="get_joined_groups",
)
async def get_all_groups_for_user(self, user_id, now_token) -> List[JsonDict]:
async def get_all_groups_for_user(
self, user_id: str, now_token: int
) -> List[JsonDict]:
def _get_all_groups_for_user_txn(txn: LoggingTransaction) -> List[JsonDict]:
sql = """
SELECT group_id, type, membership, u.content

View File

@@ -15,11 +15,12 @@
import itertools
import logging
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple
from signedjson.key import decode_verify_key_bytes
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.keys import FetchKeyResult
from synapse.storage.types import Cursor
from synapse.util.caches.descriptors import cached, cachedList
@@ -35,7 +36,9 @@ class KeyStore(SQLBaseStore):
"""Persistence for signature verification keys"""
@cached()
def _get_server_verify_key(self, server_name_and_key_id):
def _get_server_verify_key(
self, server_name_and_key_id: Tuple[str, str]
) -> FetchKeyResult:
raise NotImplementedError()
@cachedList(
@@ -179,19 +182,21 @@ class KeyStore(SQLBaseStore):
async def get_server_keys_json(
self, server_keys: Iterable[Tuple[str, Optional[str], Optional[str]]]
) -> Dict[Tuple[str, Optional[str], Optional[str]], List[dict]]:
) -> Dict[Tuple[str, Optional[str], Optional[str]], List[Dict[str, Any]]]:
"""Retrieve the key json for a list of server_keys and key ids.
If no keys are found for a given server, key_id and source then
that server, key_id, and source triplet entry will be an empty list.
The JSON is returned as a byte array so that it can be efficiently
used in an HTTP response.
Args:
server_keys (list): List of (server_name, key_id, source) triplets.
server_keys: List of (server_name, key_id, source) triplets.
Returns:
A mapping from (server_name, key_id, source) triplets to a list of dicts
"""
def _get_server_keys_json_txn(txn):
def _get_server_keys_json_txn(
txn: LoggingTransaction,
) -> Dict[Tuple[str, Optional[str], Optional[str]], List[Dict[str, Any]]]:
results = {}
for server_name, key_id, from_server in server_keys:
keyvalues = {"server_name": server_name}

View File

@@ -388,7 +388,14 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
return await self.db_pool.runInteraction("get_url_cache", get_url_cache_txn)
async def store_url_cache(
self, url, response_code, etag, expires_ts, og, media_id, download_ts
self,
url: str,
response_code: int,
etag: Optional[str],
expires_ts: int,
og: Optional[str],
media_id: str,
download_ts: int,
) -> None:
await self.db_pool.simple_insert(
"local_media_repository_url_cache",
@@ -441,7 +448,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
)
async def get_cached_remote_media(
self, origin, media_id: str
self, origin: str, media_id: str
) -> Optional[Dict[str, Any]]:
return await self.db_pool.simple_select_one(
"remote_media_cache",
@@ -608,7 +615,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
)
async def delete_remote_media(self, media_origin: str, media_id: str) -> None:
def delete_remote_media_txn(txn):
def delete_remote_media_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_delete_txn(
txn,
"remote_media_cache",

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple, cast
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast
from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
@@ -103,7 +103,9 @@ class PresenceStore(PresenceBackgroundUpdateStore):
prefilled_cache=presence_cache_prefill,
)
async def update_presence(self, presence_states) -> Tuple[int, int]:
async def update_presence(
self, presence_states: List[UserPresenceState]
) -> Tuple[int, int]:
assert self._can_persist_presence
stream_ordering_manager = self._presence_id_gen.get_next_mult(
@@ -121,7 +123,10 @@ class PresenceStore(PresenceBackgroundUpdateStore):
return stream_orderings[-1], self._presence_id_gen.get_current_token()
def _update_presence_txn(
self, txn: LoggingTransaction, stream_orderings, presence_states
self,
txn: LoggingTransaction,
stream_orderings: List[int],
presence_states: List[UserPresenceState],
) -> None:
for stream_id, state in zip(stream_orderings, presence_states):
txn.call_after(
@@ -405,7 +410,13 @@ class PresenceStore(PresenceBackgroundUpdateStore):
self._presence_on_startup = []
return active_on_startup
def process_replication_rows(self, stream_name, instance_name, token, rows) -> None:
def process_replication_rows(
self,
stream_name: str,
instance_name: str,
token: int,
rows: Iterable[Any],
) -> None:
if stream_name == PresenceStream.NAME:
self._presence_id_gen.advance(instance_name, token)
for row in rows:

View File

@@ -14,11 +14,25 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, List, Optional, Tuple
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
cast,
)
from synapse.push import PusherConfig, ThrottleParams
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
@@ -117,7 +131,7 @@ class PusherWorkerStore(SQLBaseStore):
return self._decode_pushers_rows(ret)
async def get_all_pushers(self) -> Iterator[PusherConfig]:
def get_pushers(txn):
def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]:
txn.execute("SELECT * FROM pushers")
rows = self.db_pool.cursor_to_dict(txn)
@@ -152,7 +166,9 @@ class PusherWorkerStore(SQLBaseStore):
if last_id == current_id:
return [], current_id, False
def get_all_updated_pushers_rows_txn(txn):
def get_all_updated_pushers_rows_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
sql = """
SELECT id, user_name, app_id, pushkey
FROM pushers
@@ -160,10 +176,13 @@ class PusherWorkerStore(SQLBaseStore):
ORDER BY id ASC LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
updates = [
(stream_id, (user_name, app_id, pushkey, False))
for stream_id, user_name, app_id, pushkey in txn
]
updates = cast(
List[Tuple[int, tuple]],
[
(stream_id, (user_name, app_id, pushkey, False))
for stream_id, user_name, app_id, pushkey in txn
],
)
sql = """
SELECT stream_id, user_id, app_id, pushkey
@@ -192,12 +211,12 @@ class PusherWorkerStore(SQLBaseStore):
)
@cached(num_args=1, max_entries=15000)
async def get_if_user_has_pusher(self, user_id: str):
async def get_if_user_has_pusher(self, user_id: str) -> None:
# This only exists for the cachedList decorator
raise NotImplementedError()
async def update_pusher_last_stream_ordering(
self, app_id, pushkey, user_id, last_stream_ordering
self, app_id: str, pushkey: str, user_id: str, last_stream_ordering: int
) -> None:
await self.db_pool.simple_update_one(
"pushers",
@@ -291,7 +310,7 @@ class PusherWorkerStore(SQLBaseStore):
last_user = progress.get("last_user", "")
def _delete_pushers(txn) -> int:
def _delete_pushers(txn: LoggingTransaction) -> int:
sql = """
SELECT name FROM users
@@ -339,7 +358,7 @@ class PusherWorkerStore(SQLBaseStore):
last_pusher = progress.get("last_pusher", 0)
def _delete_pushers(txn) -> int:
def _delete_pushers(txn: LoggingTransaction) -> int:
sql = """
SELECT p.id, access_token FROM pushers AS p
@@ -396,7 +415,7 @@ class PusherWorkerStore(SQLBaseStore):
last_pusher = progress.get("last_pusher", 0)
def _delete_pushers(txn) -> int:
def _delete_pushers(txn: LoggingTransaction) -> int:
sql = """
SELECT p.id, p.user_name, p.app_id, p.pushkey
@@ -502,7 +521,7 @@ class PusherStore(PusherWorkerStore):
async def delete_pusher_by_app_id_pushkey_user_id(
self, app_id: str, pushkey: str, user_id: str
) -> None:
def delete_pusher_txn(txn, stream_id):
def delete_pusher_txn(txn: LoggingTransaction, stream_id: int) -> None:
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_if_user_has_pusher, (user_id,)
)
@@ -547,7 +566,7 @@ class PusherStore(PusherWorkerStore):
# account.
pushers = list(await self.get_pushers_by_user_id(user_id))
def delete_pushers_txn(txn, stream_ids):
def delete_pushers_txn(txn: LoggingTransaction, stream_ids: List[int]) -> None:
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_if_user_has_pusher, (user_id,)
)

View File

@@ -370,10 +370,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def _update_state_for_partial_state_event_txn(
self,
txn,
txn: LoggingTransaction,
event: EventBase,
context: EventContext,
):
) -> None:
# we shouldn't have any outliers here
assert not event.internal_metadata.is_outlier()

View File

@@ -131,7 +131,7 @@ class UIAuthWorkerStore(SQLBaseStore):
session_id: str,
stage_type: str,
result: Union[str, bool, JsonDict],
):
) -> None:
"""
Mark a session stage as completed.
@@ -200,7 +200,9 @@ class UIAuthWorkerStore(SQLBaseStore):
desc="set_ui_auth_client_dict",
)
async def set_ui_auth_session_data(self, session_id: str, key: str, value: Any):
async def set_ui_auth_session_data(
self, session_id: str, key: str, value: Any
) -> None:
"""
Store a key-value pair into the sessions data associated with this
request. This data is stored server-side and cannot be modified by
@@ -223,7 +225,7 @@ class UIAuthWorkerStore(SQLBaseStore):
def _set_ui_auth_session_data_txn(
self, txn: LoggingTransaction, session_id: str, key: str, value: Any
):
) -> None:
# Get the current value.
result = cast(
Dict[str, Any],
@@ -275,7 +277,7 @@ class UIAuthWorkerStore(SQLBaseStore):
session_id: str,
user_agent: str,
ip: str,
):
) -> None:
"""Add the given user agent / IP to the tracking table"""
await self.db_pool.simple_upsert(
table="ui_auth_sessions_ips",
@@ -318,7 +320,7 @@ class UIAuthWorkerStore(SQLBaseStore):
def _delete_old_ui_auth_sessions_txn(
self, txn: LoggingTransaction, expiration_time: int
):
) -> None:
# Get the expired sessions.
sql = "SELECT session_id FROM ui_auth_sessions WHERE creation_time <= ?"
txn.execute(sql, [expiration_time])

View File

@@ -54,10 +54,16 @@ class DependencyException(Exception):
DEV_EXTRAS = {"lint", "mypy", "test", "dev"}
RUNTIME_EXTRAS = (
set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
)
VERSION = metadata.version(DISTRIBUTION_NAME)
try:
RUNTIME_EXTRAS = (
set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
)
VERSION = metadata.version(DISTRIBUTION_NAME)
except Exception as e:
raise RuntimeError(
"Unable to read Synapse's package installation metadata. "
"This may be a problem with how Synapse has been packaged."
) from e
def _is_dev_dependency(req: Requirement) -> bool:

View File

@@ -419,6 +419,13 @@ async def _event_to_memberships(
return {}
# for each event, get the event_ids of the membership state at those events.
#
# TODO: this means that we request the entire membership list. If there are only
# one or two users on this server, and the room is huge, this is very wasteful
# (it means more db work, and churns the *stateGroupMembersCache*).
# It might be that we could extend StateFilter to specify "give me keys matching
# *:<server_name>", to avoid this.
event_to_state_ids = await storage.state.get_state_ids_for_events(
frozenset(e.event_id for e in events),
state_filter=StateFilter.from_types(types=((EventTypes.Member, None),)),

View File

@@ -411,6 +411,88 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
"exclusive_as_user", "password", self.exclusive_as_user_device_id
)
def test_sending_read_receipt_batches_to_application_services(self):
"""Tests that a large batch of read receipts are sent correctly to
interested application services.
"""
# Register an application service that's interested in a certain user
# and room prefix
interested_appservice = self._register_application_service(
namespaces={
ApplicationService.NS_USERS: [
{
"regex": "@exclusive_as_user:.+",
"exclusive": True,
}
],
ApplicationService.NS_ROOMS: [
{
"regex": "!fakeroom_.*",
"exclusive": True,
}
],
},
)
# "Complete" a transaction.
# All this really does for us is make an entry in the application_services_state
# database table, which tracks the current stream_token per stream ID per AS.
self.get_success(
self.hs.get_datastores().main.complete_appservice_txn(
0,
interested_appservice,
)
)
# Now, pretend that we receive a large burst of read receipts (300 total) that
# all come in at once.
for i in range(300):
self.get_success(
# Insert a fake read receipt into the database
self.hs.get_datastores().main.insert_receipt(
# We have to use unique room ID + user ID combinations here, as the db query
# is an upsert.
room_id=f"!fakeroom_{i}:test",
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
data={},
)
)
# Now notify the appservice handler that 300 read receipts have all arrived
# at once. What will it do!
# note: stream tokens start at 2
for stream_token in range(2, 303):
self.get_success(
self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
services=[interested_appservice],
stream_key="receipt_key",
new_token=stream_token,
users=[self.exclusive_as_user],
)
)
# Using our txn send mock, we can see what the AS received. After iterating over every
# transaction, we'd like to see all 300 read receipts accounted for.
# No more, no less.
all_ephemeral_events = []
for call in self.send_mock.call_args_list:
ephemeral_events = call[0][2]
all_ephemeral_events += ephemeral_events
# Ensure that no duplicate events were sent
self.assertEqual(len(all_ephemeral_events), 300)
# Check that the ephemeral event is a read receipt with the expected structure
latest_read_receipt = all_ephemeral_events[-1]
self.assertEqual(latest_read_receipt["type"], "m.receipt")
event_id = list(latest_read_receipt["content"].keys())[0]
self.assertEqual(
latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
)
@unittest.override_config(
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
)

View File

@@ -560,43 +560,6 @@ class RelationsTestCase(BaseRelationsTestCase):
{"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
)
def test_edit_thread(self) -> None:
"""Test that editing a thread works."""
# Create a thread and edit the last event.
channel = self._send_relation(
RelationTypes.THREAD,
"m.room.message",
content={"msgtype": "m.text", "body": "A threaded reply!"},
)
threaded_event_id = channel.json_body["event_id"]
new_body = {"msgtype": "m.text", "body": "I've been edited!"}
self._send_relation(
RelationTypes.REPLACE,
"m.room.message",
content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
parent_id=threaded_event_id,
)
# Fetch the thread root, to get the bundled aggregation for the thread.
channel = self.make_request(
"GET",
f"/rooms/{self.room}/event/{self.parent_id}",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)
# We expect that the edit message appears in the thread summary in the
# unsigned relations section.
relations_dict = channel.json_body["unsigned"].get("m.relations")
self.assertIn(RelationTypes.THREAD, relations_dict)
thread_summary = relations_dict[RelationTypes.THREAD]
self.assertIn("latest_event", thread_summary)
latest_event_in_thread = thread_summary["latest_event"]
self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!")
def test_edit_edit(self) -> None:
"""Test that an edit cannot be edited."""
new_body = {"msgtype": "m.text", "body": "Initial edit"}
@@ -1047,7 +1010,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
thread_2 = channel.json_body["event_id"]
def assert_annotations(bundled_aggregations: JsonDict) -> None:
def assert_thread(bundled_aggregations: JsonDict) -> None:
self.assertEqual(2, bundled_aggregations.get("count"))
self.assertTrue(bundled_aggregations.get("current_user_participated"))
# The latest thread event has some fields that don't matter.
@@ -1066,7 +1029,38 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
bundled_aggregations.get("latest_event"),
)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_annotations, 9)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9)
def test_thread_edit_latest_event(self) -> None:
"""Test that editing the latest event in a thread works."""
# Create a thread and edit the last event.
channel = self._send_relation(
RelationTypes.THREAD,
"m.room.message",
content={"msgtype": "m.text", "body": "A threaded reply!"},
)
threaded_event_id = channel.json_body["event_id"]
new_body = {"msgtype": "m.text", "body": "I've been edited!"}
channel = self._send_relation(
RelationTypes.REPLACE,
"m.room.message",
content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
parent_id=threaded_event_id,
)
# Fetch the thread root, to get the bundled aggregation for the thread.
relations_dict = self._get_bundled_aggregations()
# We expect that the edit message appears in the thread summary in the
# unsigned relations section.
self.assertIn(RelationTypes.THREAD, relations_dict)
thread_summary = relations_dict[RelationTypes.THREAD]
self.assertIn("latest_event", thread_summary)
latest_event_in_thread = thread_summary["latest_event"]
self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!")
def test_aggregation_get_event_for_annotation(self) -> None:
"""Test that annotations do not get bundled aggregations included
@@ -1093,7 +1087,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
thread_id = channel.json_body["event_id"]
# Annotate the annotation.
# Annotate the thread.
self._send_relation(
RelationTypes.ANNOTATION, "m.reaction", "a", parent_id=thread_id
)

View File

@@ -13,10 +13,11 @@
# limitations under the License.
import json
from contextlib import contextmanager
from typing import Generator
from typing import Generator, Tuple
from unittest import mock
from twisted.enterprise.adbapi import ConnectionPool
from twisted.internet.defer import ensureDeferred
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.room_versions import EventFormatVersions, RoomVersions
@@ -281,3 +282,119 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
# This next event fetch should succeed
self.get_success(self.store.get_event(self.event_ids[0]))
class GetEventCancellationTestCase(unittest.HomeserverTestCase):
"""Test cancellation of `get_event` calls."""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
self.store: EventsWorkerStore = hs.get_datastores().main
self.user = self.register_user("user", "pass")
self.token = self.login(self.user, "pass")
self.room = self.helper.create_room_as(self.user, tok=self.token)
res = self.helper.send(self.room, tok=self.token)
self.event_id = res["event_id"]
# Reset the event cache so the tests start with it empty
self.store._get_event_cache.clear()
@contextmanager
def blocking_get_event_calls(
self,
) -> Generator[
Tuple["Deferred[None]", "Deferred[None]", "Deferred[None]"], None, None
]:
"""Starts two concurrent `get_event` calls for the same event.
Both `get_event` calls will use the same database fetch, which will be blocked
at the time this function returns.
Returns:
A tuple containing:
* A `Deferred` that unblocks the database fetch.
* A cancellable `Deferred` for the first `get_event` call.
* A cancellable `Deferred` for the second `get_event` call.
"""
# Patch `DatabasePool.runWithConnection` to block.
unblock: "Deferred[None]" = Deferred()
original_runWithConnection = self.store.db_pool.runWithConnection
async def runWithConnection(*args, **kwargs):
await unblock
return await original_runWithConnection(*args, **kwargs)
with mock.patch.object(
self.store.db_pool,
"runWithConnection",
new=runWithConnection,
):
ctx1 = LoggingContext("get_event1")
ctx2 = LoggingContext("get_event2")
async def get_event(ctx: LoggingContext) -> None:
with ctx:
await self.store.get_event(self.event_id)
get_event1 = ensureDeferred(get_event(ctx1))
get_event2 = ensureDeferred(get_event(ctx2))
# Both `get_event` calls ought to be blocked.
self.assertNoResult(get_event1)
self.assertNoResult(get_event2)
yield unblock, get_event1, get_event2
# Confirm that the two `get_event` calls shared the same database fetch.
self.assertEqual(ctx1.get_resource_usage().evt_db_fetch_count, 1)
self.assertEqual(ctx2.get_resource_usage().evt_db_fetch_count, 0)
def test_first_get_event_cancelled(self):
"""Test cancellation of the first `get_event` call sharing a database fetch.
The first `get_event` call is the one which initiates the fetch. We expect the
fetch to complete despite the cancellation. Furthermore, the first `get_event`
call must not abort before the fetch is complete, otherwise the fetch will be
using a finished logging context.
"""
with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
# Cancel the first `get_event` call.
get_event1.cancel()
# The first `get_event` call must not abort immediately, otherwise its
# logging context will be finished while it is still in use by the database
# fetch.
self.assertNoResult(get_event1)
# The second `get_event` call must not be cancelled.
self.assertNoResult(get_event2)
# Unblock the database fetch.
unblock.callback(None)
# A `CancelledError` should be raised out of the first `get_event` call.
exc = self.get_failure(get_event1, CancelledError).value
self.assertIsInstance(exc, CancelledError)
# The second `get_event` call should complete successfully.
self.get_success(get_event2)
def test_second_get_event_cancelled(self):
"""Test cancellation of the second `get_event` call sharing a database fetch."""
with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
# Cancel the second `get_event` call.
get_event2.cancel()
# The first `get_event` call must not be cancelled.
self.assertNoResult(get_event1)
# The second `get_event` call gets cancelled immediately.
exc = self.get_failure(get_event2, CancelledError).value
self.assertIsInstance(exc, CancelledError)
# Unblock the database fetch.
unblock.callback(None)
# The first `get_event` call should complete successfully.
self.get_success(get_event1)

View File

@@ -29,7 +29,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
for device_id in device_ids:
stream_id = self.get_success(
self.store.add_device_change_to_streams(
"user_id", [device_id], ["!some:room"]
user_id, [device_id], ["!some:room"]
)
)