Compare commits
matthew/co...dmr/warn-m (28 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | bdb00ee73b |  |
|  | b76f1a4d5f |  |
|  | 63ba9ba38b |  |
|  | 9986621bc8 |  |
|  | 9cfecd2dc0 |  |
|  | 56c9c6c465 |  |
|  | 6b64ee9ec7 |  |
|  | f59e3f4c90 |  |
|  | 6d89f1239c |  |
|  | c48ab3734e |  |
|  | 706456de1f |  |
|  | ee1601e59d |  |
|  | 6b9e95015b |  |
|  | 416604e3bc |  |
|  | a54d9b0508 |  |
|  | f987cdd80b |  |
|  | 30db7fdb91 |  |
|  | 7c063da25c |  |
|  | 730fcda546 |  |
|  | 99ab45423a |  |
|  | 17d99f758a |  |
|  | e75c7e3b6d |  |
|  | 8a87b4435a |  |
|  | 813d728d09 |  |
|  | 8bac3e0435 |  |
|  | 185da8f0f2 |  |
|  | d9b71410c2 |  |
|  | a36a38b1ca |  |
CHANGES.md (90 lines changed)

@@ -1,3 +1,93 @@
Synapse 1.58.0rc2 (2022-04-26)
==============================

This release candidate fixes bugs related to Synapse 1.58.0rc1's logic for handling device list updates.

Bugfixes
--------

- Fix a bug introduced in Synapse 1.58.0rc1 where the main process could consume excessive amounts of CPU and memory while handling sentry logging failures. ([\#12554](https://github.com/matrix-org/synapse/issues/12554))
- Fix a bug introduced in Synapse 1.58.0rc1 where opentracing contexts were not correctly sent to whitelisted remote servers with device lists updates. ([\#12555](https://github.com/matrix-org/synapse/issues/12555))


Internal Changes
----------------

- Reduce unnecessary work when handling remote device list updates. ([\#12557](https://github.com/matrix-org/synapse/issues/12557))


Synapse 1.58.0rc1 (2022-04-26)
==============================

As of this release, the groups/communities feature in Synapse is now disabled by default. See [\#11584](https://github.com/matrix-org/synapse/issues/11584) for details. As mentioned in [the upgrade notes](https://github.com/matrix-org/synapse/blob/develop/docs/upgrade.md#upgrading-to-v1580), this feature will be removed in Synapse 1.61.

Features
--------

- Implement [MSC3383](https://github.com/matrix-org/matrix-spec-proposals/pull/3383) for including the destination in server-to-server authentication headers. Contributed by @Bubu and @jcgruenhage for Famedly. ([\#11398](https://github.com/matrix-org/synapse/issues/11398))
- Docker images and Debian packages from matrix.org now contain a locked set of Python dependencies, greatly improving build reproducibility. ([Board](https://github.com/orgs/matrix-org/projects/54), [\#11537](https://github.com/matrix-org/synapse/issues/11537))
- Enable processing of device list updates asynchronously. ([\#12365](https://github.com/matrix-org/synapse/issues/12365), [\#12465](https://github.com/matrix-org/synapse/issues/12465))
- Implement [MSC2815](https://github.com/matrix-org/matrix-spec-proposals/pull/2815) to allow room moderators to view redacted event content. Contributed by @tulir @ Beeper. ([\#12427](https://github.com/matrix-org/synapse/issues/12427))
- Build Debian packages for Ubuntu 22.04 "Jammy Jellyfish". ([\#12543](https://github.com/matrix-org/synapse/issues/12543))


Bugfixes
--------

- Prevent a sync request from removing a user's busy presence status. ([\#12213](https://github.com/matrix-org/synapse/issues/12213))
- Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper. ([\#12319](https://github.com/matrix-org/synapse/issues/12319))
- Fix a long-standing bug which incorrectly caused `GET /_matrix/client/v3/rooms/{roomId}/event/{eventId}` to return edited events rather than the original. ([\#12476](https://github.com/matrix-org/synapse/issues/12476))
- Fix a bug introduced in Synapse 1.27.0 where the admin API for [deleting forward extremities](https://github.com/matrix-org/synapse/blob/erikj/fix_delete_event_response_count/docs/admin_api/rooms.md#deleting-forward-extremities) would always return a count of 1, no matter how many extremities were deleted. ([\#12496](https://github.com/matrix-org/synapse/issues/12496))
- Fix a long-standing bug where the image thumbnails embedded into email notifications were broken. ([\#12510](https://github.com/matrix-org/synapse/issues/12510))
- Fix a bug in the implementation of [MSC3202](https://github.com/matrix-org/matrix-spec-proposals/pull/3202) where Synapse would use the field name `device_unused_fallback_keys`, rather than `device_unused_fallback_key_types`. ([\#12520](https://github.com/matrix-org/synapse/issues/12520))
- Fix a bug introduced in Synapse 0.99.3 which could cause Synapse to consume large amounts of RAM when back-paginating in a large room. ([\#12522](https://github.com/matrix-org/synapse/issues/12522))


Improved Documentation
----------------------

- Fix rendering of the documentation site when using the 'print' feature. ([\#12340](https://github.com/matrix-org/synapse/issues/12340))
- Add a manual documenting config file options. ([\#12368](https://github.com/matrix-org/synapse/issues/12368), [\#12527](https://github.com/matrix-org/synapse/issues/12527))
- Update documentation to reflect that both the `run_background_tasks_on` option and the options for moving stream writers off of the main process are no longer experimental. ([\#12451](https://github.com/matrix-org/synapse/issues/12451))
- Update worker documentation and replace old `federation_reader` with `generic_worker`. ([\#12457](https://github.com/matrix-org/synapse/issues/12457))
- Strongly recommend [Poetry](https://python-poetry.org/) for development. ([\#12475](https://github.com/matrix-org/synapse/issues/12475))
- Add some example configurations for workers and update architectural diagram. ([\#12492](https://github.com/matrix-org/synapse/issues/12492))
- Fix a broken link in `README.rst`. ([\#12495](https://github.com/matrix-org/synapse/issues/12495))
- Add HAProxy delegation example with CORS headers to docs. ([\#12501](https://github.com/matrix-org/synapse/issues/12501))
- Remove extraneous comma in User Admin API's device deletion section so that the example JSON is actually valid and works. Contributed by @olmari. ([\#12533](https://github.com/matrix-org/synapse/issues/12533))


Deprecations and Removals
-------------------------

- The groups/communities feature in Synapse is now disabled by default. ([\#12344](https://github.com/matrix-org/synapse/issues/12344))
- Remove unstable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). ([\#12382](https://github.com/matrix-org/synapse/issues/12382))


Internal Changes
----------------

- Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join. ([\#12394](https://github.com/matrix-org/synapse/issues/12394))
- Preparation for faster-room-join work: Implement a tracking mechanism to allow functions to wait for full room state to arrive. ([\#12399](https://github.com/matrix-org/synapse/issues/12399))
- Remove an unstable identifier from [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083). ([\#12395](https://github.com/matrix-org/synapse/issues/12395))
- Run CI in the locked [Poetry](https://python-poetry.org/) environment, and remove corresponding `tox` jobs. ([\#12425](https://github.com/matrix-org/synapse/issues/12425), [\#12434](https://github.com/matrix-org/synapse/issues/12434), [\#12438](https://github.com/matrix-org/synapse/issues/12438), [\#12441](https://github.com/matrix-org/synapse/issues/12441), [\#12449](https://github.com/matrix-org/synapse/issues/12449), [\#12478](https://github.com/matrix-org/synapse/issues/12478), [\#12514](https://github.com/matrix-org/synapse/issues/12514), [\#12472](https://github.com/matrix-org/synapse/issues/12472))
- Change Mutual Rooms' `unstable_features` flag to `uk.half-shot.msc2666.mutual_rooms` which matches the current iteration of [MSC2666](https://github.com/matrix-org/matrix-spec-proposals/pull/2666). ([\#12445](https://github.com/matrix-org/synapse/issues/12445))
- Fix typo in the release script help string. ([\#12450](https://github.com/matrix-org/synapse/issues/12450))
- Fix a minor typo in the Debian changelogs generated by the release script. ([\#12497](https://github.com/matrix-org/synapse/issues/12497))
- Reintroduce the list of targets to the linter script, to avoid linting unwanted local-only directories during development. ([\#12455](https://github.com/matrix-org/synapse/issues/12455))
- Limit length of `device_id` to less than 512 characters. ([\#12454](https://github.com/matrix-org/synapse/issues/12454))
- Dockerfile-workers: reduce the amount we install in the image. ([\#12464](https://github.com/matrix-org/synapse/issues/12464))
- Dockerfile-workers: give the master its own log config. ([\#12466](https://github.com/matrix-org/synapse/issues/12466))
- complement-synapse-workers: factor out separate entry point script. ([\#12467](https://github.com/matrix-org/synapse/issues/12467))
- Back out experimental implementation of [MSC2314](https://github.com/matrix-org/matrix-spec-proposals/pull/2314). ([\#12474](https://github.com/matrix-org/synapse/issues/12474))
- Fix grammatical error in federation error response when the room version of a room is unknown. ([\#12483](https://github.com/matrix-org/synapse/issues/12483))
- Remove unnecessary configuration overrides in tests. ([\#12511](https://github.com/matrix-org/synapse/issues/12511))
- Refactor the relations code for clarity. ([\#12519](https://github.com/matrix-org/synapse/issues/12519))
- Add type hints so `docker` and `stubs` directories pass `mypy --disallow-untyped-defs`. ([\#12528](https://github.com/matrix-org/synapse/issues/12528))
- Update `delay_cancellation` to accept any awaitable, rather than just `Deferred`s. ([\#12468](https://github.com/matrix-org/synapse/issues/12468))
- Handle cancellation in `EventsWorkerStore._get_events_from_cache_or_db`. ([\#12529](https://github.com/matrix-org/synapse/issues/12529))


Synapse 1.57.1 (2022-04-20)
===========================
@@ -1 +0,0 @@
Implement [MSC3383](https://github.com/matrix-org/matrix-spec-proposals/pull/3383) for including the destination in server-to-server authentication headers. Contributed by @Bubu and @jcgruenhage for Famedly GmbH.
@@ -1 +0,0 @@
Prevent a sync request from removing a user's busy presence status.
@@ -1 +0,0 @@
Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.
@@ -1 +0,0 @@
Use poetry to manage Synapse's dependencies.
@@ -1 +0,0 @@
Fix rendering of the documentation site when using the 'print' feature.
@@ -1 +0,0 @@
The groups/communities feature in Synapse has been disabled by default.
@@ -1 +0,0 @@
Enable processing of device list updates asynchronously.
@@ -1 +0,0 @@
Add a manual documenting config file options.
@@ -1 +0,0 @@
Remove unstable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).
@@ -1 +0,0 @@
Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join.
@@ -1 +0,0 @@
Remove an unstable identifier from [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083).
@@ -1 +0,0 @@
Preparation for faster-room-join work: Implement a tracking mechanism to allow functions to wait for full room state to arrive.
@@ -1 +0,0 @@
Run twisted trunk CI job in the locked poetry environment.
@@ -1 +0,0 @@
Implement [MSC2815](https://github.com/matrix-org/matrix-spec-proposals/pull/2815) to allow room moderators to view redacted event content. Contributed by @tulir.
@@ -1 +0,0 @@
Run lints under poetry in CI, and remove corresponding tox lint jobs.
@@ -1 +0,0 @@
Run "main" trial tests under `poetry`.
@@ -1 +0,0 @@
Bump twisted version in `poetry.lock` to work around [pip bug #9644](https://github.com/pypa/pip/issues/9644).
@@ -1 +0,0 @@
Change Mutual Rooms' `unstable_features` flag to `uk.half-shot.msc2666.mutual_rooms` which matches the current MSC iteration.
@@ -1 +0,0 @@
Use `poetry` to manage the virtualenv in debian packages.
@@ -1 +0,0 @@
Fix typo in the release script help string.
@@ -1 +0,0 @@
Update documentation to reflect that both the `run_background_tasks_on` option and the options for moving stream writers off of the main process are no longer experimental.
@@ -1 +0,0 @@
Limit length of device_id to less than 512 characters.
@@ -1 +0,0 @@
Reintroduce the list of targets to the linter script, to avoid linting unwanted local-only directories during development.
@@ -1 +0,0 @@
Update worker documentation and replace old `federation_reader` with `generic_worker`.
@@ -1 +0,0 @@
Dockerfile-workers: reduce the amount we install in the image.
@@ -1 +0,0 @@
Enable processing of device list updates asynchronously.
@@ -1 +0,0 @@
Dockerfile-workers: give the master its own log config.
@@ -1 +0,0 @@
complement-synapse-workers: factor out separate entry point script.
@@ -1 +0,0 @@
Update `delay_cancellation` to accept any awaitable, rather than just `Deferred`s.
@@ -1 +0,0 @@
Add a CI job which tests Synapse against the latest version of all dependencies.
@@ -1 +0,0 @@
Back out experimental implementation of [MSC2314](https://github.com/matrix-org/matrix-spec-proposals/pull/2314).
@@ -1 +0,0 @@
Strongly recommend `poetry` for development.
@@ -1 +0,0 @@
Fix a long-standing bug which incorrectly caused `GET /_matrix/client/r3/rooms/{roomId}/event/{eventId}` to return edited events rather than the original.
@@ -1 +0,0 @@
Use poetry-core instead of setuptools to build wheels.
@@ -1 +0,0 @@
Fix grammatical error in federation error response when the room version of a room is unknown.
changelog.d/12485.misc (new file, 1 line)

@@ -0,0 +1 @@
Add some type hints to datastore.

@@ -1 +0,0 @@
Fix a broken link in `README.rst`.
@@ -1 +0,0 @@
Fix bug where the admin API for [deleting forward extremities](https://github.com/matrix-org/synapse/blob/erikj/fix_delete_event_response_count/docs/admin_api/rooms.md#deleting-forward-extremities) would always return a count of 1 no matter how many extremities were deleted. Broke in v1.27.0.
@@ -1 +0,0 @@
Fix a minor typo in the Debian changelogs generated by the release script.
@@ -1 +0,0 @@
Fix a long-standing bug where the image thumbanils embedded into email notifications were broken.
@@ -1 +0,0 @@
Remove unnecessary configuration overrides in tests.
@@ -1 +0,0 @@
Use poetry-core instead of setuptools to build wheels.
@@ -1 +0,0 @@
Fix a bug in the implementation of MSC3202 where Synapse would use the field name `device_unused_fallback_keys`, rather than `device_unused_fallback_key_types`.

changelog.d/12541.docker (new file, 1 line)

@@ -0,0 +1 @@
Explicitly opt-in to using [BuildKit-specific features](https://github.com/moby/buildkit/blob/master/frontend/dockerfile/docs/syntax.md) in the Dockerfile. This fixes issues with building images in some GitLab CI environments.

changelog.d/12544.bugfix (new file, 1 line)

@@ -0,0 +1 @@
Fix a bug where attempting to send a large amount of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.
debian/changelog (11 lines changed, vendored)

@@ -1,8 +1,15 @@
matrix-synapse-py3 (1.58.0+nmu1) UNRELEASED; urgency=medium
matrix-synapse-py3 (1.58.0~rc2) stable; urgency=medium

  * New Synapse release 1.58.0rc2.

 -- Synapse Packaging team <packages@matrix.org> Tue, 26 Apr 2022 17:14:56 +0100

matrix-synapse-py3 (1.58.0~rc1) stable; urgency=medium

  * Use poetry to manage the bundled virtualenv included with this package.
  * New Synapse release 1.58.0rc1.

 -- Synapse Packaging team <packages@matrix.org> Wed, 30 Mar 2022 12:21:43 +0100
 -- Synapse Packaging team <packages@matrix.org> Tue, 26 Apr 2022 11:15:20 +0100

matrix-synapse-py3 (1.57.1) stable; urgency=medium
@@ -1,3 +1,4 @@
# syntax=docker/dockerfile:1
# Dockerfile to build the matrixdotorg/synapse docker images.
#
# Note that it uses features which are only available in BuildKit - see
@@ -29,7 +29,7 @@
import os
import subprocess
import sys
from typing import Any, Dict, Mapping, Set
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Set

import jinja2
import yaml
@@ -201,7 +201,7 @@ upstream {upstream_worker_type} {{


# Utility functions
def log(txt: str):
def log(txt: str) -> None:
    """Log something to the stdout.

    Args:
@@ -210,7 +210,7 @@ def log(txt: str):
    print(txt)


def error(txt: str):
def error(txt: str) -> NoReturn:
    """Log something and exit with an error code.

    Args:
@@ -220,7 +220,7 @@ def error(txt: str):
    sys.exit(2)


def convert(src: str, dst: str, **template_vars):
def convert(src: str, dst: str, **template_vars: object) -> None:
    """Generate a file from a template

    Args:
@@ -290,7 +290,7 @@ def add_sharding_to_shared_config(
    shared_config.setdefault("media_instance_running_background_jobs", worker_name)


def generate_base_homeserver_config():
def generate_base_homeserver_config() -> None:
    """Starts Synapse and generates a basic homeserver config, which will later be
    modified for worker support.

@@ -302,12 +302,14 @@ def generate_base_homeserver_config():
    subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])


def generate_worker_files(environ, config_path: str, data_dir: str):
def generate_worker_files(
    environ: Mapping[str, str], config_path: str, data_dir: str
) -> None:
    """Read the desired list of workers from environment variables and generate
    shared homeserver, nginx and supervisord configs.

    Args:
        environ: _Environ[str]
        environ: os.environ instance.
        config_path: The location of the generated Synapse main worker config file.
        data_dir: The location of the synapse data directory. Where log and
            user-facing config files live.
@@ -369,13 +371,13 @@ def generate_worker_files(environ, config_path: str, data_dir: str):
    nginx_locations = {}

    # Read the desired worker configuration from the environment
    worker_types = environ.get("SYNAPSE_WORKER_TYPES")
    if worker_types is None:
    worker_types_env = environ.get("SYNAPSE_WORKER_TYPES")
    if worker_types_env is None:
        # No workers, just the main process
        worker_types = []
    else:
        # Split type names by comma
        worker_types = worker_types.split(",")
        worker_types = worker_types_env.split(",")

    # Create the worker configuration directory if it doesn't already exist
    os.makedirs("/conf/workers", exist_ok=True)
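The hunk above renames the intermediate variable so that the `Optional[str]` returned by `environ.get` stays distinct from the final `List[str]`, which is what lets mypy check the function. A standalone sketch of the resulting parsing behaviour (the worker type names below are example values only):

```python
import os
from typing import List, Optional

# Example value only; any comma-separated list of worker types behaves the same way.
os.environ["SYNAPSE_WORKER_TYPES"] = "generic_worker,federation_sender,event_persister"

worker_types_env: Optional[str] = os.environ.get("SYNAPSE_WORKER_TYPES")
worker_types: List[str] = [] if worker_types_env is None else worker_types_env.split(",")
print(worker_types)  # ['generic_worker', 'federation_sender', 'event_persister']
```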
@@ -547,7 +549,7 @@ def generate_worker_log_config(
    return log_config_filepath


def main(args, environ):
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
@@ -6,27 +6,28 @@ import os
import platform
import subprocess
import sys
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional

import jinja2


# Utility functions
def log(txt):
def log(txt: str) -> None:
    print(txt, file=sys.stderr)


def error(txt):
def error(txt: str) -> NoReturn:
    log(txt)
    sys.exit(2)


def convert(src, dst, environ):
def convert(src: str, dst: str, environ: Mapping[str, object]) -> None:
    """Generate a file from a template

    Args:
        src (str): path to input file
        dst (str): path to file to write
        environ (dict): environment dictionary, for replacement mappings.
        src: path to input file
        dst: path to file to write
        environ: environment dictionary, for replacement mappings.
    """
    with open(src) as infile:
        template = infile.read()
@@ -35,25 +36,30 @@ def convert(src, dst, environ):
        outfile.write(rendered)


def generate_config_from_template(config_dir, config_path, environ, ownership):
def generate_config_from_template(
    config_dir: str,
    config_path: str,
    os_environ: Mapping[str, str],
    ownership: Optional[str],
) -> None:
    """Generate a homeserver.yaml from environment variables

    Args:
        config_dir (str): where to put generated config files
        config_path (str): where to put the main config file
        environ (dict): environment dictionary
        ownership (str|None): "<user>:<group>" string which will be used to set
        config_dir: where to put generated config files
        config_path: where to put the main config file
        os_environ: environment mapping
        ownership: "<user>:<group>" string which will be used to set
            ownership of the generated configs. If None, ownership will not change.
    """
    for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
        if v not in environ:
        if v not in os_environ:
            error(
                "Environment variable '%s' is mandatory when generating a config file."
                % (v,)
            )

    # populate some params from data files (if they exist, else create new ones)
    environ = environ.copy()
    environ: Dict[str, Any] = dict(os_environ)
    secrets = {
        "registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
        "macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
@@ -127,12 +133,12 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
    subprocess.check_output(args)


def run_generate_config(environ, ownership):
def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) -> None:
    """Run synapse with a --generate-config param to generate a template config file

    Args:
        environ (dict): env var dict
        ownership (str|None): "userid:groupid" arg for chmod. If None, ownership will not change.
        environ: env vars from `os.enrivon`.
        ownership: "userid:groupid" arg for chmod. If None, ownership will not change.

    Never returns.
    """
@@ -178,7 +184,7 @@ def run_generate_config(environ, ownership):
    os.execv(sys.executable, args)


def main(args, environ):
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
    mode = args[1] if len(args) > 1 else "run"

    # if we were given an explicit user to switch to, do so
@@ -804,7 +804,7 @@ POST /_synapse/admin/v2/users/<user_id>/delete_devices
  "devices": [
    "QBUAZIFURK",
    "AUIECTSRND"
  ],
  ]
}
```
@@ -206,6 +206,28 @@ backend matrix
  server matrix 127.0.0.1:8008
```

[Delegation](delegate.md) example:
```
frontend https
  acl matrix-well-known-client-path path /.well-known/matrix/client
  acl matrix-well-known-server-path path /.well-known/matrix/server
  use_backend matrix-well-known-client if matrix-well-known-client-path
  use_backend matrix-well-known-server if matrix-well-known-server-path

backend matrix-well-known-client
  http-after-response set-header Access-Control-Allow-Origin "*"
  http-after-response set-header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
  http-after-response set-header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
  http-request return status 200 content-type application/json string '{"m.homeserver":{"base_url":"https://matrix.example.com"},"m.identity_server":{"base_url":"https://identity.example.com"}}'

backend matrix-well-known-server
  http-after-response set-header Access-Control-Allow-Origin "*"
  http-after-response set-header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, OPTIONS"
  http-after-response set-header Access-Control-Allow-Headers "Origin, X-Requested-With, Content-Type, Accept, Authorization"
  http-request return status 200 content-type application/json string '{"m.server":"matrix.example.com:443"}'
```
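A quick way to sanity-check the two delegation responses above from a client's point of view (a sketch only; `example.com` stands in for whatever domain the `https` frontend actually terminates):

```python
import json
import urllib.request

# Hypothetical base domain; replace with the server_name served by the frontend above.
base = "https://example.com"

for path in ("/.well-known/matrix/client", "/.well-known/matrix/server"):
    with urllib.request.urlopen(base + path) as resp:
        # The config returns JSON with a wildcard CORS header on both endpoints.
        print(path, resp.headers.get("Access-Control-Allow-Origin"), json.load(resp))
```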

### Relayd

```
docs/systemd-with-workers/workers/background_worker.yaml (new file, 8 lines)

@@ -0,0 +1,8 @@
worker_app: synapse.app.generic_worker
worker_name: background_worker

# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093

worker_log_config: /etc/matrix-synapse/background-worker-log.yaml
docs/systemd-with-workers/workers/event_persister.yaml (new file, 23 lines)

@@ -0,0 +1,23 @@
worker_app: synapse.app.generic_worker
worker_name: event_persister1

# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093

worker_listeners:
  - type: http
    port: 8034
    resources:
      - names: [replication]

  # Enable listener if this stream writer handles endpoints for the `typing` or
  # `to_device` streams. Uses a different port to the `replication` listener to
  # avoid exposing the `replication` listener publicly.
  #
  #- type: http
  #  port: 8035
  #  resources:
  #    - names: [client]

worker_log_config: /etc/matrix-synapse/event-persister-log.yaml
@@ -1,12 +1,13 @@
worker_app: synapse.app.generic_worker
worker_name: generic_worker1

# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093

worker_listeners:
  - type: http
    port: 8011
    port: 8083
    resources:
      - names: [client, federation]
@@ -64,7 +64,49 @@ apply if you want your config file to be read properly. A few helpful things to
In addition, each setting has an example of its usage, with the proper indentation
shown.


## Contents
[Modules](#modules)

[Server](#server)

[Homeserver Blocking](#homeserver-blocking)

[TLS](#tls)

[Federation](#federation)

[Caching](#caching)

[Database](#database)

[Logging](#logging)

[Ratelimiting](#ratelimiting)

[Media Store](#media-store)

[Captcha](#captcha)

[TURN](#turn)

[Registration](#registration)

[API Configuration](#api-configuration)

[Signing Keys](#signing-keys)

[Single Sign On Integration](#single-sign-on-integration)

[Push](#push)

[Rooms](#rooms)

[Opentracing](#opentracing)

[Workers](#workers)

[Background Updates](#background-updates)

## Modules

Server admins can expand Synapse's functionality with external modules.
@@ -3409,4 +3451,4 @@ background_updates:
    sleep_duration_ms: 300
    min_batch_size: 10
    default_batch_size: 50
```
```
@@ -138,20 +138,7 @@ as the `listeners` option in the shared config.
For example:

```yaml
worker_app: synapse.app.generic_worker
worker_name: worker1

# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093

worker_listeners:
  - type: http
    port: 8083
    resources:
      - names: [client, federation]

worker_log_config: /home/matrix/synapse/config/worker1_log_config.yaml
{{#include systemd-with-workers/workers/generic_worker.yaml}}
```

...is a full configuration for a generic worker instance, which will expose a
@@ -363,6 +350,12 @@ stream_writers:
  events: event_persister1
```

An example for a stream writer instance:

```yaml
{{#include systemd-with-workers/workers/event_persister.yaml}}
```

Some of the streams have associated endpoints which, for maximum efficiency, should
be routed to the workers handling that stream. See below for the currently supported
streams and the endpoints associated with them:
@@ -436,6 +429,12 @@ run_background_tasks_on: background_worker
You might also wish to investigate the `update_user_directory` and
`media_instance_running_background_jobs` settings.

An example for a dedicated background worker instance:

```yaml
{{#include systemd-with-workers/workers/background_worker.yaml}}
```

### `synapse.app.pusher`

Handles sending push notifications to sygnal and email. Doesn't handle any
@@ -615,14 +614,14 @@ The following shows an example setup using Redis and a reverse proxy:
| Main | | Generic | | Generic | | Event |
| Process | | Worker 1 | | Worker 2 | | Persister |
+--------------+ +--------------+ +--------------+ +--------------+
^ ^ | ^ | | ^ | ^ ^
| | | | | | | | | |
| | | | | HTTP | | | | |
| +----------+<--|---|---------+ | | | |
| | +-------------|-->+----------+ |
| | | |
| | | |
v v v v
====================================================================
^ ^ | ^ | | ^ | | ^ ^
| | | | | | | | | | |
| | | | | HTTP | | | | | |
| +----------+<--|---|---------+<--|---|---------+ | |
| | +-------------|-->+-------------+ |
| | | |
| | | |
v v v v
======================================================================
Redis pub/sub channel
```
poetry.lock (62 lines changed, generated)

@@ -1,11 +1,3 @@
[[package]]
name = "appdirs"
version = "1.4.4"
description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = "*"

[[package]]
name = "attrs"
version = "21.4.0"
@@ -49,17 +41,6 @@ six = "*"
[package.extras]
visualize = ["graphviz (>0.5.1)", "Twisted (>=16.1.1)"]

[[package]]
name = "baron"
version = "0.10.1"
description = "Full Syntax Tree for python to make writing refactoring code a realist task"
category = "dev"
optional = false
python-versions = "*"

[package.dependencies]
rply = "*"

[[package]]
name = "bcrypt"
version = "3.2.0"
@@ -984,20 +965,6 @@ Pygments = ">=2.5.1"
[package.extras]
md = ["cmarkgfm (>=0.8.0)"]

[[package]]
name = "redbaron"
version = "0.9.2"
description = "Abstraction on top of baron, a FST for python to make writing refactoring code a realistic task"
category = "dev"
optional = false
python-versions = "*"

[package.dependencies]
baron = ">=0.7"

[package.extras]
notebook = ["pygments"]

[[package]]
name = "requests"
version = "2.27.1"
@@ -1038,17 +1005,6 @@ python-versions = ">=3.7"
[package.extras]
idna2008 = ["idna"]

[[package]]
name = "rply"
version = "0.7.8"
description = "A pure Python Lex/Yacc that works with RPython"
category = "dev"
optional = false
python-versions = "*"

[package.dependencies]
appdirs = "*"

[[package]]
name = "secretstorage"
version = "3.3.1"
@@ -1597,13 +1553,9 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7"
content-hash = "964ad29eaf7fd02749a4e735818f3bc0ba729c2f4b9e3213f0daa02643508b16"
content-hash = "f482a4f594a165dfe01ce253a22510d5faf38647ab0dcebc35789350cafd9bf0"

[metadata.files]
appdirs = [
    {file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"},
    {file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"},
]
attrs = [
    {file = "attrs-21.4.0-py2.py3-none-any.whl", hash = "sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"},
    {file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"},
@@ -1616,10 +1568,6 @@ automat = [
    {file = "Automat-20.2.0-py2.py3-none-any.whl", hash = "sha256:b6feb6455337df834f6c9962d6ccf771515b7d939bca142b29c20c2376bc6111"},
    {file = "Automat-20.2.0.tar.gz", hash = "sha256:7979803c74610e11ef0c0d68a2942b152df52da55336e0c9d58daf1831cbdf33"},
]
baron = [
    {file = "baron-0.10.1-py2.py3-none-any.whl", hash = "sha256:befb33f4b9e832c7cd1e3cf0eafa6dd3cb6ed4cb2544245147c019936f4e0a8a"},
    {file = "baron-0.10.1.tar.gz", hash = "sha256:af822ad44d4eb425c8516df4239ac4fdba9fdb398ef77e4924cd7c9b4045bc2f"},
]
bcrypt = [
    {file = "bcrypt-3.2.0-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:b589229207630484aefe5899122fb938a5b017b0f4349f769b8c13e78d99a8fd"},
    {file = "bcrypt-3.2.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c95d4cbebffafcdd28bd28bb4e25b31c50f6da605c81ffd9ad8a3d1b2ab7b1b6"},
@@ -2412,10 +2360,6 @@ readme-renderer = [
    {file = "readme_renderer-33.0-py3-none-any.whl", hash = "sha256:f02cee0c4de9636b5a62b6be50c9742427ba1b956aad1d938bfb087d0d72ccdf"},
    {file = "readme_renderer-33.0.tar.gz", hash = "sha256:e3b53bc84bd6af054e4cc1fe3567dc1ae19f554134221043a3f8c674e22209db"},
]
redbaron = [
    {file = "redbaron-0.9.2-py2.py3-none-any.whl", hash = "sha256:d01032b6a848b5521a8d6ef72486315c2880f420956870cdd742e2b5a09b9bab"},
    {file = "redbaron-0.9.2.tar.gz", hash = "sha256:472d0739ca6b2240bb2278ae428604a75472c9c12e86c6321e8c016139c0132f"},
]
requests = [
    {file = "requests-2.27.1-py2.py3-none-any.whl", hash = "sha256:f22fa1e554c9ddfd16e6e41ac79759e17be9e492b3587efa038054674760e72d"},
    {file = "requests-2.27.1.tar.gz", hash = "sha256:68d7c56fd5a8999887728ef304a6d12edc7be74f1cfa47714fc8b414525c9a61"},
@@ -2428,10 +2372,6 @@ rfc3986 = [
    {file = "rfc3986-2.0.0-py2.py3-none-any.whl", hash = "sha256:50b1502b60e289cb37883f3dfd34532b8873c7de9f49bb546641ce9cbd256ebd"},
    {file = "rfc3986-2.0.0.tar.gz", hash = "sha256:97aacf9dbd4bfd829baad6e6309fa6573aaf1be3f6fa735c8ab05e46cecb261c"},
]
rply = [
    {file = "rply-0.7.8-py2.py3-none-any.whl", hash = "sha256:28ffd11d656c48aeb8c508eb382acd6a0bd906662624b34388751732a27807e7"},
    {file = "rply-0.7.8.tar.gz", hash = "sha256:2a808ac25a4580a9991fc304d64434e299a8fc75760574492f242cbb5bb301c9"},
]
secretstorage = [
    {file = "SecretStorage-3.3.1-py3-none-any.whl", hash = "sha256:422d82c36172d88d6a0ed5afdec956514b189ddbfb72fefab0c8a1cee4eaf71f"},
    {file = "SecretStorage-3.3.1.tar.gz", hash = "sha256:fd666c51a6bf200643495a04abb261f83229dcb6fd8472ec393df7ffc8b6f195"},
@@ -54,7 +54,7 @@ skip_gitignore = true

[tool.poetry]
name = "matrix-synapse"
version = "1.57.1"
version = "1.58.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
@@ -270,7 +270,6 @@ idna = ">=2.5"

# The following are used by the release script
click = "==8.1.0"
redbaron = "==0.9.2"
GitPython = "==3.1.14"
commonmark = "==0.9.1"
pygithub = "==1.55"
@@ -26,6 +26,7 @@ DISTS = (
    "debian:sid",
    "ubuntu:focal",  # 20.04 LTS (our EOL forced by Py38 on 2024-10-14)
    "ubuntu:impish",  # 21.10 (EOL 2022-07)
    "ubuntu:jammy",  # 22.04 LTS (EOL 2027-04)
)

DESC = """\
@@ -25,13 +25,12 @@ import sys
import urllib.request
from os import path
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple
from typing import List, Optional

import attr
import click
import commonmark
import git
import redbaron
from click.exceptions import ClickException
from github import Github
from packaging import version
@@ -100,7 +99,7 @@ def prepare():
    repo.remote().fetch()

    # Get the current version and AST from root Synapse module.
    current_version, parsed_synapse_ast, version_node = parse_version_from_module()
    current_version = get_package_version()

    # Figure out what sort of release we're doing and calcuate the new version.
    rc = click.confirm("RC", default=True)
@@ -162,7 +161,7 @@ def prepare():
        click.get_current_context().abort()

    # Switch to the release branch.
    parsed_new_version = version.parse(new_version)
    parsed_new_version: version.Version = version.parse(new_version)

    # We assume for debian changelogs that we only do RCs or full releases.
    assert not parsed_new_version.is_devrelease
@@ -207,17 +206,15 @@ def prepare():
    # Create the new release branch
    release_branch = repo.create_head(release_branch_name, commit=base_branch)

    # Switch to the release branch and ensure its up to date.
    # Switch to the release branch and ensure it's up to date.
    repo.git.checkout(release_branch_name)
    update_branch(repo)

    # Update the `__version__` variable and write it back to the file.
    version_node.value = '"' + new_version + '"'
    with open("synapse/__init__.py", "w") as f:
        f.write(parsed_synapse_ast.dumps())
    # Update the version specified in pyproject.toml.
    subprocess.check_output(["poetry", "version", new_version])

    # Generate changelogs.
    generate_and_write_changelog(current_version)
    generate_and_write_changelog(current_version, new_version)

    # Generate debian changelogs
    if parsed_new_version.pre is not None:
@@ -284,7 +281,7 @@ def tag(gh_token: Optional[str]):
    repo.remote().fetch()

    # Find out the version and tag name.
    current_version, _, _ = parse_version_from_module()
    current_version = get_package_version()
    tag_name = f"v{current_version}"

    # Check we haven't released this version.
@@ -362,7 +359,7 @@ def publish(gh_token: str):
    if repo.is_dirty():
        raise click.ClickException("Uncommitted changes exist.")

    current_version, _, _ = parse_version_from_module()
    current_version = get_package_version()
    tag_name = f"v{current_version}"

    if not click.confirm(f"Publish {tag_name}?", default=True):
@@ -396,7 +393,7 @@ def publish(gh_token: str):
def upload():
    """Upload release to pypi."""

    current_version, _, _ = parse_version_from_module()
    current_version = get_package_version()
    tag_name = f"v{current_version}"

    pypi_asset_names = [
@@ -424,7 +421,7 @@ def upload():
def announce():
    """Generate markdown to announce the release."""

    current_version, _, _ = parse_version_from_module()
    current_version = get_package_version()
    tag_name = f"v{current_version}"

    click.echo(
@@ -455,37 +452,11 @@ Announce the release in
    )


def parse_version_from_module() -> Tuple[
    version.Version, redbaron.RedBaron, redbaron.Node
]:
    # Parse the AST and load the `__version__` node so that we can edit it
    # later.
    with open("synapse/__init__.py") as f:
        red = redbaron.RedBaron(f.read())

    version_node = None
    for node in red:
        if node.type != "assignment":
            continue

        if node.target.type != "name":
            continue

        if node.target.value != "__version__":
            continue

        version_node = node
        break

    if not version_node:
        print("Failed to find '__version__' definition in synapse/__init__.py")
        sys.exit(1)

    # Parse the current version.
    current_version = version.parse(version_node.value.value.strip('"'))
    assert isinstance(current_version, version.Version)

    return current_version, red, version_node
def get_package_version() -> version.Version:
    version_string = subprocess.check_output(["poetry", "version", "--short"]).decode(
        "utf-8"
    )
    return version.Version(version_string)


def find_ref(repo: git.Repo, ref_name: str) -> Optional[git.HEAD]:
@@ -565,11 +536,13 @@ def get_changes_for_version(wanted_version: version.Version) -> str:
    return "\n".join(version_changelog)


def generate_and_write_changelog(current_version: version.Version):
def generate_and_write_changelog(current_version: version.Version, new_version: str):
    # We do this by getting a draft so that we can edit it before writing to the
    # changelog.
    result = run_until_successful(
        "python3 -m towncrier --draft", shell=True, capture_output=True
        f"python3 -m towncrier build --draft --version {new_version}",
        shell=True,
        capture_output=True,
    )
    new_changes = result.stdout.decode("utf-8")
    new_changes = new_changes.replace(
@@ -103,7 +103,7 @@ class SortedDict(Dict[_KT, _VT]):
        self,
        start: Optional[int] = ...,
        stop: Optional[int] = ...,
        reverse=bool,
        reverse: bool = ...,
    ) -> Iterator[_KT]: ...
    def bisect_left(self, value: _KT) -> int: ...
    def bisect_right(self, value: _KT) -> int: ...

@@ -81,7 +81,7 @@ class SortedList(MutableSequence[_T]):
        self,
        start: Optional[int] = ...,
        stop: Optional[int] = ...,
        reverse=bool,
        reverse: bool = ...,
    ) -> Iterator[_T]: ...
    def _islice(
        self,
@@ -153,14 +153,14 @@ class SortedKeyList(SortedList[_T]):
        maximum: Optional[int] = ...,
        inclusive: Tuple[bool, bool] = ...,
        reverse: bool = ...,
    ): ...
    ) -> Iterator[_T]: ...
    def irange_key(
        self,
        min_key: Optional[Any] = ...,
        max_key: Optional[Any] = ...,
        inclusive: Tuple[bool, bool] = ...,
        reserve: bool = ...,
    ): ...
    ) -> Iterator[_T]: ...
    def bisect_left(self, value: _T) -> int: ...
    def bisect_right(self, value: _T) -> int: ...
    def bisect(self, value: _T) -> int: ...

@@ -103,7 +103,7 @@ class SortedSet(MutableSet[_T], Sequence[_T]):
        self,
        start: Optional[int] = ...,
        stop: Optional[int] = ...,
        reverse=bool,
        reverse: bool = ...,
    ) -> Iterator[_T]: ...
    def irange(
        self,
@@ -18,6 +18,8 @@ from typing import Any, List, Optional, Type, Union

from twisted.internet import protocol
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IAddress
from twisted.python.failure import Failure

class RedisProtocol(protocol.Protocol):
    def publish(self, channel: str, message: bytes) -> "Deferred[None]": ...
@@ -34,11 +36,14 @@ class RedisProtocol(protocol.Protocol):
    def get(self, key: str) -> "Deferred[Any]": ...

class SubscriberProtocol(RedisProtocol):
    def __init__(self, *args, **kwargs): ...
    def __init__(self, *args: object, **kwargs: object): ...
    password: Optional[str]
    def subscribe(self, channels: Union[str, List[str]]): ...
    def connectionMade(self): ...
    def connectionLost(self, reason): ...
    def subscribe(self, channels: Union[str, List[str]]) -> "Deferred[None]": ...
    def connectionMade(self) -> None: ...
    # type-ignore: twisted.internet.protocol.Protocol provides a default argument for
    # `reason`. txredisapi's LineReceiver Protocol doesn't. But that's fine: it's what's
    # actually specified in twisted.internet.interfaces.IProtocol.
    def connectionLost(self, reason: Failure) -> None: ...  # type: ignore[override]

def lazyConnection(
    host: str = ...,
@@ -74,7 +79,7 @@ class RedisFactory(protocol.ReconnectingClientFactory):
        replyTimeout: Optional[int] = None,
        convertNumbers: Optional[int] = True,
    ): ...
    def buildProtocol(self, addr) -> RedisProtocol: ...
    def buildProtocol(self, addr: IAddress) -> RedisProtocol: ...

class SubscriberFactory(RedisFactory):
    def __init__(self) -> None: ...
@@ -479,9 +479,9 @@ class EventClientSerializer:
        Args:
            event: The event being serialized.
            time_now: The current time in milliseconds
            config: Event serialization config
            aggregations: The bundled aggregation to serialize.
            serialized_event: The serialized event which may be modified.
            config: Event serialization config
            apply_edits: Whether the content of the event should be modified to reflect
                any replacement in `aggregations.replace`.
        """
@@ -416,7 +416,7 @@ class ApplicationServicesHandler:
        return typing

    async def _handle_receipts(
        self, service: ApplicationService, new_token: Optional[int]
        self, service: ApplicationService, new_token: int
    ) -> List[JsonDict]:
        """
        Return the latest read receipts that the given application service should receive.
@@ -447,7 +447,7 @@ class ApplicationServicesHandler:

        receipts_source = self.event_sources.sources.receipt
        receipts, _ = await receipts_source.get_new_events_as(
            service=service, from_key=from_key
            service=service, from_key=from_key, to_key=new_token
        )
        return receipts
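The change above passes an explicit upper bound (`to_key=new_token`) instead of fetching everything newer than `from_key`. A toy illustration of why bounding each batch avoids re-sending the same receipts (the data and helper here are invented for the example, not Synapse code):

```python
from typing import Dict, List

# Invented stand-in for a stream of receipts keyed by stream position.
receipts_by_stream_id: Dict[int, str] = {1: "r1", 2: "r2", 3: "r3", 4: "r4"}

def get_new_receipts(from_key: int, to_key: int) -> List[str]:
    # Half-open interval (from_key, to_key], mirroring stream-token semantics.
    return [r for sid, r in sorted(receipts_by_stream_id.items()) if from_key < sid <= to_key]

# Two successive notifications, each bounded by the token that triggered it:
assert get_new_receipts(0, 2) == ["r1", "r2"]
assert get_new_receipts(2, 4) == ["r3", "r4"]  # no overlap with the first batch
```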
@@ -505,8 +505,9 @@ class DeviceHandler(DeviceWorkerHandler):
            "device_list_key", position, users={user_id}, rooms=room_ids
        )

        # We may need to do some processing asynchronously.
        self._handle_new_device_update_async()
        # We may need to do some processing asynchronously for local user IDs.
        if self.hs.is_mine_id(user_id):
            self._handle_new_device_update_async()

    async def notify_user_signature_update(
        self, from_user_id: str, user_ids: List[str]
@@ -683,9 +684,12 @@ class DeviceHandler(DeviceWorkerHandler):
        self.federation_sender.send_device_messages(
            host, immediate=False
        )
        log_kv(
            {"message": "sent device update to host", "host": host}
        )
        # TODO: when called, this isn't in a logging context.
        # This leads to log spam, sentry event spam, and massive
        # memory usage. See #12552.
        # log_kv(
        #     {"message": "sent device update to host", "host": host}
        # )

        if current_stream_id != stream_id:
            # Clear the set of hosts we've already sent to as we're
@@ -1,4 +1,4 @@
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
# Copyright 2020 Sorunome
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,10 +15,14 @@

"""Contains handlers for federation events."""

import enum
import itertools
import logging
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union

import attr
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -92,6 +96,24 @@ def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
    return sorted(joined_domains.items(), key=lambda d: d[1])


class _BackfillPointType(Enum):
    # a regular backwards extremity (ie, an event which we don't yet have, but which
    # is referred to by other events in the DAG)
    BACKWARDS_EXTREMITY = enum.auto()

    # an MSC2716 "insertion event"
    INSERTION_PONT = enum.auto()


@attr.s(slots=True, auto_attribs=True, frozen=True)
class _BackfillPoint:
    """A potential point we might backfill from"""

    event_id: str
    depth: int
    type: _BackfillPointType


class FederationHandler:
    """Handles general incoming federation requests

@@ -157,89 +179,51 @@ class FederationHandler:
    async def _maybe_backfill_inner(
        self, room_id: str, current_depth: int, limit: int
    ) -> bool:
        oldest_events_with_depth = (
            await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
        )
        backwards_extremities = [
            _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
            for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
                room_id
            )
        ]

        insertion_events_to_be_backfilled: Dict[str, int] = {}
        insertion_events_to_be_backfilled: List[_BackfillPoint] = []
        if self.hs.config.experimental.msc2716_enabled:
            insertion_events_to_be_backfilled = (
                await self.store.get_insertion_event_backward_extremities_in_room(
            insertion_events_to_be_backfilled = [
                _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
                for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
                    room_id
                )
            )
            ]
        logger.debug(
            "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
            oldest_events_with_depth,
            "_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
            backwards_extremities,
            insertion_events_to_be_backfilled,
        )

        if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
        if not backwards_extremities and not insertion_events_to_be_backfilled:
            logger.debug("Not backfilling as no extremeties found.")
            return False

        # We only want to paginate if we can actually see the events we'll get,
        # as otherwise we'll just spend a lot of resources to get redacted
        # events.
        #
        # We do this by filtering all the backwards extremities and seeing if
        # any remain. Given we don't have the extremity events themselves, we
        # need to actually check the events that reference them.
        #
        # *Note*: the spec wants us to keep backfilling until we reach the start
        # of the room in case we are allowed to see some of the history. However
        # in practice that causes more issues than its worth, as a) its
        # relatively rare for there to be any visible history and b) even when
        # there is its often sufficiently long ago that clients would stop
        # attempting to paginate before backfill reached the visible history.
        #
        # TODO: If we do do a backfill then we should filter the backwards
        #   extremities to only include those that point to visible portions of
        #   history.
        #
        # TODO: Correctly handle the case where we are allowed to see the
        #   forward event but not the backward extremity, e.g. in the case of
        #   initial join of the server where we are allowed to see the join
        #   event but not anything before it. This would require looking at the
        #   state *before* the event, ignoring the special casing certain event
        #   types have.

        forward_event_ids = await self.store.get_successor_events(
            list(oldest_events_with_depth)
        # we now have a list of potential places to backpaginate from. We prefer to
        # start with the most recent (ie, max depth), so let's sort the list.
        sorted_backfill_points: List[_BackfillPoint] = sorted(
            itertools.chain(
                backwards_extremities,
                insertion_events_to_be_backfilled,
            ),
            key=lambda e: -int(e.depth),
        )

        extremities_events = await self.store.get_events(
            forward_event_ids,
            redact_behaviour=EventRedactBehaviour.AS_IS,
            get_prev_content=False,
        )

        # We set `check_history_visibility_only` as we might otherwise get false
        # positives from users having been erased.
        filtered_extremities = await filter_events_for_server(
            self.storage,
            self.server_name,
            list(extremities_events.values()),
            redact=False,
            check_history_visibility_only=True,
        )
        logger.debug(
            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
            "_maybe_backfill_inner: room_id: %s: current_depth: %s, limit: %s, "
            "backfill points (%d): %s",
            room_id,
            current_depth,
            limit,
            len(sorted_backfill_points),
            sorted_backfill_points,
        )

        if not filtered_extremities and not insertion_events_to_be_backfilled:
            return False

        extremities = {
            **oldest_events_with_depth,
            # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
            **insertion_events_to_be_backfilled,
        }

        # Check if we reached a point where we should start backfilling.
        sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
        max_depth = sorted_extremeties_tuple[0][1]

        # If we're approaching an extremity we trigger a backfill, otherwise we
        # no-op.
        #
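The rework above replaces the depth-keyed dict with a single list of backfill points ordered by depth, newest first. A standalone sketch of just that ordering (the class and values here are illustrative, not the real Synapse types):

```python
import itertools
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class BackfillPoint:
    event_id: str
    depth: int

backwards_extremities = [BackfillPoint("$a", 10), BackfillPoint("$b", 3)]
insertion_events = [BackfillPoint("$c", 7)]

# Most recent (largest depth) first, mirroring `key=lambda e: -int(e.depth)` above.
sorted_points: List[BackfillPoint] = sorted(
    itertools.chain(backwards_extremities, insertion_events),
    key=lambda p: -p.depth,
)
print([p.event_id for p in sorted_points])  # ['$a', '$c', '$b']
```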
@@ -249,6 +233,11 @@ class FederationHandler:
|
||||
# chose more than one times the limit in case of failure, but choosing a
|
||||
# much larger factor will result in triggering a backfill request much
|
||||
# earlier than necessary.
|
||||
#
|
||||
# XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
|
||||
# care about events that have happened after our current position.
|
||||
#
|
||||
max_depth = sorted_backfill_points[0].depth
|
||||
if current_depth - 2 * limit > max_depth:
|
||||
logger.debug(
|
||||
"Not backfilling as we don't need to. %d < %d - 2 * %d",
|
||||
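The depth check above is a small heuristic: only backfill once the client's pagination position gets within roughly `2 * limit` of the deepest known backfill point. A rough standalone sketch of that arithmetic (the `should_backfill` helper and the example numbers are illustrative, not Synapse code):

```python
def should_backfill(current_depth: int, limit: int, max_depth: int) -> bool:
    """Return True if we are close enough to a backfill point to bother backfilling."""
    # If the deepest backfill point is still far behind the position the client is
    # paginating from, a backfill would fetch history nobody is about to read.
    return current_depth - 2 * limit <= max_depth


# e.g. with limit=10: paginating at depth 500 against a backfill point at depth 100
# is a no-op, but paginating at depth 115 would trigger a backfill.
assert should_backfill(current_depth=500, limit=10, max_depth=100) is False
assert should_backfill(current_depth=115, limit=10, max_depth=100) is True
```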
@@ -265,31 +254,98 @@ class FederationHandler:
# 2. we have likely previously tried and failed to backfill from that
# extremity, so to avoid getting "stuck" requesting the same
# backfill repeatedly we drop those extremities.
filtered_sorted_extremeties_tuple = [
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]

logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)

#
# However, we need to check that the filtered extremities are non-empty.
# If they are empty then either we can a) bail or b) still attempt to
# backfill. We opt to try backfilling anyway just in case we do get
# relevant events.
if filtered_sorted_extremeties_tuple:
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
#
filtered_sorted_backfill_points = [
t for t in sorted_backfill_points if t.depth <= current_depth
]
if filtered_sorted_backfill_points:
logger.debug(
"_maybe_backfill_inner: backfill points before current depth: %s",
filtered_sorted_backfill_points,
)
sorted_backfill_points = filtered_sorted_backfill_points
else:
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
)

# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])
# For performance's sake, we only want to paginate from a particular extremity
# if we can actually see the events we'll get. Otherwise, we'd just spend a lot
# of resources to get redacted events. We check each extremity in turn and
# ignore those which users on our server wouldn't be able to see.
#
# Additionally, we limit ourselves to backfilling from at most 5 extremities,
# for two reasons:
#
# - The check which determines if we can see an extremity's events can be
# expensive (we load the full state for the room at each of the backfill
# points, or (worse) their successors)
# - We want to avoid the server-server API request URI becoming too long.
#
# *Note*: the spec wants us to keep backfilling until we reach the start
# of the room in case we are allowed to see some of the history. However,
# in practice that causes more issues than its worth, as (a) it's
# relatively rare for there to be any visible history and (b) even when
# there is it's often sufficiently long ago that clients would stop
# attempting to paginate before backfill reached the visible history.

extremities_to_request: List[str] = []
for bp in sorted_backfill_points:
if len(extremities_to_request) >= 5:
break

# For regular backwards extremities, we don't have the extremity events
# themselves, so we need to actually check the events that reference them -
# their "successor" events.
#
# TODO: Correctly handle the case where we are allowed to see the
# successor event but not the backward extremity, e.g. in the case of
# initial join of the server where we are allowed to see the join
# event but not anything before it. This would require looking at the
# state *before* the event, ignoring the special casing certain event
# types have.
if bp.type == _BackfillPointType.INSERTION_PONT:
event_ids_to_check = [bp.event_id]
else:
event_ids_to_check = await self.store.get_successor_events(bp.event_id)

events_to_check = await self.store.get_events_as_list(
event_ids_to_check,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)

# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self.storage,
self.server_name,
events_to_check,
redact=False,
check_history_visibility_only=True,
)
if filtered_extremities:
extremities_to_request.append(bp.event_id)
else:
logger.debug(
"_maybe_backfill_inner: skipping extremity %s as it would not be visible",
bp,
)

if not extremities_to_request:
logger.debug(
"_maybe_backfill_inner: found no extremities which would be visible"
)
return False

logger.debug(
"_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
)

# Now we need to decide which hosts to hit first.

@@ -309,7 +365,7 @@ class FederationHandler:
for dom in domains:
try:
await self._federation_event_handler.backfill(
dom, room_id, limit=100, extremities=extremities
dom, room_id, limit=100, extremities=extremities_to_request
)
# If this succeeded then we probably already have the
# appropriate stuff.

@@ -239,13 +239,14 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
return events, to_key

async def get_new_events_as(
self, from_key: int, service: ApplicationService
self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new read receipt events that an appservice
may be interested in.

Args:
from_key: the stream position at which events should be fetched from
to_key: the stream position up to which events should be fetched to
service: The appservice which may be interested

Returns:
@@ -255,7 +256,6 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()

if from_key == to_key:
return [], to_key

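With this change the upper bound of the receipt stream window is supplied by the caller instead of being re-read inside `get_new_events_as`. A hedged sketch of how a caller might pin that window once and reuse it across services (the wrapper function is invented for illustration; only `get_new_events_as` and `get_current_key` come from the diff):

```python
async def fetch_new_receipts_for_appservice(receipt_source, service, from_key: int):
    # Capture the window's upper bound up front, so receipts arriving while we
    # iterate over interested services do not silently widen the range.
    to_key = receipt_source.get_current_key()
    events, upto = await receipt_source.get_new_events_as(
        from_key=from_key, to_key=to_key, service=service
    )
    return events, upto
```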
@@ -256,64 +256,6 @@ class RelationsHandler:

return filtered_results

async def _get_bundled_aggregation_for_event(
self, event: EventBase, ignored_users: FrozenSet[str]
) -> Optional[BundledAggregations]:
"""Generate bundled aggregations for an event.

Note that this does not use a cache, but depends on cached methods.

Args:
event: The event to calculate bundled aggregations for.
ignored_users: The users ignored by the requesting user.

Returns:
The bundled aggregations for an event, if bundled aggregations are
enabled and the event can have bundled aggregations.
"""

# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
return None

event_id = event.event_id
room_id = event.room_id

# The bundled aggregations to include, a mapping of relation type to a
# type-specific value. Some types include the direct return type here
# while others need more processing during serialization.
aggregations = BundledAggregations()

annotations = await self.get_annotations_for_event(
event_id, room_id, ignored_users=ignored_users
)
if annotations:
aggregations.annotations = {"chunk": annotations}

references, next_token = await self.get_relations_for_event(
event_id,
event,
room_id,
RelationTypes.REFERENCE,
ignored_users=ignored_users,
)
if references:
aggregations.references = {
"chunk": [{"event_id": event.event_id} for event in references]
}

if next_token:
aggregations.references["next_batch"] = await next_token.to_string(
self._main_store
)

# Store the bundled aggregations in the event metadata for later use.
return aggregations

async def get_threads_for_events(
self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str]
) -> Dict[str, _ThreadAggregation]:
@@ -435,11 +377,39 @@ class RelationsHandler:

# Fetch other relations per event.
for event in events_by_id.values():
event_result = await self._get_bundled_aggregation_for_event(
event, ignored_users
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
continue

annotations = await self.get_annotations_for_event(
event.event_id, event.room_id, ignored_users=ignored_users
)
if event_result:
results[event.event_id] = event_result
if annotations:
results.setdefault(
event.event_id, BundledAggregations()
).annotations = {"chunk": annotations}

references, next_token = await self.get_relations_for_event(
event.event_id,
event,
event.room_id,
RelationTypes.REFERENCE,
ignored_users=ignored_users,
)
if references:
aggregations = results.setdefault(event.event_id, BundledAggregations())
aggregations.references = {
"chunk": [{"event_id": ev.event_id} for ev in references]
}

if next_token:
aggregations.references["next_batch"] = await next_token.to_string(
self._main_store
)

# Fetch any edits (but not for redacted events).
#

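The refactor above removes the per-event helper and inlines the work into the batch loop, only creating a result entry when an event actually has annotations or references, via `dict.setdefault`. A generic, self-contained sketch of that accumulation pattern (not the real `RelationsHandler`; the `Aggregations` dataclass and the plain-dict inputs are placeholders for Synapse's event objects and cached lookups):

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Aggregations:
    annotations: Optional[dict] = None
    references: Optional[dict] = None


def bundle(
    events: Dict[str, dict], annotations_by_event: Dict[str, List[dict]]
) -> Dict[str, Aggregations]:
    results: Dict[str, Aggregations] = {}
    for event_id, event in events.items():
        # Edits and annotations themselves never carry bundled aggregations.
        rel_type = (event.get("content", {}).get("m.relates_to") or {}).get("rel_type")
        if rel_type in ("m.annotation", "m.replace"):
            continue
        annotations = annotations_by_event.get(event_id)
        if annotations:
            # Only allocate an entry for events that have something to bundle.
            results.setdefault(event_id, Aggregations()).annotations = {"chunk": annotations}
    return results
```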
@@ -54,7 +54,7 @@ class RoomBatchHandler:
# it has a larger `depth` but before the successor event because the `stream_ordering`
# is negative before the successor event.
successor_event_ids = await self.store.get_successor_events(
[most_recent_prev_event_id]
most_recent_prev_event_id
)

# If we can't find any successor events, then it's a forward extremity of

@@ -15,12 +15,17 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
|
||||
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.stats import UserSortOrder
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.storage.util.id_generators import (
|
||||
IdGenerator,
|
||||
MultiWriterIdGenerator,
|
||||
@@ -266,7 +271,9 @@ class DataStore(
|
||||
A tuple of a list of mappings from user to information and a count of total users.
|
||||
"""
|
||||
|
||||
def get_users_paginate_txn(txn):
|
||||
def get_users_paginate_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
filters = []
|
||||
args = [self.hs.config.server.server_name]
|
||||
|
||||
@@ -301,7 +308,7 @@ class DataStore(
|
||||
"""
|
||||
sql = "SELECT COUNT(*) as total_users " + sql_base
|
||||
txn.execute(sql, args)
|
||||
count = txn.fetchone()[0]
|
||||
count = cast(Tuple[int], txn.fetchone())[0]
|
||||
|
||||
sql = f"""
|
||||
SELECT name, user_type, is_guest, admin, deactivated, shadow_banned,
|
||||
@@ -338,7 +345,9 @@ class DataStore(
|
||||
)
|
||||
|
||||
|
||||
def check_database_before_upgrade(cur, database_engine, config: HomeServerConfig):
|
||||
def check_database_before_upgrade(
|
||||
cur: Cursor, database_engine: BaseDatabaseEngine, config: HomeServerConfig
|
||||
) -> None:
|
||||
"""Called before upgrading an existing database to check that it is broadly sane
|
||||
compared with the configuration.
|
||||
"""
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Pattern, Tuple
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Pattern, Tuple, cast
|
||||
|
||||
from synapse.appservice import (
|
||||
ApplicationService,
|
||||
@@ -83,7 +83,7 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore):
|
||||
txn.execute(
|
||||
"SELECT COALESCE(max(txn_id), 0) FROM application_services_txns"
|
||||
)
|
||||
return txn.fetchone()[0] # type: ignore
|
||||
return cast(Tuple[int], txn.fetchone())[0]
|
||||
|
||||
self._as_txn_seq_gen = build_sequence_generator(
|
||||
db_conn,
|
||||
|
||||
@@ -14,7 +14,17 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple, cast
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Collection,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
cast,
|
||||
)
|
||||
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
@@ -118,7 +128,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
prefilled_cache=device_outbox_prefill,
|
||||
)
|
||||
|
||||
def process_replication_rows(self, stream_name, instance_name, token, rows):
|
||||
def process_replication_rows(
|
||||
self,
|
||||
stream_name: str,
|
||||
instance_name: str,
|
||||
token: int,
|
||||
rows: Iterable[ToDeviceStream.ToDeviceStreamRow],
|
||||
) -> None:
|
||||
if stream_name == ToDeviceStream.NAME:
|
||||
# If replication is happening than postgres must be being used.
|
||||
assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
|
||||
@@ -134,7 +150,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
)
|
||||
return super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
||||
def get_to_device_stream_token(self):
|
||||
def get_to_device_stream_token(self) -> int:
|
||||
return self._device_inbox_id_gen.get_current_token()
|
||||
|
||||
async def get_messages_for_user_devices(
|
||||
@@ -301,7 +317,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
if not user_ids_to_query:
|
||||
return {}, to_stream_id
|
||||
|
||||
def get_device_messages_txn(txn: LoggingTransaction):
|
||||
def get_device_messages_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
|
||||
# Build a query to select messages from any of the given devices that
|
||||
# are between the given stream id bounds.
|
||||
|
||||
@@ -428,7 +446,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
log_kv({"message": "No changes in cache since last check"})
|
||||
return 0
|
||||
|
||||
def delete_messages_for_device_txn(txn):
|
||||
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
|
||||
sql = (
|
||||
"DELETE FROM device_inbox"
|
||||
" WHERE user_id = ? AND device_id = ?"
|
||||
@@ -455,15 +473,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
|
||||
@trace
|
||||
async def get_new_device_msgs_for_remote(
|
||||
self, destination, last_stream_id, current_stream_id, limit
|
||||
) -> Tuple[List[dict], int]:
|
||||
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
"""
|
||||
Args:
|
||||
destination(str): The name of the remote server.
|
||||
last_stream_id(int|long): The last position of the device message stream
|
||||
destination: The name of the remote server.
|
||||
last_stream_id: The last position of the device message stream
|
||||
that the server sent up to.
|
||||
current_stream_id(int|long): The current position of the device
|
||||
message stream.
|
||||
current_stream_id: The current position of the device message stream.
|
||||
Returns:
|
||||
A list of messages for the device and where in the stream the messages got to.
|
||||
"""
|
||||
@@ -485,7 +502,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
return [], last_stream_id
|
||||
|
||||
@trace
|
||||
def get_new_messages_for_remote_destination_txn(txn):
|
||||
def get_new_messages_for_remote_destination_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
sql = (
|
||||
"SELECT stream_id, messages_json FROM device_federation_outbox"
|
||||
" WHERE destination = ?"
|
||||
@@ -527,7 +546,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
up_to_stream_id: Where to delete messages up to.
|
||||
"""
|
||||
|
||||
def delete_messages_for_remote_destination_txn(txn):
|
||||
def delete_messages_for_remote_destination_txn(txn: LoggingTransaction) -> None:
|
||||
sql = (
|
||||
"DELETE FROM device_federation_outbox"
|
||||
" WHERE destination = ?"
|
||||
@@ -566,7 +585,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
if last_id == current_id:
|
||||
return [], current_id, False
|
||||
|
||||
def get_all_new_device_messages_txn(txn):
|
||||
def get_all_new_device_messages_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|
||||
# We limit like this as we might have multiple rows per stream_id, and
|
||||
# we want to make sure we always get all entries for any stream_id
|
||||
# we return.
|
||||
@@ -607,8 +628,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
@trace
|
||||
async def add_messages_to_device_inbox(
|
||||
self,
|
||||
local_messages_by_user_then_device: dict,
|
||||
remote_messages_by_destination: dict,
|
||||
local_messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
|
||||
remote_messages_by_destination: Dict[str, JsonDict],
|
||||
) -> int:
|
||||
"""Used to send messages from this server.
|
||||
|
||||
@@ -624,7 +645,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
|
||||
assert self._can_write_to_device
|
||||
|
||||
def add_messages_txn(txn, now_ms, stream_id):
|
||||
def add_messages_txn(
|
||||
txn: LoggingTransaction, now_ms: int, stream_id: int
|
||||
) -> None:
|
||||
# Add the local messages directly to the local inbox.
|
||||
self._add_messages_to_local_device_inbox_txn(
|
||||
txn, stream_id, local_messages_by_user_then_device
|
||||
@@ -677,11 +700,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
return self._device_inbox_id_gen.get_current_token()
|
||||
|
||||
async def add_messages_from_remote_to_device_inbox(
|
||||
self, origin: str, message_id: str, local_messages_by_user_then_device: dict
|
||||
self,
|
||||
origin: str,
|
||||
message_id: str,
|
||||
local_messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
|
||||
) -> int:
|
||||
assert self._can_write_to_device
|
||||
|
||||
def add_messages_txn(txn, now_ms, stream_id):
|
||||
def add_messages_txn(
|
||||
txn: LoggingTransaction, now_ms: int, stream_id: int
|
||||
) -> None:
|
||||
# Check if we've already inserted a matching message_id for that
|
||||
# origin. This can happen if the origin doesn't receive our
|
||||
# acknowledgement from the first time we received the message.
|
||||
@@ -727,8 +755,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
return stream_id
|
||||
|
||||
def _add_messages_to_local_device_inbox_txn(
|
||||
self, txn, stream_id, messages_by_user_then_device
|
||||
):
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
stream_id: int,
|
||||
messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
|
||||
) -> None:
|
||||
assert self._can_write_to_device
|
||||
|
||||
local_by_user_then_device = {}
|
||||
@@ -840,8 +871,10 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
self._remove_dead_devices_from_device_inbox,
|
||||
)
|
||||
|
||||
async def _background_drop_index_device_inbox(self, progress, batch_size):
|
||||
def reindex_txn(conn):
|
||||
async def _background_drop_index_device_inbox(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
def reindex_txn(conn: LoggingDatabaseConnection) -> None:
|
||||
txn = conn.cursor()
|
||||
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
|
||||
txn.close()
|
||||
|
||||
@@ -25,6 +25,7 @@ from typing import (
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
cast,
|
||||
)
|
||||
|
||||
from synapse.api.errors import Codes, StoreError
|
||||
@@ -136,7 +137,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
Number of devices of this users.
|
||||
"""
|
||||
|
||||
def count_devices_by_users_txn(txn, user_ids):
|
||||
def count_devices_by_users_txn(
|
||||
txn: LoggingTransaction, user_ids: List[str]
|
||||
) -> int:
|
||||
sql = """
|
||||
SELECT count(*)
|
||||
FROM devices
|
||||
@@ -149,7 +152,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
txn.execute(sql + clause, args)
|
||||
return txn.fetchone()[0]
|
||||
return cast(Tuple[int], txn.fetchone())[0]
|
||||
|
||||
if not user_ids:
|
||||
return 0
|
||||
@@ -468,7 +471,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
"""
|
||||
txn.execute(sql, (destination, from_stream_id, now_stream_id, limit))
|
||||
|
||||
return list(txn)
|
||||
return cast(List[Tuple[str, str, int, Optional[str]]], txn.fetchall())
|
||||
|
||||
async def _get_device_update_edus_by_remote(
|
||||
self,
|
||||
@@ -549,7 +552,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
async def _get_last_device_update_for_remote_user(
|
||||
self, destination: str, user_id: str, from_stream_id: int
|
||||
) -> int:
|
||||
def f(txn):
|
||||
def f(txn: LoggingTransaction) -> int:
|
||||
prev_sent_id_sql = """
|
||||
SELECT coalesce(max(stream_id), 0) as stream_id
|
||||
FROM device_lists_outbound_last_success
|
||||
@@ -767,7 +770,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
if not user_ids_to_check:
|
||||
return set()
|
||||
|
||||
def _get_users_whose_devices_changed_txn(txn):
|
||||
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
|
||||
changes = set()
|
||||
|
||||
stream_id_where_clause = "stream_id > ?"
|
||||
@@ -966,7 +969,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
|
||||
"""Mark that we no longer track device lists for remote user."""
|
||||
|
||||
def _mark_remote_user_device_list_as_unsubscribed_txn(txn):
|
||||
def _mark_remote_user_device_list_as_unsubscribed_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="device_lists_remote_extremeties",
|
||||
@@ -1004,7 +1009,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
def _store_dehydrated_device_txn(
|
||||
self, txn, user_id: str, device_id: str, device_data: str
|
||||
self, txn: LoggingTransaction, user_id: str, device_id: str, device_data: str
|
||||
) -> Optional[str]:
|
||||
old_device_id = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
@@ -1081,7 +1086,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
"""
|
||||
yesterday = self._clock.time_msec() - prune_age
|
||||
|
||||
def _prune_txn(txn):
|
||||
def _prune_txn(txn: LoggingTransaction) -> None:
|
||||
# look for (user, destination) pairs which have an update older than
|
||||
# the cutoff.
|
||||
#
|
||||
@@ -1204,8 +1209,10 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
||||
"drop_device_lists_outbound_last_success_non_unique_idx",
|
||||
)
|
||||
|
||||
async def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
|
||||
def f(conn):
|
||||
async def _drop_device_list_streams_non_unique_indexes(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
def f(conn: LoggingDatabaseConnection) -> None:
|
||||
txn = conn.cursor()
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
|
||||
@@ -1217,7 +1224,9 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
||||
)
|
||||
return 1
|
||||
|
||||
async def _remove_duplicate_outbound_pokes(self, progress, batch_size):
|
||||
async def _remove_duplicate_outbound_pokes(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
# for some reason, we have accumulated duplicate entries in
|
||||
# device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
|
||||
# efficient.
|
||||
@@ -1230,7 +1239,7 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
||||
{"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
|
||||
)
|
||||
|
||||
def _txn(txn):
|
||||
def _txn(txn: LoggingTransaction) -> int:
|
||||
clause, args = make_tuple_comparison_clause(
|
||||
[(x, last_row[x]) for x in KEY_COLS]
|
||||
)
|
||||
@@ -1602,7 +1611,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
|
||||
context = get_active_span_text_map()
|
||||
|
||||
def add_device_changes_txn(txn, stream_ids):
|
||||
def add_device_changes_txn(
|
||||
txn: LoggingTransaction, stream_ids: List[int]
|
||||
) -> None:
|
||||
self._add_device_change_to_stream_txn(
|
||||
txn,
|
||||
user_id,
|
||||
@@ -1635,8 +1646,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
txn: LoggingTransaction,
|
||||
user_id: str,
|
||||
device_ids: Collection[str],
|
||||
stream_ids: List[str],
|
||||
):
|
||||
stream_ids: List[int],
|
||||
) -> None:
|
||||
txn.call_after(
|
||||
self._device_list_stream_cache.entity_has_changed,
|
||||
user_id,
|
||||
@@ -1720,7 +1731,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
user_id: str,
|
||||
device_ids: Iterable[str],
|
||||
room_ids: Collection[str],
|
||||
stream_ids: List[str],
|
||||
stream_ids: List[int],
|
||||
context: Dict[str, str],
|
||||
) -> None:
|
||||
"""Record the user in the room has updated their device."""
|
||||
@@ -1748,7 +1759,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
device_id,
|
||||
room_id,
|
||||
stream_id,
|
||||
False,
|
||||
# We only need to calculate outbound pokes for local users
|
||||
not self.hs.is_mine_id(user_id),
|
||||
encoded_context,
|
||||
)
|
||||
for room_id in room_ids
|
||||
@@ -1774,9 +1786,21 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
def get_uncoverted_outbound_room_pokes_txn(txn):
|
||||
def get_uncoverted_outbound_room_pokes_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
|
||||
txn.execute(sql, (limit,))
|
||||
return txn.fetchall()
|
||||
|
||||
return [
|
||||
(
|
||||
user_id,
|
||||
device_id,
|
||||
room_id,
|
||||
stream_id,
|
||||
db_to_json(opentracing_context),
|
||||
)
|
||||
for user_id, device_id, room_id, stream_id, opentracing_context in txn
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_uncoverted_outbound_room_pokes", get_uncoverted_outbound_room_pokes_txn
|
||||
@@ -1797,7 +1821,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
Marks the associated row in `device_lists_changes_in_room` as handled.
|
||||
"""
|
||||
|
||||
def add_device_list_outbound_pokes_txn(txn, stream_ids: List[int]):
|
||||
def add_device_list_outbound_pokes_txn(
|
||||
txn: LoggingTransaction, stream_ids: List[int]
|
||||
) -> None:
|
||||
if hosts:
|
||||
self._add_device_outbound_poke_to_stream_txn(
|
||||
txn,
|
||||
|
||||
@@ -695,7 +695,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}

async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
async def get_oldest_event_ids_with_depth_in_room(
self, room_id
) -> List[Tuple[str, int]]:
"""Gets the oldest events(backwards extremities) in the room along with the
aproximate depth.

@@ -708,7 +710,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id: Room where we want to find the oldest events

Returns:
Map from event_id to depth
List of (event_id, depth) tuples
"""

def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
@@ -741,7 +743,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas

txn.execute(sql, (room_id, False))

return dict(txn)
return txn.fetchall()

return await self.db_pool.runInteraction(
"get_oldest_event_ids_with_depth_in_room",
@@ -751,7 +753,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas

async def get_insertion_event_backward_extremities_in_room(
self, room_id
) -> Dict[str, int]:
) -> List[Tuple[str, int]]:
"""Get the insertion events we know about that we haven't backfilled yet.

We use this function so that we can compare and see if someones current
@@ -763,7 +765,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id: Room where we want to find the oldest events

Returns:
Map from event_id to depth
List of (event_id, depth) tuples
"""

def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
@@ -778,8 +780,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"""

txn.execute(sql, (room_id,))

return dict(txn)
return txn.fetchall()

return await self.db_pool.runInteraction(
"get_insertion_event_backward_extremities_in_room",
@@ -1295,22 +1296,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_results.reverse()
return event_results

async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:
"""Fetch all events that have the given events as a prev event
async def get_successor_events(self, event_id: str) -> List[str]:
"""Fetch all events that have the given event as a prev event

Args:
event_ids: The events to use as the previous events.
event_id: The event to search for as a prev_event.
"""
rows = await self.db_pool.simple_select_many_batch(
return await self.db_pool.simple_select_onecol(
table="event_edges",
column="prev_event_id",
iterable=event_ids,
retcols=("event_id",),
keyvalues={"prev_event_id": event_id},
retcol="event_id",
desc="get_successor_events",
)

return [row["event_id"] for row in rows]

@wrap_as_background_process("delete_old_forward_extrem_cache")
async def _delete_old_forward_extrem_cache(self) -> None:
def _delete_old_forward_extrem_cache_txn(txn):

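The new single-event form of `get_successor_events` reduces to one select over `event_edges`: the successors of an event are simply the rows that list it as a `prev_event_id`. Roughly the query it boils down to, shown as a standalone sketch (raw `sqlite3` is used here purely for illustration; the real call goes through Synapse's database helpers):

```python
import sqlite3
from typing import List


def get_successor_events(conn: sqlite3.Connection, event_id: str) -> List[str]:
    # event_edges maps an event to each of its prev_events; the successors of X
    # are the events whose prev_event_id column points at X.
    rows = conn.execute(
        "SELECT event_id FROM event_edges WHERE prev_event_id = ?", (event_id,)
    ).fetchall()
    return [row[0] for row in rows]
```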
@@ -75,7 +75,7 @@ from synapse.storage.util.id_generators import (
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -640,42 +640,57 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids.difference_update(already_fetching_ids)

if missing_events_ids:
log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))

# Add entries to `self._current_event_fetches` for each event we're
# going to pull from the DB. We use a single deferred that resolves
# to all the events we pulled from the DB (this will result in this
# function returning more events than requested, but that can happen
# already due to `_get_events_from_db`).
fetching_deferred: ObservableDeferred[
Dict[str, EventCacheEntry]
] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
for event_id in missing_events_ids:
self._current_event_fetches[event_id] = fetching_deferred
async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
"""Fetches the events in `missing_event_ids` from the database.

# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)
Also creates entries in `self._current_event_fetches` to allow
concurrent `_get_events_from_cache_or_db` calls to reuse the same fetch.
"""
log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))

event_entry_map.update(missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
raise e
finally:
# Ensure that we mark these events as no longer being fetched.
# Add entries to `self._current_event_fetches` for each event we're
# going to pull from the DB. We use a single deferred that resolves
# to all the events we pulled from the DB (this will result in this
# function returning more events than requested, but that can happen
# already due to `_get_events_from_db`).
fetching_deferred: ObservableDeferred[
Dict[str, EventCacheEntry]
] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
for event_id in missing_events_ids:
self._current_event_fetches.pop(event_id, None)
self._current_event_fetches[event_id] = fetching_deferred

with PreserveLoggingContext():
fetching_deferred.callback(missing_events)
# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event
# out of the database to check it.
#
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
raise e
finally:
# Ensure that we mark these events as no longer being fetched.
for event_id in missing_events_ids:
self._current_event_fetches.pop(event_id, None)

with PreserveLoggingContext():
fetching_deferred.callback(missing_events)

return missing_events

# We must allow the database fetch to complete in the presence of
# cancellations, since multiple `_get_events_from_cache_or_db` calls can
# reuse the same fetch.
missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
get_missing_events_from_db()
)
event_entry_map.update(missing_events)

if already_fetching_deferreds:
# Wait for the other event requests to finish and add their results

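The comment near the end of the hunk is the reason the fetch is wrapped in `delay_cancellation`: a single in-flight database fetch may be shared by several concurrent callers, so cancelling one caller must not tear down the fetch for the others. A rough asyncio analogue of that idea (sketch only, with invented names; Synapse's helper operates on Twisted Deferreds and is not reproduced here):

```python
import asyncio
from typing import Awaitable, Dict, TypeVar

T = TypeVar("T")

_inflight: Dict[str, asyncio.Task] = {}


async def fetch_shared(key: str, fetch: Awaitable[T]) -> T:
    task = _inflight.get(key)
    if task is None:
        # Run the fetch as its own task so it is not torn down when any single
        # waiter is cancelled.
        task = asyncio.ensure_future(fetch)
        _inflight[key] = task
        task.add_done_callback(lambda _: _inflight.pop(key, None))
    # shield(): if *this* waiter is cancelled, the underlying task keeps running
    # and the other waiters still receive the result.
    return await asyncio.shield(task)
```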
@@ -522,7 +522,9 @@ class GroupServerWorkerStore(SQLBaseStore):
desc="get_joined_groups",
)

async def get_all_groups_for_user(self, user_id, now_token) -> List[JsonDict]:
async def get_all_groups_for_user(
self, user_id: str, now_token: int
) -> List[JsonDict]:
def _get_all_groups_for_user_txn(txn: LoggingTransaction) -> List[JsonDict]:
sql = """
SELECT group_id, type, membership, u.content

@@ -15,11 +15,12 @@
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
@@ -35,7 +36,9 @@ class KeyStore(SQLBaseStore):
|
||||
"""Persistence for signature verification keys"""
|
||||
|
||||
@cached()
|
||||
def _get_server_verify_key(self, server_name_and_key_id):
|
||||
def _get_server_verify_key(
|
||||
self, server_name_and_key_id: Tuple[str, str]
|
||||
) -> FetchKeyResult:
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(
|
||||
@@ -179,19 +182,21 @@ class KeyStore(SQLBaseStore):
|
||||
|
||||
async def get_server_keys_json(
|
||||
self, server_keys: Iterable[Tuple[str, Optional[str], Optional[str]]]
|
||||
) -> Dict[Tuple[str, Optional[str], Optional[str]], List[dict]]:
|
||||
) -> Dict[Tuple[str, Optional[str], Optional[str]], List[Dict[str, Any]]]:
|
||||
"""Retrieve the key json for a list of server_keys and key ids.
|
||||
If no keys are found for a given server, key_id and source then
|
||||
that server, key_id, and source triplet entry will be an empty list.
|
||||
The JSON is returned as a byte array so that it can be efficiently
|
||||
used in an HTTP response.
|
||||
Args:
|
||||
server_keys (list): List of (server_name, key_id, source) triplets.
|
||||
server_keys: List of (server_name, key_id, source) triplets.
|
||||
Returns:
|
||||
A mapping from (server_name, key_id, source) triplets to a list of dicts
|
||||
"""
|
||||
|
||||
def _get_server_keys_json_txn(txn):
|
||||
def _get_server_keys_json_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Dict[Tuple[str, Optional[str], Optional[str]], List[Dict[str, Any]]]:
|
||||
results = {}
|
||||
for server_name, key_id, from_server in server_keys:
|
||||
keyvalues = {"server_name": server_name}
|
||||
|
||||
@@ -388,7 +388,14 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
return await self.db_pool.runInteraction("get_url_cache", get_url_cache_txn)
|
||||
|
||||
async def store_url_cache(
|
||||
self, url, response_code, etag, expires_ts, og, media_id, download_ts
|
||||
self,
|
||||
url: str,
|
||||
response_code: int,
|
||||
etag: Optional[str],
|
||||
expires_ts: int,
|
||||
og: Optional[str],
|
||||
media_id: str,
|
||||
download_ts: int,
|
||||
) -> None:
|
||||
await self.db_pool.simple_insert(
|
||||
"local_media_repository_url_cache",
|
||||
@@ -441,7 +448,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
)
|
||||
|
||||
async def get_cached_remote_media(
|
||||
self, origin, media_id: str
|
||||
self, origin: str, media_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
return await self.db_pool.simple_select_one(
|
||||
"remote_media_cache",
|
||||
@@ -608,7 +615,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
)
|
||||
|
||||
async def delete_remote_media(self, media_origin: str, media_id: str) -> None:
|
||||
def delete_remote_media_txn(txn):
|
||||
def delete_remote_media_txn(txn: LoggingTransaction) -> None:
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
"remote_media_cache",
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple, cast
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast
|
||||
|
||||
from synapse.api.presence import PresenceState, UserPresenceState
|
||||
from synapse.replication.tcp.streams import PresenceStream
|
||||
@@ -103,7 +103,9 @@ class PresenceStore(PresenceBackgroundUpdateStore):
|
||||
prefilled_cache=presence_cache_prefill,
|
||||
)
|
||||
|
||||
async def update_presence(self, presence_states) -> Tuple[int, int]:
|
||||
async def update_presence(
|
||||
self, presence_states: List[UserPresenceState]
|
||||
) -> Tuple[int, int]:
|
||||
assert self._can_persist_presence
|
||||
|
||||
stream_ordering_manager = self._presence_id_gen.get_next_mult(
|
||||
@@ -121,7 +123,10 @@ class PresenceStore(PresenceBackgroundUpdateStore):
|
||||
return stream_orderings[-1], self._presence_id_gen.get_current_token()
|
||||
|
||||
def _update_presence_txn(
|
||||
self, txn: LoggingTransaction, stream_orderings, presence_states
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
stream_orderings: List[int],
|
||||
presence_states: List[UserPresenceState],
|
||||
) -> None:
|
||||
for stream_id, state in zip(stream_orderings, presence_states):
|
||||
txn.call_after(
|
||||
@@ -405,7 +410,13 @@ class PresenceStore(PresenceBackgroundUpdateStore):
|
||||
self._presence_on_startup = []
|
||||
return active_on_startup
|
||||
|
||||
def process_replication_rows(self, stream_name, instance_name, token, rows) -> None:
|
||||
def process_replication_rows(
|
||||
self,
|
||||
stream_name: str,
|
||||
instance_name: str,
|
||||
token: int,
|
||||
rows: Iterable[Any],
|
||||
) -> None:
|
||||
if stream_name == PresenceStream.NAME:
|
||||
self._presence_id_gen.advance(instance_name, token)
|
||||
for row in rows:
|
||||
|
||||
@@ -14,11 +14,25 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, List, Optional, Tuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
cast,
|
||||
)
|
||||
|
||||
from synapse.push import PusherConfig, ThrottleParams
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.util.id_generators import StreamIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
@@ -117,7 +131,7 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
return self._decode_pushers_rows(ret)
|
||||
|
||||
async def get_all_pushers(self) -> Iterator[PusherConfig]:
|
||||
def get_pushers(txn):
|
||||
def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]:
|
||||
txn.execute("SELECT * FROM pushers")
|
||||
rows = self.db_pool.cursor_to_dict(txn)
|
||||
|
||||
@@ -152,7 +166,9 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
if last_id == current_id:
|
||||
return [], current_id, False
|
||||
|
||||
def get_all_updated_pushers_rows_txn(txn):
|
||||
def get_all_updated_pushers_rows_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|
||||
sql = """
|
||||
SELECT id, user_name, app_id, pushkey
|
||||
FROM pushers
|
||||
@@ -160,10 +176,13 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
ORDER BY id ASC LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
updates = [
|
||||
(stream_id, (user_name, app_id, pushkey, False))
|
||||
for stream_id, user_name, app_id, pushkey in txn
|
||||
]
|
||||
updates = cast(
|
||||
List[Tuple[int, tuple]],
|
||||
[
|
||||
(stream_id, (user_name, app_id, pushkey, False))
|
||||
for stream_id, user_name, app_id, pushkey in txn
|
||||
],
|
||||
)
|
||||
|
||||
sql = """
|
||||
SELECT stream_id, user_id, app_id, pushkey
|
||||
@@ -192,12 +211,12 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
@cached(num_args=1, max_entries=15000)
|
||||
async def get_if_user_has_pusher(self, user_id: str):
|
||||
async def get_if_user_has_pusher(self, user_id: str) -> None:
|
||||
# This only exists for the cachedList decorator
|
||||
raise NotImplementedError()
|
||||
|
||||
async def update_pusher_last_stream_ordering(
|
||||
self, app_id, pushkey, user_id, last_stream_ordering
|
||||
self, app_id: str, pushkey: str, user_id: str, last_stream_ordering: int
|
||||
) -> None:
|
||||
await self.db_pool.simple_update_one(
|
||||
"pushers",
|
||||
@@ -291,7 +310,7 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
|
||||
last_user = progress.get("last_user", "")
|
||||
|
||||
def _delete_pushers(txn) -> int:
|
||||
def _delete_pushers(txn: LoggingTransaction) -> int:
|
||||
|
||||
sql = """
|
||||
SELECT name FROM users
|
||||
@@ -339,7 +358,7 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
|
||||
last_pusher = progress.get("last_pusher", 0)
|
||||
|
||||
def _delete_pushers(txn) -> int:
|
||||
def _delete_pushers(txn: LoggingTransaction) -> int:
|
||||
|
||||
sql = """
|
||||
SELECT p.id, access_token FROM pushers AS p
|
||||
@@ -396,7 +415,7 @@ class PusherWorkerStore(SQLBaseStore):
|
||||
|
||||
last_pusher = progress.get("last_pusher", 0)
|
||||
|
||||
def _delete_pushers(txn) -> int:
|
||||
def _delete_pushers(txn: LoggingTransaction) -> int:
|
||||
|
||||
sql = """
|
||||
SELECT p.id, p.user_name, p.app_id, p.pushkey
|
||||
@@ -502,7 +521,7 @@ class PusherStore(PusherWorkerStore):
|
||||
async def delete_pusher_by_app_id_pushkey_user_id(
|
||||
self, app_id: str, pushkey: str, user_id: str
|
||||
) -> None:
|
||||
def delete_pusher_txn(txn, stream_id):
|
||||
def delete_pusher_txn(txn: LoggingTransaction, stream_id: int) -> None:
|
||||
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
|
||||
txn, self.get_if_user_has_pusher, (user_id,)
|
||||
)
|
||||
@@ -547,7 +566,7 @@ class PusherStore(PusherWorkerStore):
|
||||
# account.
|
||||
pushers = list(await self.get_pushers_by_user_id(user_id))
|
||||
|
||||
def delete_pushers_txn(txn, stream_ids):
|
||||
def delete_pushers_txn(txn: LoggingTransaction, stream_ids: List[int]) -> None:
|
||||
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
|
||||
txn, self.get_if_user_has_pusher, (user_id,)
|
||||
)
|
||||
|
||||
@@ -370,10 +370,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):

def _update_state_for_partial_state_event_txn(
self,
txn,
txn: LoggingTransaction,
event: EventBase,
context: EventContext,
):
) -> None:
# we shouldn't have any outliers here
assert not event.internal_metadata.is_outlier()

|
||||
@@ -131,7 +131,7 @@ class UIAuthWorkerStore(SQLBaseStore):
|
||||
session_id: str,
|
||||
stage_type: str,
|
||||
result: Union[str, bool, JsonDict],
|
||||
):
|
||||
) -> None:
|
||||
"""
|
||||
Mark a session stage as completed.
|
||||
|
||||
@@ -200,7 +200,9 @@ class UIAuthWorkerStore(SQLBaseStore):
|
||||
desc="set_ui_auth_client_dict",
|
||||
)
|
||||
|
||||
async def set_ui_auth_session_data(self, session_id: str, key: str, value: Any):
|
||||
async def set_ui_auth_session_data(
|
||||
self, session_id: str, key: str, value: Any
|
||||
) -> None:
|
||||
"""
|
||||
Store a key-value pair into the sessions data associated with this
|
||||
request. This data is stored server-side and cannot be modified by
|
||||
@@ -223,7 +225,7 @@ class UIAuthWorkerStore(SQLBaseStore):
|
||||
|
||||
def _set_ui_auth_session_data_txn(
|
||||
self, txn: LoggingTransaction, session_id: str, key: str, value: Any
|
||||
):
|
||||
) -> None:
|
||||
# Get the current value.
|
||||
result = cast(
|
||||
Dict[str, Any],
|
||||
@@ -275,7 +277,7 @@ class UIAuthWorkerStore(SQLBaseStore):
|
||||
session_id: str,
|
||||
user_agent: str,
|
||||
ip: str,
|
||||
):
|
||||
) -> None:
|
||||
"""Add the given user agent / IP to the tracking table"""
|
||||
await self.db_pool.simple_upsert(
|
||||
table="ui_auth_sessions_ips",
|
||||
@@ -318,7 +320,7 @@ class UIAuthWorkerStore(SQLBaseStore):
|
||||
|
||||
def _delete_old_ui_auth_sessions_txn(
|
||||
self, txn: LoggingTransaction, expiration_time: int
|
||||
):
|
||||
) -> None:
|
||||
# Get the expired sessions.
|
||||
sql = "SELECT session_id FROM ui_auth_sessions WHERE creation_time <= ?"
|
||||
txn.execute(sql, [expiration_time])
|
||||
|
||||
@@ -54,10 +54,16 @@ class DependencyException(Exception):


DEV_EXTRAS = {"lint", "mypy", "test", "dev"}
RUNTIME_EXTRAS = (
set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
)
VERSION = metadata.version(DISTRIBUTION_NAME)
try:
RUNTIME_EXTRAS = (
set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
)
VERSION = metadata.version(DISTRIBUTION_NAME)
except Exception as e:
raise RuntimeError(
"Unable to read Synapse's package installation metadata. "
"This may be a problem with how Synapse has been packaged."
) from e


def _is_dev_dependency(req: Requirement) -> bool:

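The new try/except guards the `importlib.metadata` lookups, which can fail when a package's installation metadata is missing or broken. A minimal standalone illustration of the same lookups (the function and the `"pip"` default are placeholders for illustration, not Synapse's real distribution-name handling):

```python
from importlib import metadata
from typing import Set, Tuple


def read_dist_info(distribution_name: str = "pip") -> Tuple[Set[str], str]:
    dev_extras = {"lint", "mypy", "test", "dev"}
    # get_all() may return None if the distribution declares no extras.
    provides = metadata.metadata(distribution_name).get_all("Provides-Extra") or []
    runtime_extras = set(provides) - dev_extras
    version = metadata.version(distribution_name)
    return runtime_extras, version
```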
@@ -419,6 +419,13 @@ async def _event_to_memberships(
return {}

# for each event, get the event_ids of the membership state at those events.
#
# TODO: this means that we request the entire membership list. If there are only
# one or two users on this server, and the room is huge, this is very wasteful
# (it means more db work, and churns the *stateGroupMembersCache*).
# It might be that we could extend StateFilter to specify "give me keys matching
# *:<server_name>", to avoid this.

event_to_state_ids = await storage.state.get_state_ids_for_events(
frozenset(e.event_id for e in events),
state_filter=StateFilter.from_types(types=((EventTypes.Member, None),)),

@@ -411,6 +411,88 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
|
||||
"exclusive_as_user", "password", self.exclusive_as_user_device_id
|
||||
)
|
||||
|
||||
def test_sending_read_receipt_batches_to_application_services(self):
|
||||
"""Tests that a large batch of read receipts are sent correctly to
|
||||
interested application services.
|
||||
"""
|
||||
# Register an application service that's interested in a certain user
|
||||
# and room prefix
|
||||
interested_appservice = self._register_application_service(
|
||||
namespaces={
|
||||
ApplicationService.NS_USERS: [
|
||||
{
|
||||
"regex": "@exclusive_as_user:.+",
|
||||
"exclusive": True,
|
||||
}
|
||||
],
|
||||
ApplicationService.NS_ROOMS: [
|
||||
{
|
||||
"regex": "!fakeroom_.*",
|
||||
"exclusive": True,
|
||||
}
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
# "Complete" a transaction.
|
||||
# All this really does for us is make an entry in the application_services_state
|
||||
# database table, which tracks the current stream_token per stream ID per AS.
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.complete_appservice_txn(
|
||||
0,
|
||||
interested_appservice,
|
||||
)
|
||||
)
|
||||
|
||||
# Now, pretend that we receive a large burst of read receipts (300 total) that
|
||||
# all come in at once.
|
||||
for i in range(300):
|
||||
self.get_success(
|
||||
# Insert a fake read receipt into the database
|
||||
self.hs.get_datastores().main.insert_receipt(
|
||||
# We have to use unique room ID + user ID combinations here, as the db query
|
||||
# is an upsert.
|
||||
room_id=f"!fakeroom_{i}:test",
|
||||
receipt_type="m.read",
|
||||
user_id=self.local_user,
|
||||
event_ids=[f"$eventid_{i}"],
|
||||
data={},
|
||||
)
|
||||
)
|
||||
|
||||
# Now notify the appservice handler that 300 read receipts have all arrived
|
||||
# at once. What will it do!
|
||||
# note: stream tokens start at 2
|
||||
for stream_token in range(2, 303):
|
||||
self.get_success(
|
||||
self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
|
||||
services=[interested_appservice],
|
||||
stream_key="receipt_key",
|
||||
new_token=stream_token,
|
||||
users=[self.exclusive_as_user],
|
||||
)
|
||||
)
|
||||
|
||||
# Using our txn send mock, we can see what the AS received. After iterating over every
|
||||
# transaction, we'd like to see all 300 read receipts accounted for.
|
||||
# No more, no less.
|
||||
all_ephemeral_events = []
|
||||
for call in self.send_mock.call_args_list:
|
||||
ephemeral_events = call[0][2]
|
||||
all_ephemeral_events += ephemeral_events
|
||||
|
||||
# Ensure that no duplicate events were sent
|
||||
self.assertEqual(len(all_ephemeral_events), 300)
|
||||
|
||||
# Check that the ephemeral event is a read receipt with the expected structure
|
||||
latest_read_receipt = all_ephemeral_events[-1]
|
||||
self.assertEqual(latest_read_receipt["type"], "m.receipt")
|
||||
|
||||
event_id = list(latest_read_receipt["content"].keys())[0]
|
||||
self.assertEqual(
|
||||
latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
|
||||
)
|
||||
|
||||
@unittest.override_config(
|
||||
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
|
||||
)
|
||||
|
||||
@@ -560,43 +560,6 @@ class RelationsTestCase(BaseRelationsTestCase):
|
||||
{"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
|
||||
)
|
||||
|
||||
def test_edit_thread(self) -> None:
|
||||
"""Test that editing a thread works."""
|
||||
|
||||
# Create a thread and edit the last event.
|
||||
channel = self._send_relation(
|
||||
RelationTypes.THREAD,
|
||||
"m.room.message",
|
||||
content={"msgtype": "m.text", "body": "A threaded reply!"},
|
||||
)
|
||||
threaded_event_id = channel.json_body["event_id"]
|
||||
|
||||
new_body = {"msgtype": "m.text", "body": "I've been edited!"}
|
||||
self._send_relation(
|
||||
RelationTypes.REPLACE,
|
||||
"m.room.message",
|
||||
content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
|
||||
parent_id=threaded_event_id,
|
||||
)
|
||||
|
||||
# Fetch the thread root, to get the bundled aggregation for the thread.
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/rooms/{self.room}/event/{self.parent_id}",
|
||||
access_token=self.user_token,
|
||||
)
|
||||
self.assertEqual(200, channel.code, channel.json_body)
|
||||
|
||||
# We expect that the edit message appears in the thread summary in the
|
||||
# unsigned relations section.
|
||||
relations_dict = channel.json_body["unsigned"].get("m.relations")
|
||||
self.assertIn(RelationTypes.THREAD, relations_dict)
|
||||
|
||||
thread_summary = relations_dict[RelationTypes.THREAD]
|
||||
self.assertIn("latest_event", thread_summary)
|
||||
latest_event_in_thread = thread_summary["latest_event"]
|
||||
self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!")
|
||||
|
||||
def test_edit_edit(self) -> None:
|
||||
"""Test that an edit cannot be edited."""
|
||||
new_body = {"msgtype": "m.text", "body": "Initial edit"}
|
||||
@@ -1047,7 +1010,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
|
||||
channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
|
||||
thread_2 = channel.json_body["event_id"]
|
||||
|
||||
def assert_annotations(bundled_aggregations: JsonDict) -> None:
|
||||
def assert_thread(bundled_aggregations: JsonDict) -> None:
|
||||
self.assertEqual(2, bundled_aggregations.get("count"))
|
||||
self.assertTrue(bundled_aggregations.get("current_user_participated"))
|
||||
# The latest thread event has some fields that don't matter.
|
||||
@@ -1066,7 +1029,38 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
                bundled_aggregations.get("latest_event"),
            )

-        self._test_bundled_aggregations(RelationTypes.THREAD, assert_annotations, 9)
+        self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9)
+
+    def test_thread_edit_latest_event(self) -> None:
+        """Test that editing the latest event in a thread works."""
+
+        # Create a thread and edit the last event.
+        channel = self._send_relation(
+            RelationTypes.THREAD,
+            "m.room.message",
+            content={"msgtype": "m.text", "body": "A threaded reply!"},
+        )
+        threaded_event_id = channel.json_body["event_id"]
+
+        new_body = {"msgtype": "m.text", "body": "I've been edited!"}
+        channel = self._send_relation(
+            RelationTypes.REPLACE,
+            "m.room.message",
+            content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
+            parent_id=threaded_event_id,
+        )
+
+        # Fetch the thread root, to get the bundled aggregation for the thread.
+        relations_dict = self._get_bundled_aggregations()
+
+        # We expect that the edit message appears in the thread summary in the
+        # unsigned relations section.
+        self.assertIn(RelationTypes.THREAD, relations_dict)
+
+        thread_summary = relations_dict[RelationTypes.THREAD]
+        self.assertIn("latest_event", thread_summary)
+        latest_event_in_thread = thread_summary["latest_event"]
+        self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!")
+
    def test_aggregation_get_event_for_annotation(self) -> None:
        """Test that annotations do not get bundled aggregations included
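As a rough illustration of what the bundled-aggregation assertions above are checking, the relevant slice of the thread root's unsigned relations looks roughly like the sketch below. The keys come from the assertions themselves; the surrounding values are placeholders, not output captured from Synapse.

```python
# Rough shape of the thread summary inspected above: it lives under the thread
# root's unsigned "m.relations", and its "latest_event" reflects the edit.
relations_dict = {
    "m.thread": {
        "count": 2,  # as asserted in assert_thread above
        "current_user_participated": True,
        "latest_event": {
            "content": {"msgtype": "m.text", "body": "I've been edited!"},
            # ...other event fields omitted...
        },
    },
}
```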
@@ -1093,7 +1087,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
        channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
        thread_id = channel.json_body["event_id"]

-        # Annotate the annotation.
+        # Annotate the thread.
        self._send_relation(
            RelationTypes.ANNOTATION, "m.reaction", "a", parent_id=thread_id
        )
@@ -13,10 +13,11 @@
 # limitations under the License.
import json
+from contextlib import contextmanager
-from typing import Generator
+from typing import Generator, Tuple
from unittest import mock

from twisted.enterprise.adbapi import ConnectionPool
-from twisted.internet.defer import ensureDeferred
+from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.room_versions import EventFormatVersions, RoomVersions
@@ -281,3 +282,119 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):

        # This next event fetch should succeed
        self.get_success(self.store.get_event(self.event_ids[0]))
+
+
+class GetEventCancellationTestCase(unittest.HomeserverTestCase):
+    """Test cancellation of `get_event` calls."""
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
+        self.store: EventsWorkerStore = hs.get_datastores().main
+
+        self.user = self.register_user("user", "pass")
+        self.token = self.login(self.user, "pass")
+
+        self.room = self.helper.create_room_as(self.user, tok=self.token)
+
+        res = self.helper.send(self.room, tok=self.token)
+        self.event_id = res["event_id"]
+
+        # Reset the event cache so the tests start with it empty
+        self.store._get_event_cache.clear()
+
+    @contextmanager
+    def blocking_get_event_calls(
+        self,
+    ) -> Generator[
+        Tuple["Deferred[None]", "Deferred[None]", "Deferred[None]"], None, None
+    ]:
+        """Starts two concurrent `get_event` calls for the same event.
+
+        Both `get_event` calls will use the same database fetch, which will be blocked
+        at the time this function returns.
+
+        Returns:
+            A tuple containing:
+                * A `Deferred` that unblocks the database fetch.
+                * A cancellable `Deferred` for the first `get_event` call.
+                * A cancellable `Deferred` for the second `get_event` call.
+        """
+        # Patch `DatabasePool.runWithConnection` to block.
+        unblock: "Deferred[None]" = Deferred()
+        original_runWithConnection = self.store.db_pool.runWithConnection
+
+        async def runWithConnection(*args, **kwargs):
+            await unblock
+            return await original_runWithConnection(*args, **kwargs)
+
+        with mock.patch.object(
+            self.store.db_pool,
+            "runWithConnection",
+            new=runWithConnection,
+        ):
+            ctx1 = LoggingContext("get_event1")
+            ctx2 = LoggingContext("get_event2")
+
+            async def get_event(ctx: LoggingContext) -> None:
+                with ctx:
+                    await self.store.get_event(self.event_id)
+
+            get_event1 = ensureDeferred(get_event(ctx1))
+            get_event2 = ensureDeferred(get_event(ctx2))
+
+            # Both `get_event` calls ought to be blocked.
+            self.assertNoResult(get_event1)
+            self.assertNoResult(get_event2)
+
+            yield unblock, get_event1, get_event2
+
+        # Confirm that the two `get_event` calls shared the same database fetch.
+        self.assertEqual(ctx1.get_resource_usage().evt_db_fetch_count, 1)
+        self.assertEqual(ctx2.get_resource_usage().evt_db_fetch_count, 0)
+
+    def test_first_get_event_cancelled(self):
+        """Test cancellation of the first `get_event` call sharing a database fetch.
+
+        The first `get_event` call is the one which initiates the fetch. We expect the
+        fetch to complete despite the cancellation. Furthermore, the first `get_event`
+        call must not abort before the fetch is complete, otherwise the fetch will be
+        using a finished logging context.
+        """
+        with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
+            # Cancel the first `get_event` call.
+            get_event1.cancel()
+            # The first `get_event` call must not abort immediately, otherwise its
+            # logging context will be finished while it is still in use by the database
+            # fetch.
+            self.assertNoResult(get_event1)
+            # The second `get_event` call must not be cancelled.
+            self.assertNoResult(get_event2)
+
+            # Unblock the database fetch.
+            unblock.callback(None)
+            # A `CancelledError` should be raised out of the first `get_event` call.
+            exc = self.get_failure(get_event1, CancelledError).value
+            self.assertIsInstance(exc, CancelledError)
+            # The second `get_event` call should complete successfully.
+            self.get_success(get_event2)
+
+    def test_second_get_event_cancelled(self):
+        """Test cancellation of the second `get_event` call sharing a database fetch."""
+        with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
+            # Cancel the second `get_event` call.
+            get_event2.cancel()
+            # The first `get_event` call must not be cancelled.
+            self.assertNoResult(get_event1)
+            # The second `get_event` call gets cancelled immediately.
+            exc = self.get_failure(get_event2, CancelledError).value
+            self.assertIsInstance(exc, CancelledError)
+
+            # Unblock the database fetch.
+            unblock.callback(None)
+            # The first `get_event` call should complete successfully.
+            self.get_success(get_event1)
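The test case above exercises two properties: concurrent `get_event` calls for the same event share a single database fetch, and cancelling one caller does not abort the shared work. As a loose, framework-agnostic sketch of that coalescing idea (this is not Synapse's implementation; the class name, asyncio usage, and `shield`-based cancellation handling are illustrative assumptions):

```python
# Illustrative request-coalescing sketch (not Synapse's actual code): concurrent
# callers for the same key await one in-flight fetch, and cancelling a waiter
# does not cancel the shared fetch itself.
import asyncio
from typing import Awaitable, Callable, Dict


class FetchCoalescer:
    def __init__(self) -> None:
        self._in_flight: Dict[str, asyncio.Task] = {}

    async def get(self, key: str, fetch: Callable[[str], Awaitable[object]]) -> object:
        task = self._in_flight.get(key)
        if task is None:
            # First caller starts the real fetch and registers it for later callers.
            task = asyncio.create_task(fetch(key))
            self._in_flight[key] = task
            task.add_done_callback(lambda _: self._in_flight.pop(key, None))
        # shield() means a cancelled caller sees CancelledError without
        # cancelling the shared fetch, so the other callers still get a result.
        return await asyncio.shield(task)
```

The `shield` call loosely mirrors what the tests assert: one waiter can be cancelled while the underlying fetch runs to completion for everyone else.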
@@ -29,7 +29,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
        for device_id in device_ids:
            stream_id = self.get_success(
                self.store.add_device_change_to_streams(
-                    "user_id", [device_id], ["!some:room"]
+                    user_id, [device_id], ["!some:room"]
                )
            )
