Merge commit '78e48f61b' into anoa/dinsic_release_1_31_0
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# this script is run by buildkite in a plain `xenial` container; it installs the
|
||||
# minimal requirements for tox and hands over to the py35-old tox environment.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
|
||||
# with additional dependencies needed for the test (such as coverage or the PostgreSQL
|
||||
|
||||
79
CHANGES.md
79
CHANGES.md
@@ -1,8 +1,83 @@
|
||||
Synapse 1.31.0rc1 (2021-03-30)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Add support to OpenID Connect login for requiring attributes on the `userinfo` response. Contributed by Hubbe King. ([\#9609](https://github.com/matrix-org/synapse/issues/9609))
|
||||
- Add initial experimental support for a "space summary" API. ([\#9643](https://github.com/matrix-org/synapse/issues/9643), [\#9652](https://github.com/matrix-org/synapse/issues/9652), [\#9653](https://github.com/matrix-org/synapse/issues/9653))
|
||||
- Add support for the busy presence state as described in [MSC3026](https://github.com/matrix-org/matrix-doc/pull/3026). ([\#9644](https://github.com/matrix-org/synapse/issues/9644))
|
||||
- Add support for credentials for proxy authentication in the `HTTPS_PROXY` environment variable. ([\#9657](https://github.com/matrix-org/synapse/issues/9657))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a longstanding bug that could cause issues when editing a reply to a message. ([\#9585](https://github.com/matrix-org/synapse/issues/9585))
|
||||
- Fix the `/capabilities` endpoint to return `m.change_password` as disabled if the local password database is not used for authentication. Contributed by @dklimpel. ([\#9588](https://github.com/matrix-org/synapse/issues/9588))
|
||||
- Checks if passwords are allowed before setting it for the user. ([\#9636](https://github.com/matrix-org/synapse/issues/9636))
|
||||
- Fix a bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind. ([\#9639](https://github.com/matrix-org/synapse/issues/9639))
|
||||
- Fix a bug introduced in Synapse 1.30.1 which meant the suggested `pip` incantation to install an updated `cryptography` was incorrect. ([\#9699](https://github.com/matrix-org/synapse/issues/9699))
|
||||
|
||||
|
||||
Updates to the Docker image
|
||||
---------------------------
|
||||
|
||||
- Speed up Docker builds and make it nicer to test against Complement while developing (install all dependencies before copying the project). ([\#9610](https://github.com/matrix-org/synapse/issues/9610))
|
||||
- Include [opencontainers labels](https://github.com/opencontainers/image-spec/blob/master/annotations.md#pre-defined-annotation-keys) in the Docker image. ([\#9612](https://github.com/matrix-org/synapse/issues/9612))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Clarify that `register_new_matrix_user` is present also when installed via non-pip package. ([\#9074](https://github.com/matrix-org/synapse/issues/9074))
|
||||
- Update source install documentation to mention platform prerequisites before the source install steps. ([\#9667](https://github.com/matrix-org/synapse/issues/9667))
|
||||
- Improve worker documentation for fallback/web auth endpoints. ([\#9679](https://github.com/matrix-org/synapse/issues/9679))
|
||||
- Update the sample configuration for OIDC authentication. ([\#9695](https://github.com/matrix-org/synapse/issues/9695))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column. ([\#9411](https://github.com/matrix-org/synapse/issues/9411))
|
||||
- Add type hints to the caching module. ([\#9442](https://github.com/matrix-org/synapse/issues/9442))
|
||||
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9499](https://github.com/matrix-org/synapse/issues/9499), [\#9659](https://github.com/matrix-org/synapse/issues/9659))
|
||||
- Add additional type hints to the Homeserver object. ([\#9631](https://github.com/matrix-org/synapse/issues/9631), [\#9638](https://github.com/matrix-org/synapse/issues/9638), [\#9675](https://github.com/matrix-org/synapse/issues/9675), [\#9681](https://github.com/matrix-org/synapse/issues/9681))
|
||||
- Only save remote cross-signing and device keys if they're different from the current ones. ([\#9634](https://github.com/matrix-org/synapse/issues/9634))
|
||||
- Rename storage function to fix spelling and not conflict with another functions name. ([\#9637](https://github.com/matrix-org/synapse/issues/9637))
|
||||
- Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server. ([\#9640](https://github.com/matrix-org/synapse/issues/9640), [\#9664](https://github.com/matrix-org/synapse/issues/9664))
|
||||
- In the `federation_client` commandline client, stop automatically adding the URL prefix, so that servlets on other prefixes can be tested. ([\#9645](https://github.com/matrix-org/synapse/issues/9645))
|
||||
- In the `federation_client` commandline client, handle inline `signing_key`s in `homeserver.yaml`. ([\#9647](https://github.com/matrix-org/synapse/issues/9647))
|
||||
- Fixed some antipattern issues to improve code quality. ([\#9649](https://github.com/matrix-org/synapse/issues/9649))
|
||||
- Add a storage method for pulling all current user presence state from the database. ([\#9650](https://github.com/matrix-org/synapse/issues/9650))
|
||||
- Import `HomeServer` from the proper module. ([\#9665](https://github.com/matrix-org/synapse/issues/9665))
|
||||
- Increase default join ratelimiting burst rate. ([\#9674](https://github.com/matrix-org/synapse/issues/9674))
|
||||
- Add type hints to third party event rules and visibility modules. ([\#9676](https://github.com/matrix-org/synapse/issues/9676))
|
||||
- Bump mypy-zope to 0.2.13 to fix "Cannot determine consistent method resolution order (MRO)" errors when running mypy a second time. ([\#9678](https://github.com/matrix-org/synapse/issues/9678))
|
||||
- Use interpreter from `$PATH` via `/usr/bin/env` instead of absolute paths in various scripts. ([\#9689](https://github.com/matrix-org/synapse/issues/9689))
|
||||
- Make it possible to use `dmypy`. ([\#9692](https://github.com/matrix-org/synapse/issues/9692))
|
||||
- Suppress "CryptographyDeprecationWarning: int_from_bytes is deprecated". ([\#9698](https://github.com/matrix-org/synapse/issues/9698))
|
||||
- Use `dmypy run` in lint script for improved performance in type-checking while developing. ([\#9701](https://github.com/matrix-org/synapse/issues/9701))
|
||||
- Fix undetected mypy error when using Python 3.6. ([\#9703](https://github.com/matrix-org/synapse/issues/9703))
|
||||
- Fix type-checking CI on develop. ([\#9709](https://github.com/matrix-org/synapse/issues/9709))
|
||||
|
||||
|
||||
Synapse 1.30.1 (2021-03-26)
|
||||
===========================
|
||||
|
||||
This is a security release to ensure that Synapse is running with a
|
||||
`cryptography` package built against a patched version of OpenSSL.
|
||||
This release is identical to Synapse 1.30.0, with the exception of explicitly
|
||||
setting a minimum version of Python's Cryptography library to ensure that users
|
||||
of Synapse are protected from the recent [OpenSSL security advisories](https://mta.openssl.org/pipermail/openssl-announce/2021-March/000198.html),
|
||||
especially CVE-2021-3449.
|
||||
|
||||
Note that Cryptography defaults to bundling its own statically linked copy of
|
||||
OpenSSL, which means that you may not be protected by your operating system's
|
||||
security updates.
|
||||
|
||||
It's also worth noting that Cryptography no longer supports Python 3.5, so
|
||||
admins deploying to older environments may not be protected against this or
|
||||
future vulnerabilities. Synapse will be dropping support for Python 3.5 at the
|
||||
end of March.
|
||||
|
||||
|
||||
Updates to the Docker image
|
||||
|
||||
22
INSTALL.md
22
INSTALL.md
@@ -527,14 +527,24 @@ email will be disabled.
|
||||
|
||||
The easiest way to create a new user is to do so from a client like [Element](https://element.io/).
|
||||
|
||||
Alternatively you can do so from the command line if you have installed via pip.
|
||||
Alternatively, you can do so from the command line. This can be done as follows:
|
||||
|
||||
This can be done as follows:
|
||||
1. If synapse was installed via pip, activate the virtualenv as follows (if Synapse was
|
||||
installed via a prebuilt package, `register_new_matrix_user` should already be
|
||||
on the search path):
|
||||
```sh
|
||||
cd ~/synapse
|
||||
source env/bin/activate
|
||||
synctl start # if not already running
|
||||
```
|
||||
2. Run the following command:
|
||||
```sh
|
||||
register_new_matrix_user -c homeserver.yaml http://localhost:8008
|
||||
```
|
||||
|
||||
```sh
|
||||
$ source ~/synapse/env/bin/activate
|
||||
$ synctl start # if not already running
|
||||
$ register_new_matrix_user -c homeserver.yaml http://localhost:8008
|
||||
This will prompt you to add details for the new user, and will then connect to
|
||||
the running Synapse to create the new user. For example:
|
||||
```
|
||||
New user localpart: erikj
|
||||
Password:
|
||||
Confirm password:
|
||||
|
||||
@@ -98,9 +98,12 @@ will log a warning on each received request.
|
||||
|
||||
To avoid the warning, administrators using a reverse proxy should ensure that
|
||||
the reverse proxy sets `X-Forwarded-Proto` header to `https` or `http` to
|
||||
indicate the protocol used by the client. See the `reverse proxy documentation
|
||||
<docs/reverse_proxy.md>`_, where the example configurations have been updated to
|
||||
show how to set this header.
|
||||
indicate the protocol used by the client.
|
||||
|
||||
Synapse also requires the `Host` header to be preserved.
|
||||
|
||||
See the `reverse proxy documentation <docs/reverse_proxy.md>`_, where the
|
||||
example configurations have been updated to show how to set these headers.
|
||||
|
||||
(Users of `Caddy <https://caddyserver.com/>`_ are unaffected, since we believe it
|
||||
sets `X-Forwarded-Proto` by default.)
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column.
|
||||
@@ -1 +0,0 @@
|
||||
Introduce flake8-bugbear to the test suite and fix some of its lint violations.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a longstanding bug that could cause issues when editing a reply to a message.
|
||||
@@ -1 +0,0 @@
|
||||
Fix the `/capabilities` endpoint to return `m.change_password` as disabled if the local password database is not used for authentication. Contributed by @dklimpel.
|
||||
@@ -1 +0,0 @@
|
||||
Logins using OpenID Connect can require attributes on the `userinfo` response in order to login. Contributed by Hubbe King.
|
||||
@@ -1 +0,0 @@
|
||||
Include [opencontainers labels](https://github.com/opencontainers/image-spec/blob/master/annotations.md#pre-defined-annotation-keys) in the Docker image.
|
||||
@@ -1 +0,0 @@
|
||||
Add additional type hints to the Homeserver object.
|
||||
@@ -1 +0,0 @@
|
||||
Only save remote cross-signing and device keys if they're different from the current ones.
|
||||
@@ -1 +0,0 @@
|
||||
Checks if passwords are allowed before setting it for the user.
|
||||
@@ -1 +0,0 @@
|
||||
Rename storage function to fix spelling and not conflict with another functions name.
|
||||
@@ -1 +0,0 @@
|
||||
Add additional type hints to the Homeserver object.
|
||||
@@ -1 +0,0 @@
|
||||
Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server.
|
||||
@@ -1 +0,0 @@
|
||||
Add initial experimental support for a "space summary" API.
|
||||
@@ -1 +0,0 @@
|
||||
Implement the busy presence state as described in [MSC3026](https://github.com/matrix-org/matrix-doc/pull/3026).
|
||||
@@ -1 +0,0 @@
|
||||
In the `federation_client` commandline client, stop automatically adding the URL prefix, so that servlets on other prefixes can be tested.
|
||||
@@ -1 +0,0 @@
|
||||
In the `federation_client` commandline client, handle inline `signing_key`s in `homeserver.yaml`.
|
||||
@@ -1 +0,0 @@
|
||||
Fixed some antipattern issues to improve code quality.
|
||||
@@ -1 +0,0 @@
|
||||
Add a storage method for pulling all current user presence state from the database.
|
||||
@@ -1 +0,0 @@
|
||||
Add initial experimental support for a "space summary" API.
|
||||
@@ -1 +0,0 @@
|
||||
Add initial experimental support for a "space summary" API.
|
||||
@@ -1 +0,0 @@
|
||||
Add support for credentials for proxy authentication in the `HTTPS_PROXY` environment variable.
|
||||
@@ -1 +0,0 @@
|
||||
Introduce flake8-bugbear to the test suite and fix some of its lint violations.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance of federation catch up by sending events the latest events in the room to the remote, rather than just the last event sent by the local server.
|
||||
@@ -1 +0,0 @@
|
||||
Import `HomeServer` from the proper module.
|
||||
@@ -1 +0,0 @@
|
||||
Update source install documentation to mention platform prerequisites before the source install steps.
|
||||
@@ -1 +0,0 @@
|
||||
Increase default join ratelimiting burst rate.
|
||||
@@ -1 +0,0 @@
|
||||
Add additional type hints to the Homeserver object.
|
||||
@@ -1 +0,0 @@
|
||||
Add type hints to third party event rules and visibility modules.
|
||||
@@ -1 +0,0 @@
|
||||
Bump mypy-zope to 0.2.13 to fix "Cannot determine consistent method resolution order (MRO)" errors when running mypy a second time.
|
||||
@@ -1 +0,0 @@
|
||||
Improve worker documentation for fallback/web auth endpoints.
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# this script will use the api:
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/admin_api/purge_history_api.rst
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
DOMAIN=yourserver.tld
|
||||
# add this user as admin in your home server:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
DIR="$( cd "$( dirname "$0" )" && pwd )"
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
DIR="$( cd "$( dirname "$0" )" && pwd )"
|
||||
|
||||
|
||||
@@ -25,42 +25,40 @@ LABEL org.opencontainers.image.licenses='Apache-2.0'
|
||||
|
||||
# install the OS build deps
|
||||
RUN apt-get update && apt-get install -y \
|
||||
build-essential \
|
||||
libffi-dev \
|
||||
libjpeg-dev \
|
||||
libpq-dev \
|
||||
libssl-dev \
|
||||
libwebp-dev \
|
||||
libxml++2.6-dev \
|
||||
libxslt1-dev \
|
||||
openssl \
|
||||
rustc \
|
||||
zlib1g-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
build-essential \
|
||||
libffi-dev \
|
||||
libjpeg-dev \
|
||||
libpq-dev \
|
||||
libssl-dev \
|
||||
libwebp-dev \
|
||||
libxml++2.6-dev \
|
||||
libxslt1-dev \
|
||||
openssl \
|
||||
rustc \
|
||||
zlib1g-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Build dependencies that are not available as wheels, to speed up rebuilds
|
||||
RUN pip install --prefix="/install" --no-warn-script-location \
|
||||
cryptography \
|
||||
frozendict \
|
||||
jaeger-client \
|
||||
opentracing \
|
||||
# Match the version constraints of Synapse
|
||||
"prometheus_client>=0.4.0" \
|
||||
psycopg2 \
|
||||
pycparser \
|
||||
pyrsistent \
|
||||
pyyaml \
|
||||
simplejson \
|
||||
threadloop \
|
||||
thrift
|
||||
|
||||
# now install synapse and all of the python deps to /install.
|
||||
COPY synapse /synapse/synapse/
|
||||
# Copy just what we need to pip install
|
||||
COPY scripts /synapse/scripts/
|
||||
COPY MANIFEST.in README.rst setup.py synctl /synapse/
|
||||
COPY synapse/__init__.py /synapse/synapse/__init__.py
|
||||
COPY synapse/python_dependencies.py /synapse/synapse/python_dependencies.py
|
||||
|
||||
# To speed up rebuilds, install all of the dependencies before we copy over
|
||||
# the whole synapse project so that we this layer in the Docker cache can be
|
||||
# used while you develop on the source
|
||||
#
|
||||
# This is aiming at installing the `install_requires` and `extras_require` from `setup.py`
|
||||
RUN pip install --prefix="/install" --no-warn-script-location \
|
||||
/synapse[all]
|
||||
/synapse[all]
|
||||
|
||||
# Copy over the rest of the project
|
||||
COPY synapse /synapse/synapse/
|
||||
|
||||
# Install the synapse package itself and all of its children packages.
|
||||
#
|
||||
# This is aiming at installing only the `packages=find_packages(...)` from `setup.py
|
||||
RUN pip install --prefix="/install" --no-deps --no-warn-script-location /synapse
|
||||
|
||||
###
|
||||
### Stage 1: runtime
|
||||
@@ -69,16 +67,16 @@ RUN pip install --prefix="/install" --no-warn-script-location \
|
||||
FROM docker.io/python:${PYTHON_VERSION}-slim
|
||||
|
||||
RUN apt-get update && apt-get install -y \
|
||||
curl \
|
||||
gosu \
|
||||
libjpeg62-turbo \
|
||||
libpq5 \
|
||||
libwebp6 \
|
||||
xmlsec1 \
|
||||
libjemalloc2 \
|
||||
libssl-dev \
|
||||
openssl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
curl \
|
||||
gosu \
|
||||
libjpeg62-turbo \
|
||||
libpq5 \
|
||||
libwebp6 \
|
||||
xmlsec1 \
|
||||
libjemalloc2 \
|
||||
libssl-dev \
|
||||
openssl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=builder /install /usr/local
|
||||
COPY ./docker/start.py /start.py
|
||||
@@ -91,4 +89,4 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
||||
ENTRYPOINT ["/start.py"]
|
||||
|
||||
HEALTHCHECK --interval=1m --timeout=5s \
|
||||
CMD curl -fSs http://localhost:8008/health || exit 1
|
||||
CMD curl -fSs http://localhost:8008/health || exit 1
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# The script to build the Debian package, as ran inside the Docker image.
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# This script runs the PostgreSQL tests inside a Docker container. It expects
|
||||
# the relevant source files to be mounted into /src (done automatically by the
|
||||
|
||||
@@ -104,10 +104,11 @@ example.com:8448 {
|
||||
```
|
||||
<VirtualHost *:443>
|
||||
SSLEngine on
|
||||
ServerName matrix.example.com;
|
||||
ServerName matrix.example.com
|
||||
|
||||
RequestHeader set "X-Forwarded-Proto" expr=%{REQUEST_SCHEME}
|
||||
AllowEncodedSlashes NoDecode
|
||||
ProxyPreserveHost on
|
||||
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
|
||||
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
|
||||
ProxyPass /_synapse/client http://127.0.0.1:8008/_synapse/client nocanon
|
||||
@@ -116,7 +117,7 @@ example.com:8448 {
|
||||
|
||||
<VirtualHost *:8448>
|
||||
SSLEngine on
|
||||
ServerName example.com;
|
||||
ServerName example.com
|
||||
|
||||
RequestHeader set "X-Forwarded-Proto" expr=%{REQUEST_SCHEME}
|
||||
AllowEncodedSlashes NoDecode
|
||||
@@ -135,6 +136,8 @@ example.com:8448 {
|
||||
</IfModule>
|
||||
```
|
||||
|
||||
**NOTE 3**: Missing `ProxyPreserveHost on` can lead to a redirect loop.
|
||||
|
||||
### HAProxy
|
||||
|
||||
```
|
||||
|
||||
@@ -1938,6 +1938,9 @@ saml2_config:
|
||||
# Note that, if this is changed, users authenticating via that provider
|
||||
# will no longer be recognised as the same user!
|
||||
#
|
||||
# (Use "oidc" here if you are migrating from an old "oidc_config"
|
||||
# configuration.)
|
||||
#
|
||||
# idp_name: A user-facing name for this identity provider, which is used to
|
||||
# offer the user a choice of login mechanisms.
|
||||
#
|
||||
@@ -2107,37 +2110,6 @@ oidc_providers:
|
||||
# - attribute: userGroup
|
||||
# value: "synapseUsers"
|
||||
|
||||
# For use with Keycloak
|
||||
#
|
||||
#- idp_id: keycloak
|
||||
# idp_name: Keycloak
|
||||
# issuer: "https://127.0.0.1:8443/auth/realms/my_realm_name"
|
||||
# client_id: "synapse"
|
||||
# client_secret: "copy secret generated in Keycloak UI"
|
||||
# scopes: ["openid", "profile"]
|
||||
# attribute_requirements:
|
||||
# - attribute: groups
|
||||
# value: "admin"
|
||||
|
||||
# For use with Github
|
||||
#
|
||||
#- idp_id: github
|
||||
# idp_name: Github
|
||||
# idp_brand: github
|
||||
# discover: false
|
||||
# issuer: "https://github.com/"
|
||||
# client_id: "your-client-id" # TO BE FILLED
|
||||
# client_secret: "your-client-secret" # TO BE FILLED
|
||||
# authorization_endpoint: "https://github.com/login/oauth/authorize"
|
||||
# token_endpoint: "https://github.com/login/oauth/access_token"
|
||||
# userinfo_endpoint: "https://api.github.com/user"
|
||||
# scopes: ["read:user"]
|
||||
# user_mapping_provider:
|
||||
# config:
|
||||
# subject_claim: "id"
|
||||
# localpart_template: "{{ user.login }}"
|
||||
# display_name_template: "{{ user.name }}"
|
||||
|
||||
|
||||
# Enable Central Authentication Service (CAS) for registration and login.
|
||||
#
|
||||
|
||||
3
mypy.ini
3
mypy.ini
@@ -1,12 +1,13 @@
|
||||
[mypy]
|
||||
namespace_packages = True
|
||||
plugins = mypy_zope:plugin, scripts-dev/mypy_synapse_plugin.py
|
||||
follow_imports = silent
|
||||
follow_imports = normal
|
||||
check_untyped_defs = True
|
||||
show_error_codes = True
|
||||
show_traceback = True
|
||||
mypy_path = stubs
|
||||
warn_unreachable = True
|
||||
local_partial_types = True
|
||||
|
||||
# To find all folders that pass mypy you run:
|
||||
#
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# A script which checks that an appropriate news file has been added on this
|
||||
# branch.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
# Find linting errors in Synapse's default config file.
|
||||
# Exits with 0 if there are no problems, or another code otherwise.
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Update/check the docs/sample_config.yaml
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Runs linting scripts over the local Synapse checkout
|
||||
# isort - sorts import statements
|
||||
@@ -95,4 +95,4 @@ isort "${files[@]}"
|
||||
python3 -m black "${files[@]}"
|
||||
./scripts-dev/config-lint.sh
|
||||
flake8 "${files[@]}"
|
||||
mypy
|
||||
dmypy run
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# This script generates SQL files for creating a brand new Synapse DB with the latest
|
||||
# schema, on both SQLite3 and Postgres.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
@@ -6,4 +6,4 @@ set -e
|
||||
# next PR number.
|
||||
CURRENT_NUMBER=`curl -s "https://api.github.com/repos/matrix-org/synapse/issues?state=all&per_page=1" | jq -r ".[0].number"`
|
||||
CURRENT_NUMBER=$((CURRENT_NUMBER+1))
|
||||
echo $CURRENT_NUMBER
|
||||
echo $CURRENT_NUMBER
|
||||
|
||||
@@ -48,7 +48,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.30.1"
|
||||
__version__ = "1.31.0rc1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -563,6 +563,9 @@ class Auth:
|
||||
Returns:
|
||||
bool: False if no access_token was given, True otherwise.
|
||||
"""
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
query_params = request.args.get(b"access_token")
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
|
||||
return bool(query_params) or bool(auth_headers)
|
||||
@@ -579,6 +582,8 @@ class Auth:
|
||||
MissingClientTokenError: If there isn't a single access_token in the
|
||||
request
|
||||
"""
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
|
||||
query_params = request.args.get(b"access_token")
|
||||
|
||||
@@ -21,8 +21,10 @@ import signal
|
||||
import socket
|
||||
import sys
|
||||
import traceback
|
||||
import warnings
|
||||
from typing import Awaitable, Callable, Iterable
|
||||
|
||||
from cryptography.utils import CryptographyDeprecationWarning
|
||||
from typing_extensions import NoReturn
|
||||
|
||||
from twisted.internet import defer, error, reactor
|
||||
@@ -195,6 +197,25 @@ def listen_metrics(bind_addresses, port):
|
||||
start_http_server(port, addr=host, registry=RegistryProxy)
|
||||
|
||||
|
||||
def listen_manhole(bind_addresses: Iterable[str], port: int, manhole_globals: dict):
|
||||
# twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
|
||||
# warning. It's fixed by https://github.com/twisted/twisted/pull/1522), so
|
||||
# suppress the warning for now.
|
||||
warnings.filterwarnings(
|
||||
action="ignore",
|
||||
category=CryptographyDeprecationWarning,
|
||||
message="int_from_bytes is deprecated",
|
||||
)
|
||||
|
||||
from synapse.util.manhole import manhole
|
||||
|
||||
listen_tcp(
|
||||
bind_addresses,
|
||||
port,
|
||||
manhole(username="matrix", password="rabbithole", globals=manhole_globals),
|
||||
)
|
||||
|
||||
|
||||
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
|
||||
"""
|
||||
Create a TCP socket for a port and several addresses
|
||||
|
||||
@@ -147,7 +147,6 @@ from synapse.storage.databases.main.user_directory import UserDirectoryStore
|
||||
from synapse.types import ReadReceipt
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger("synapse.app.generic_worker")
|
||||
@@ -640,12 +639,8 @@ class GenericWorkerServer(HomeServer):
|
||||
if listener.type == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener.type == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener.bind_addresses,
|
||||
listener.port,
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
_base.listen_manhole(
|
||||
listener.bind_addresses, listener.port, manhole_globals={"hs": self}
|
||||
)
|
||||
elif listener.type == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
@@ -792,13 +787,6 @@ class FederationSenderHandler:
|
||||
|
||||
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
|
||||
|
||||
def on_start(self):
|
||||
# There may be some events that are persisted but haven't been sent,
|
||||
# so send them now.
|
||||
self.federation_sender.notify_new_events(
|
||||
self.store.get_room_max_stream_ordering()
|
||||
)
|
||||
|
||||
def wake_destination(self, server: str):
|
||||
self.federation_sender.wake_destination(server)
|
||||
|
||||
|
||||
@@ -67,7 +67,6 @@ from synapse.storage import DataStore
|
||||
from synapse.storage.engines import IncorrectDatabaseSetup
|
||||
from synapse.storage.prepare_database import UpgradeDatabaseException
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.module_loader import load_module
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
@@ -288,12 +287,8 @@ class SynapseHomeServer(HomeServer):
|
||||
if listener.type == "http":
|
||||
self._listening_services.extend(self._listener_http(config, listener))
|
||||
elif listener.type == "manhole":
|
||||
listen_tcp(
|
||||
listener.bind_addresses,
|
||||
listener.port,
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
_base.listen_manhole(
|
||||
listener.bind_addresses, listener.port, manhole_globals={"hs": self}
|
||||
)
|
||||
elif listener.type == "replication":
|
||||
services = listen_tcp(
|
||||
|
||||
@@ -24,7 +24,7 @@ from ._base import Config, ConfigError
|
||||
_CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
|
||||
|
||||
# Map from canonicalised cache name to cache.
|
||||
_CACHES = {}
|
||||
_CACHES = {} # type: Dict[str, Callable[[float], None]]
|
||||
|
||||
# a lock on the contents of _CACHES
|
||||
_CACHES_LOCK = threading.Lock()
|
||||
@@ -59,7 +59,9 @@ def _canonicalise_cache_name(cache_name: str) -> str:
|
||||
return cache_name.lower()
|
||||
|
||||
|
||||
def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
|
||||
def add_resizable_cache(
|
||||
cache_name: str, cache_resize_callback: Callable[[float], None]
|
||||
):
|
||||
"""Register a cache that's size can dynamically change
|
||||
|
||||
Args:
|
||||
|
||||
@@ -79,6 +79,9 @@ class OIDCConfig(Config):
|
||||
# Note that, if this is changed, users authenticating via that provider
|
||||
# will no longer be recognised as the same user!
|
||||
#
|
||||
# (Use "oidc" here if you are migrating from an old "oidc_config"
|
||||
# configuration.)
|
||||
#
|
||||
# idp_name: A user-facing name for this identity provider, which is used to
|
||||
# offer the user a choice of login mechanisms.
|
||||
#
|
||||
@@ -247,37 +250,6 @@ class OIDCConfig(Config):
|
||||
# attribute_requirements:
|
||||
# - attribute: userGroup
|
||||
# value: "synapseUsers"
|
||||
|
||||
# For use with Keycloak
|
||||
#
|
||||
#- idp_id: keycloak
|
||||
# idp_name: Keycloak
|
||||
# issuer: "https://127.0.0.1:8443/auth/realms/my_realm_name"
|
||||
# client_id: "synapse"
|
||||
# client_secret: "copy secret generated in Keycloak UI"
|
||||
# scopes: ["openid", "profile"]
|
||||
# attribute_requirements:
|
||||
# - attribute: groups
|
||||
# value: "admin"
|
||||
|
||||
# For use with Github
|
||||
#
|
||||
#- idp_id: github
|
||||
# idp_name: Github
|
||||
# idp_brand: github
|
||||
# discover: false
|
||||
# issuer: "https://github.com/"
|
||||
# client_id: "your-client-id" # TO BE FILLED
|
||||
# client_secret: "your-client-secret" # TO BE FILLED
|
||||
# authorization_endpoint: "https://github.com/login/oauth/authorize"
|
||||
# token_endpoint: "https://github.com/login/oauth/access_token"
|
||||
# userinfo_endpoint: "https://api.github.com/user"
|
||||
# scopes: ["read:user"]
|
||||
# user_mapping_provider:
|
||||
# config:
|
||||
# subject_claim: "id"
|
||||
# localpart_template: "{{{{ user.login }}}}"
|
||||
# display_name_template: "{{{{ user.name }}}}"
|
||||
""".format(
|
||||
mapping_provider=DEFAULT_USER_MAPPING_PROVIDER
|
||||
)
|
||||
|
||||
@@ -31,25 +31,39 @@ Events are replicated via a separate events stream.
|
||||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import Dict, List, Tuple, Type
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
Hashable,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Sized,
|
||||
Tuple,
|
||||
Type,
|
||||
)
|
||||
|
||||
from sortedcontainers import SortedDict
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.federation.sender import AbstractFederationSender, FederationSender
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
from .units import Edu
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FederationRemoteSendQueue:
|
||||
class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
"""A drop in replacement for FederationSender"""
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
self.notifier = hs.get_notifier()
|
||||
@@ -58,7 +72,7 @@ class FederationRemoteSendQueue:
|
||||
# We may have multiple federation sender instances, so we need to track
|
||||
# their positions separately.
|
||||
self._sender_instances = hs.config.worker.federation_shard_config.instances
|
||||
self._sender_positions = {}
|
||||
self._sender_positions = {} # type: Dict[str, int]
|
||||
|
||||
# Pending presence map user_id -> UserPresenceState
|
||||
self.presence_map = {} # type: Dict[str, UserPresenceState]
|
||||
@@ -71,7 +85,7 @@ class FederationRemoteSendQueue:
|
||||
# Stream position -> (user_id, destinations)
|
||||
self.presence_destinations = (
|
||||
SortedDict()
|
||||
) # type: SortedDict[int, Tuple[str, List[str]]]
|
||||
) # type: SortedDict[int, Tuple[str, Iterable[str]]]
|
||||
|
||||
# (destination, key) -> EDU
|
||||
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
|
||||
@@ -94,7 +108,7 @@ class FederationRemoteSendQueue:
|
||||
# we make a new function, so we need to make a new function so the inner
|
||||
# lambda binds to the queue rather than to the name of the queue which
|
||||
# changes. ARGH.
|
||||
def register(name, queue):
|
||||
def register(name: str, queue: Sized) -> None:
|
||||
LaterGauge(
|
||||
"synapse_federation_send_queue_%s_size" % (queue_name,),
|
||||
"",
|
||||
@@ -115,13 +129,13 @@ class FederationRemoteSendQueue:
|
||||
|
||||
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
||||
|
||||
def _next_pos(self):
|
||||
def _next_pos(self) -> int:
|
||||
pos = self.pos
|
||||
self.pos += 1
|
||||
self.pos_time[self.clock.time_msec()] = pos
|
||||
return pos
|
||||
|
||||
def _clear_queue(self):
|
||||
def _clear_queue(self) -> None:
|
||||
"""Clear the queues for anything older than N minutes"""
|
||||
|
||||
FIVE_MINUTES_AGO = 5 * 60 * 1000
|
||||
@@ -138,7 +152,7 @@ class FederationRemoteSendQueue:
|
||||
|
||||
self._clear_queue_before_pos(position_to_delete)
|
||||
|
||||
def _clear_queue_before_pos(self, position_to_delete):
|
||||
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
|
||||
"""Clear all the queues from before a given position"""
|
||||
with Measure(self.clock, "send_queue._clear"):
|
||||
# Delete things out of presence maps
|
||||
@@ -188,13 +202,18 @@ class FederationRemoteSendQueue:
|
||||
for key in keys[:i]:
|
||||
del self.edus[key]
|
||||
|
||||
def notify_new_events(self, max_token):
|
||||
def notify_new_events(self, max_token: RoomStreamToken) -> None:
|
||||
"""As per FederationSender"""
|
||||
# We don't need to replicate this as it gets sent down a different
|
||||
# stream.
|
||||
pass
|
||||
# This should never get called.
|
||||
raise NotImplementedError()
|
||||
|
||||
def build_and_send_edu(self, destination, edu_type, content, key=None):
|
||||
def build_and_send_edu(
|
||||
self,
|
||||
destination: str,
|
||||
edu_type: str,
|
||||
content: JsonDict,
|
||||
key: Optional[Hashable] = None,
|
||||
) -> None:
|
||||
"""As per FederationSender"""
|
||||
if destination == self.server_name:
|
||||
logger.info("Not sending EDU to ourselves")
|
||||
@@ -218,38 +237,39 @@ class FederationRemoteSendQueue:
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_read_receipt(self, receipt):
|
||||
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
|
||||
"""As per FederationSender
|
||||
|
||||
Args:
|
||||
receipt (synapse.types.ReadReceipt):
|
||||
receipt:
|
||||
"""
|
||||
# nothing to do here: the replication listener will handle it.
|
||||
return defer.succeed(None)
|
||||
|
||||
def send_presence(self, states):
|
||||
def send_presence(self, states: List[UserPresenceState]) -> None:
|
||||
"""As per FederationSender
|
||||
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
states
|
||||
"""
|
||||
pos = self._next_pos()
|
||||
|
||||
# We only want to send presence for our own users, so lets always just
|
||||
# filter here just in case.
|
||||
local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
|
||||
local_states = [s for s in states if self.is_mine_id(s.user_id)]
|
||||
|
||||
self.presence_map.update({state.user_id: state for state in local_states})
|
||||
self.presence_changed[pos] = [state.user_id for state in local_states]
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_presence_to_destinations(self, states, destinations):
|
||||
def send_presence_to_destinations(
|
||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||
) -> None:
|
||||
"""As per FederationSender
|
||||
|
||||
Args:
|
||||
states (list[UserPresenceState])
|
||||
destinations (list[str])
|
||||
states
|
||||
destinations
|
||||
"""
|
||||
for state in states:
|
||||
pos = self._next_pos()
|
||||
@@ -258,15 +278,18 @@ class FederationRemoteSendQueue:
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_device_messages(self, destination):
|
||||
def send_device_messages(self, destination: str) -> None:
|
||||
"""As per FederationSender"""
|
||||
# We don't need to replicate this as it gets sent down a different
|
||||
# stream.
|
||||
|
||||
def get_current_token(self):
|
||||
def wake_destination(self, server: str) -> None:
|
||||
pass
|
||||
|
||||
def get_current_token(self) -> int:
|
||||
return self.pos - 1
|
||||
|
||||
def federation_ack(self, instance_name, token):
|
||||
def federation_ack(self, instance_name: str, token: int) -> None:
|
||||
if self._sender_instances:
|
||||
# If we have configured multiple federation sender instances we need
|
||||
# to track their positions separately, and only clear the queue up
|
||||
@@ -504,13 +527,16 @@ ParsedFederationStreamData = namedtuple(
|
||||
)
|
||||
|
||||
|
||||
def process_rows_for_federation(transaction_queue, rows):
|
||||
def process_rows_for_federation(
|
||||
transaction_queue: FederationSender,
|
||||
rows: List[FederationStream.FederationStreamRow],
|
||||
) -> None:
|
||||
"""Parse a list of rows from the federation stream and put them in the
|
||||
transaction queue ready for sending to the relevant homeservers.
|
||||
|
||||
Args:
|
||||
transaction_queue (FederationSender)
|
||||
rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
|
||||
transaction_queue
|
||||
rows
|
||||
"""
|
||||
|
||||
# The federation stream contains a bunch of different types of
|
||||
|
||||
@@ -13,14 +13,14 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import logging
|
||||
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse
|
||||
import synapse.metrics
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.events import EventBase
|
||||
@@ -40,9 +40,12 @@ from synapse.metrics import (
|
||||
events_processed_counter,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import ReadReceipt, RoomStreamToken
|
||||
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
sent_pdus_destination_dist_count = Counter(
|
||||
@@ -65,8 +68,91 @@ CATCH_UP_STARTUP_DELAY_SEC = 15
|
||||
CATCH_UP_STARTUP_INTERVAL_SEC = 5
|
||||
|
||||
|
||||
class FederationSender:
|
||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||
class AbstractFederationSender(metaclass=abc.ABCMeta):
|
||||
@abc.abstractmethod
|
||||
def notify_new_events(self, max_token: RoomStreamToken) -> None:
|
||||
"""This gets called when we have some new events we might want to
|
||||
send out to other servers.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
|
||||
"""Send a RR to any other servers in the room
|
||||
|
||||
Args:
|
||||
receipt: receipt to be sent
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_presence(self, states: List[UserPresenceState]) -> None:
|
||||
"""Send the new presence states to the appropriate destinations.
|
||||
|
||||
This actually queues up the presence states ready for sending and
|
||||
triggers a background task to process them and send out the transactions.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_presence_to_destinations(
|
||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||
) -> None:
|
||||
"""Send the given presence states to the given destinations.
|
||||
|
||||
Args:
|
||||
destinations:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def build_and_send_edu(
|
||||
self,
|
||||
destination: str,
|
||||
edu_type: str,
|
||||
content: JsonDict,
|
||||
key: Optional[Hashable] = None,
|
||||
) -> None:
|
||||
"""Construct an Edu object, and queue it for sending
|
||||
|
||||
Args:
|
||||
destination: name of server to send to
|
||||
edu_type: type of EDU to send
|
||||
content: content of EDU
|
||||
key: clobbering key for this edu
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_device_messages(self, destination: str) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def wake_destination(self, destination: str) -> None:
|
||||
"""Called when we want to retry sending transactions to a remote.
|
||||
|
||||
This is mainly useful if the remote server has been down and we think it
|
||||
might have come back.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_current_token(self) -> int:
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def federation_ack(self, instance_name: str, token: int) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
async def get_replication_rows(
|
||||
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
|
||||
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class FederationSender(AbstractFederationSender):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.server_name = hs.hostname
|
||||
|
||||
@@ -432,7 +518,7 @@ class FederationSender:
|
||||
queue.flush_read_receipts_for_room(room_id)
|
||||
|
||||
@preserve_fn # the caller should not yield on this
|
||||
async def send_presence(self, states: List[UserPresenceState]):
|
||||
async def send_presence(self, states: List[UserPresenceState]) -> None:
|
||||
"""Send the new presence states to the appropriate destinations.
|
||||
|
||||
This actually queues up the presence states ready for sending and
|
||||
@@ -494,7 +580,7 @@ class FederationSender:
|
||||
self._get_per_destination_queue(destination).send_presence(states)
|
||||
|
||||
@measure_func("txnqueue._process_presence")
|
||||
async def _process_presence_inner(self, states: List[UserPresenceState]):
|
||||
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
|
||||
"""Given a list of states populate self.pending_presence_by_dest and
|
||||
poke to send a new transaction to each destination
|
||||
"""
|
||||
@@ -516,9 +602,9 @@ class FederationSender:
|
||||
self,
|
||||
destination: str,
|
||||
edu_type: str,
|
||||
content: dict,
|
||||
content: JsonDict,
|
||||
key: Optional[Hashable] = None,
|
||||
):
|
||||
) -> None:
|
||||
"""Construct an Edu object, and queue it for sending
|
||||
|
||||
Args:
|
||||
@@ -545,7 +631,7 @@ class FederationSender:
|
||||
|
||||
self.send_edu(edu, key)
|
||||
|
||||
def send_edu(self, edu: Edu, key: Optional[Hashable]):
|
||||
def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
|
||||
"""Queue an EDU for sending
|
||||
|
||||
Args:
|
||||
@@ -563,7 +649,7 @@ class FederationSender:
|
||||
else:
|
||||
queue.send_edu(edu)
|
||||
|
||||
def send_device_messages(self, destination: str):
|
||||
def send_device_messages(self, destination: str) -> None:
|
||||
if destination == self.server_name:
|
||||
logger.warning("Not sending device update to ourselves")
|
||||
return
|
||||
@@ -575,7 +661,7 @@ class FederationSender:
|
||||
|
||||
self._get_per_destination_queue(destination).attempt_new_transaction()
|
||||
|
||||
def wake_destination(self, destination: str):
|
||||
def wake_destination(self, destination: str) -> None:
|
||||
"""Called when we want to retry sending transactions to a remote.
|
||||
|
||||
This is mainly useful if the remote server has been down and we think it
|
||||
@@ -599,6 +685,10 @@ class FederationSender:
|
||||
# to a worker.
|
||||
return 0
|
||||
|
||||
def federation_ack(self, instance_name: str, token: int) -> None:
|
||||
# It is not expected that this gets called on FederationSender.
|
||||
raise NotImplementedError()
|
||||
|
||||
@staticmethod
|
||||
async def get_replication_rows(
|
||||
instance_name: str, from_token: int, to_token: int, target_row_count: int
|
||||
@@ -607,7 +697,7 @@ class FederationSender:
|
||||
# to a worker.
|
||||
return [], 0, False
|
||||
|
||||
async def _wake_destinations_needing_catchup(self):
|
||||
async def _wake_destinations_needing_catchup(self) -> None:
|
||||
"""
|
||||
Wakes up destinations that need catch-up and are not currently being
|
||||
backed off from.
|
||||
|
||||
@@ -149,6 +149,9 @@ class OidcHandler:
|
||||
Args:
|
||||
request: the incoming request from the browser.
|
||||
"""
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
# The provider might redirect with an error.
|
||||
# In that case, just display it as-is.
|
||||
if b"error" in request.args:
|
||||
|
||||
@@ -71,8 +71,10 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_well_known_cache = TTLCache("well-known")
|
||||
_had_valid_well_known_cache = TTLCache("had-valid-well-known")
|
||||
_well_known_cache = TTLCache("well-known") # type: TTLCache[bytes, Optional[bytes]]
|
||||
_had_valid_well_known_cache = TTLCache(
|
||||
"had-valid-well-known"
|
||||
) # type: TTLCache[bytes, bool]
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
@@ -88,8 +90,8 @@ class WellKnownResolver:
|
||||
reactor: IReactorTime,
|
||||
agent: IAgent,
|
||||
user_agent: bytes,
|
||||
well_known_cache: Optional[TTLCache] = None,
|
||||
had_well_known_cache: Optional[TTLCache] = None,
|
||||
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
|
||||
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
||||
@@ -169,7 +169,7 @@ import inspect
|
||||
import logging
|
||||
import re
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Dict, Optional, Type
|
||||
from typing import TYPE_CHECKING, Dict, Optional, Pattern, Type
|
||||
|
||||
import attr
|
||||
|
||||
@@ -262,7 +262,7 @@ logger = logging.getLogger(__name__)
|
||||
# Block everything by default
|
||||
# A regex which matches the server_names to expose traces for.
|
||||
# None means 'block everything'.
|
||||
_homeserver_whitelist = None
|
||||
_homeserver_whitelist = None # type: Optional[Pattern[str]]
|
||||
|
||||
# Util methods
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
from typing import List, Set
|
||||
|
||||
@@ -101,7 +102,7 @@ CONDITIONAL_REQUIREMENTS = {
|
||||
"txacme>=0.9.2",
|
||||
# txacme depends on eliot. Eliot 1.8.0 is incompatible with
|
||||
# python 3.5.2, as per https://github.com/itamarst/eliot/issues/418
|
||||
'eliot<1.8.0;python_version<"3.5.3"',
|
||||
"eliot<1.8.0;python_version<'3.5.3'",
|
||||
],
|
||||
"saml2": [
|
||||
# pysaml2 6.4.0 is incompatible with Python 3.5 (see https://github.com/IdentityPython/pysaml2/issues/749)
|
||||
@@ -133,6 +134,18 @@ for name, optional_deps in CONDITIONAL_REQUIREMENTS.items():
|
||||
ALL_OPTIONAL_REQUIREMENTS = set(optional_deps) | ALL_OPTIONAL_REQUIREMENTS
|
||||
|
||||
|
||||
# ensure there are no double-quote characters in any of the deps (otherwise the
|
||||
# 'pip install' incantation in DependencyException will break)
|
||||
for dep in itertools.chain(
|
||||
REQUIREMENTS,
|
||||
*CONDITIONAL_REQUIREMENTS.values(),
|
||||
):
|
||||
if '"' in dep:
|
||||
raise Exception(
|
||||
"Dependency `%s` contains double-quote; use single-quotes instead" % (dep,)
|
||||
)
|
||||
|
||||
|
||||
def list_requirements():
|
||||
return list(set(REQUIREMENTS) | ALL_OPTIONAL_REQUIREMENTS)
|
||||
|
||||
@@ -152,7 +165,7 @@ class DependencyException(Exception):
|
||||
@property
|
||||
def dependencies(self):
|
||||
for i in self.args[0]:
|
||||
yield "'" + i + "'"
|
||||
yield '"' + i + '"'
|
||||
|
||||
|
||||
def check_requirements(for_feature=None):
|
||||
|
||||
@@ -312,16 +312,16 @@ class FederationAckCommand(Command):
|
||||
|
||||
NAME = "FEDERATION_ACK"
|
||||
|
||||
def __init__(self, instance_name, token):
|
||||
def __init__(self, instance_name: str, token: int):
|
||||
self.instance_name = instance_name
|
||||
self.token = token
|
||||
|
||||
@classmethod
|
||||
def from_line(cls, line):
|
||||
def from_line(cls, line: str) -> "FederationAckCommand":
|
||||
instance_name, token = line.split(" ")
|
||||
return cls(instance_name, int(token))
|
||||
|
||||
def to_line(self):
|
||||
def to_line(self) -> str:
|
||||
return "%s %s" % (self.instance_name, self.token)
|
||||
|
||||
|
||||
|
||||
@@ -104,7 +104,7 @@ tcp_outbound_commands_counter = Counter(
|
||||
|
||||
# A list of all connected protocols. This allows us to send metrics about the
|
||||
# connections.
|
||||
connected_connections = []
|
||||
connected_connections = [] # type: List[BaseReplicationStreamProtocol]
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from collections import namedtuple
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
|
||||
|
||||
from synapse.replication.tcp.streams._base import (
|
||||
Stream,
|
||||
@@ -21,6 +22,9 @@ from synapse.replication.tcp.streams._base import (
|
||||
make_http_update_function,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
class FederationStream(Stream):
|
||||
"""Data to be sent over federation. Only available when master has federation
|
||||
@@ -38,7 +42,7 @@ class FederationStream(Stream):
|
||||
NAME = "federation"
|
||||
ROW_TYPE = FederationStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
if hs.config.worker_app is None:
|
||||
# master process: get updates from the FederationRemoteSendQueue.
|
||||
# (if the master is configured to send federation itself, federation_sender
|
||||
@@ -48,7 +52,9 @@ class FederationStream(Stream):
|
||||
current_token = current_token_without_instance(
|
||||
federation_sender.get_current_token
|
||||
)
|
||||
update_function = federation_sender.get_replication_rows
|
||||
update_function = (
|
||||
federation_sender.get_replication_rows
|
||||
) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
|
||||
|
||||
elif hs.should_send_federation():
|
||||
# federation sender: Query master process
|
||||
@@ -69,5 +75,7 @@ class FederationStream(Stream):
|
||||
return 0
|
||||
|
||||
@staticmethod
|
||||
async def _stub_update_function(instance_name, from_token, upto_token, limit):
|
||||
async def _stub_update_function(
|
||||
instance_name: str, from_token: int, upto_token: int, limit: int
|
||||
) -> Tuple[list, int, bool]:
|
||||
return [], upto_token, False
|
||||
|
||||
@@ -390,6 +390,9 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, RestServlet):
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_identifier: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
|
||||
|
||||
@@ -833,6 +833,9 @@ class UserMediaRestServlet(RestServlet):
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
|
||||
@@ -90,6 +90,9 @@ class SyncRestServlet(RestServlet):
|
||||
self._event_serializer = hs.get_event_client_serializer()
|
||||
|
||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
if b"from" in request.args:
|
||||
# /events used to use 'from', but /sync uses 'since'.
|
||||
# Lets be helpful and whine if we see a 'from'.
|
||||
|
||||
@@ -187,6 +187,8 @@ class PreviewUrlResource(DirectServeJsonResource):
|
||||
respond_with_json(request, 200, {}, send_cors=True)
|
||||
|
||||
async def _async_render_GET(self, request: SynapseRequest) -> None:
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
# XXX: if get_user_by_req fails, what should we do in an async render?
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
@@ -104,6 +104,9 @@ class AccountDetailsResource(DirectServeHtmlResource):
|
||||
respond_with_html(request, 200, html)
|
||||
|
||||
async def _async_render_POST(self, request: SynapseRequest):
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
try:
|
||||
session_id = get_username_mapping_session_cookie_from_request(request)
|
||||
except SynapseError as e:
|
||||
|
||||
@@ -60,7 +60,7 @@ from synapse.federation.federation_server import (
|
||||
FederationServer,
|
||||
)
|
||||
from synapse.federation.send_queue import FederationRemoteSendQueue
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.federation.sender import AbstractFederationSender, FederationSender
|
||||
from synapse.federation.transport.client import TransportLayerClient
|
||||
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
|
||||
from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
|
||||
@@ -571,7 +571,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
return TransportLayerClient(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_federation_sender(self):
|
||||
def get_federation_sender(self) -> AbstractFederationSender:
|
||||
if self.should_send_federation():
|
||||
return FederationSender(self)
|
||||
elif not self.config.worker_app:
|
||||
|
||||
@@ -183,12 +183,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
requests state from the cache, if False we need to query the DB for the
|
||||
missing state.
|
||||
"""
|
||||
is_all, known_absent, state_dict_ids = cache.get(group)
|
||||
cache_entry = cache.get(group)
|
||||
state_dict_ids = cache_entry.value
|
||||
|
||||
if is_all or state_filter.is_full():
|
||||
if cache_entry.full or state_filter.is_full():
|
||||
# Either we have everything or want everything, either way
|
||||
# `is_all` tells us whether we've gotten everything.
|
||||
return state_filter.filter_state(state_dict_ids), is_all
|
||||
return state_filter.filter_state(state_dict_ids), cache_entry.full
|
||||
|
||||
# tracks whether any of our requested types are missing from the cache
|
||||
missing_types = False
|
||||
@@ -202,7 +203,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
# There aren't any wild cards, so `concrete_types()` returns the
|
||||
# complete list of event types we're wanting.
|
||||
for key in state_filter.concrete_types():
|
||||
if key not in state_dict_ids and key not in known_absent:
|
||||
if key not in state_dict_ids and key not in cache_entry.known_absent:
|
||||
missing_types = True
|
||||
break
|
||||
|
||||
|
||||
@@ -25,8 +25,8 @@ from synapse.config.cache import add_resizable_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
caches_by_name = {}
|
||||
collectors_by_name = {} # type: Dict
|
||||
caches_by_name = {} # type: Dict[str, Sized]
|
||||
collectors_by_name = {} # type: Dict[str, CacheMetric]
|
||||
|
||||
cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
|
||||
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
|
||||
|
||||
@@ -15,26 +15,38 @@
|
||||
import enum
|
||||
import logging
|
||||
import threading
|
||||
from collections import namedtuple
|
||||
from typing import Any
|
||||
from typing import Any, Dict, Generic, Iterable, Optional, Set, TypeVar
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))):
|
||||
# The type of the cache keys.
|
||||
KT = TypeVar("KT")
|
||||
# The type of the dictionary keys.
|
||||
DKT = TypeVar("DKT")
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class DictionaryEntry:
|
||||
"""Returned when getting an entry from the cache
|
||||
|
||||
Attributes:
|
||||
full (bool): Whether the cache has the full or dict or just some keys.
|
||||
full: Whether the cache has the full or dict or just some keys.
|
||||
If not full then not all requested keys will necessarily be present
|
||||
in `value`
|
||||
known_absent (set): Keys that were looked up in the dict and were not
|
||||
known_absent: Keys that were looked up in the dict and were not
|
||||
there.
|
||||
value (dict): The full or partial dict value
|
||||
value: The full or partial dict value
|
||||
"""
|
||||
|
||||
full = attr.ib(type=bool)
|
||||
known_absent = attr.ib()
|
||||
value = attr.ib()
|
||||
|
||||
def __len__(self):
|
||||
return len(self.value)
|
||||
|
||||
@@ -45,21 +57,21 @@ class _Sentinel(enum.Enum):
|
||||
sentinel = object()
|
||||
|
||||
|
||||
class DictionaryCache:
|
||||
class DictionaryCache(Generic[KT, DKT]):
|
||||
"""Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
|
||||
fetching a subset of dictionary keys for a particular key.
|
||||
"""
|
||||
|
||||
def __init__(self, name, max_entries=1000):
|
||||
def __init__(self, name: str, max_entries: int = 1000):
|
||||
self.cache = LruCache(
|
||||
max_size=max_entries, cache_name=name, size_callback=len
|
||||
) # type: LruCache[Any, DictionaryEntry]
|
||||
) # type: LruCache[KT, DictionaryEntry]
|
||||
|
||||
self.name = name
|
||||
self.sequence = 0
|
||||
self.thread = None
|
||||
self.thread = None # type: Optional[threading.Thread]
|
||||
|
||||
def check_thread(self):
|
||||
def check_thread(self) -> None:
|
||||
expected_thread = self.thread
|
||||
if expected_thread is None:
|
||||
self.thread = threading.current_thread()
|
||||
@@ -69,12 +81,14 @@ class DictionaryCache:
|
||||
"Cache objects can only be accessed from the main thread"
|
||||
)
|
||||
|
||||
def get(self, key, dict_keys=None):
|
||||
def get(
|
||||
self, key: KT, dict_keys: Optional[Iterable[DKT]] = None
|
||||
) -> DictionaryEntry:
|
||||
"""Fetch an entry out of the cache
|
||||
|
||||
Args:
|
||||
key
|
||||
dict_key(list): If given a set of keys then return only those keys
|
||||
dict_key: If given a set of keys then return only those keys
|
||||
that exist in the cache.
|
||||
|
||||
Returns:
|
||||
@@ -95,7 +109,7 @@ class DictionaryCache:
|
||||
|
||||
return DictionaryEntry(False, set(), {})
|
||||
|
||||
def invalidate(self, key):
|
||||
def invalidate(self, key: KT) -> None:
|
||||
self.check_thread()
|
||||
|
||||
# Increment the sequence number so that any SELECT statements that
|
||||
@@ -103,19 +117,25 @@ class DictionaryCache:
|
||||
self.sequence += 1
|
||||
self.cache.pop(key, None)
|
||||
|
||||
def invalidate_all(self):
|
||||
def invalidate_all(self) -> None:
|
||||
self.check_thread()
|
||||
self.sequence += 1
|
||||
self.cache.clear()
|
||||
|
||||
def update(self, sequence, key, value, fetched_keys=None):
|
||||
def update(
|
||||
self,
|
||||
sequence: int,
|
||||
key: KT,
|
||||
value: Dict[DKT, Any],
|
||||
fetched_keys: Optional[Set[DKT]] = None,
|
||||
) -> None:
|
||||
"""Updates the entry in the cache
|
||||
|
||||
Args:
|
||||
sequence
|
||||
key (K)
|
||||
value (dict[X,Y]): The value to update the cache with.
|
||||
fetched_keys (None|set[X]): All of the dictionary keys which were
|
||||
key
|
||||
value: The value to update the cache with.
|
||||
fetched_keys: All of the dictionary keys which were
|
||||
fetched from the database.
|
||||
|
||||
If None, this is the complete value for key K. Otherwise, it
|
||||
@@ -131,7 +151,9 @@ class DictionaryCache:
|
||||
else:
|
||||
self._update_or_insert(key, value, fetched_keys)
|
||||
|
||||
def _update_or_insert(self, key, value, known_absent):
|
||||
def _update_or_insert(
|
||||
self, key: KT, value: Dict[DKT, Any], known_absent: Set[DKT]
|
||||
) -> None:
|
||||
# We pop and reinsert as we need to tell the cache the size may have
|
||||
# changed
|
||||
|
||||
@@ -140,5 +162,5 @@ class DictionaryCache:
|
||||
entry.known_absent.update(known_absent)
|
||||
self.cache[key] = entry
|
||||
|
||||
def _insert(self, key, value, known_absent):
|
||||
def _insert(self, key: KT, value: Dict[DKT, Any], known_absent: Set[DKT]) -> None:
|
||||
self.cache[key] = DictionaryEntry(True, known_absent, value)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Callable, Dict, Generic, Tuple, TypeVar, Union
|
||||
|
||||
import attr
|
||||
from sortedcontainers import SortedList
|
||||
@@ -23,15 +24,19 @@ from synapse.util.caches import register_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SENTINEL = object()
|
||||
SENTINEL = object() # type: Any
|
||||
|
||||
T = TypeVar("T")
|
||||
KT = TypeVar("KT")
|
||||
VT = TypeVar("VT")
|
||||
|
||||
|
||||
class TTLCache:
|
||||
class TTLCache(Generic[KT, VT]):
|
||||
"""A key/value cache implementation where each entry has its own TTL"""
|
||||
|
||||
def __init__(self, cache_name, timer=time.time):
|
||||
def __init__(self, cache_name: str, timer: Callable[[], float] = time.time):
|
||||
# map from key to _CacheEntry
|
||||
self._data = {}
|
||||
self._data = {} # type: Dict[KT, _CacheEntry]
|
||||
|
||||
# the _CacheEntries, sorted by expiry time
|
||||
self._expiry_list = SortedList() # type: SortedList[_CacheEntry]
|
||||
@@ -40,26 +45,27 @@ class TTLCache:
|
||||
|
||||
self._metrics = register_cache("ttl", cache_name, self, resizable=False)
|
||||
|
||||
def set(self, key, value, ttl):
|
||||
def set(self, key: KT, value: VT, ttl: float) -> None:
|
||||
"""Add/update an entry in the cache
|
||||
|
||||
Args:
|
||||
key: key for this entry
|
||||
value: value for this entry
|
||||
ttl (float): TTL for this entry, in seconds
|
||||
ttl: TTL for this entry, in seconds
|
||||
"""
|
||||
expiry = self._timer() + ttl
|
||||
|
||||
self.expire()
|
||||
e = self._data.pop(key, SENTINEL)
|
||||
if e != SENTINEL:
|
||||
if e is not SENTINEL:
|
||||
assert isinstance(e, _CacheEntry)
|
||||
self._expiry_list.remove(e)
|
||||
|
||||
entry = _CacheEntry(expiry_time=expiry, ttl=ttl, key=key, value=value)
|
||||
self._data[key] = entry
|
||||
self._expiry_list.add(entry)
|
||||
|
||||
def get(self, key, default=SENTINEL):
|
||||
def get(self, key: KT, default: T = SENTINEL) -> Union[VT, T]:
|
||||
"""Get a value from the cache
|
||||
|
||||
Args:
|
||||
@@ -72,23 +78,23 @@ class TTLCache:
|
||||
"""
|
||||
self.expire()
|
||||
e = self._data.get(key, SENTINEL)
|
||||
if e == SENTINEL:
|
||||
if e is SENTINEL:
|
||||
self._metrics.inc_misses()
|
||||
if default == SENTINEL:
|
||||
if default is SENTINEL:
|
||||
raise KeyError(key)
|
||||
return default
|
||||
assert isinstance(e, _CacheEntry)
|
||||
self._metrics.inc_hits()
|
||||
return e.value
|
||||
|
||||
def get_with_expiry(self, key):
|
||||
def get_with_expiry(self, key: KT) -> Tuple[VT, float, float]:
|
||||
"""Get a value, and its expiry time, from the cache
|
||||
|
||||
Args:
|
||||
key: key to look up
|
||||
|
||||
Returns:
|
||||
Tuple[Any, float, float]: the value from the cache, the expiry time
|
||||
and the TTL
|
||||
A tuple of the value from the cache, the expiry time and the TTL
|
||||
|
||||
Raises:
|
||||
KeyError if the entry is not found
|
||||
@@ -102,7 +108,7 @@ class TTLCache:
|
||||
self._metrics.inc_hits()
|
||||
return e.value, e.expiry_time, e.ttl
|
||||
|
||||
def pop(self, key, default=SENTINEL):
|
||||
def pop(self, key: KT, default: T = SENTINEL) -> Union[VT, T]: # type: ignore
|
||||
"""Remove a value from the cache
|
||||
|
||||
If key is in the cache, remove it and return its value, else return default.
|
||||
@@ -118,29 +124,30 @@ class TTLCache:
|
||||
"""
|
||||
self.expire()
|
||||
e = self._data.pop(key, SENTINEL)
|
||||
if e == SENTINEL:
|
||||
if e is SENTINEL:
|
||||
self._metrics.inc_misses()
|
||||
if default == SENTINEL:
|
||||
if default is SENTINEL:
|
||||
raise KeyError(key)
|
||||
return default
|
||||
assert isinstance(e, _CacheEntry)
|
||||
self._expiry_list.remove(e)
|
||||
self._metrics.inc_hits()
|
||||
return e.value
|
||||
|
||||
def __getitem__(self, key):
|
||||
def __getitem__(self, key: KT) -> VT:
|
||||
return self.get(key)
|
||||
|
||||
def __delitem__(self, key):
|
||||
def __delitem__(self, key: KT) -> None:
|
||||
self.pop(key)
|
||||
|
||||
def __contains__(self, key):
|
||||
def __contains__(self, key: KT) -> bool:
|
||||
return key in self._data
|
||||
|
||||
def __len__(self):
|
||||
def __len__(self) -> int:
|
||||
self.expire()
|
||||
return len(self._data)
|
||||
|
||||
def expire(self):
|
||||
def expire(self) -> None:
|
||||
"""Run the expiry on the cache. Any entries whose expiry times are due will
|
||||
be removed
|
||||
"""
|
||||
@@ -158,7 +165,7 @@ class _CacheEntry:
|
||||
"""TTLCache entry"""
|
||||
|
||||
# expiry_time is the first attribute, so that entries are sorted by expiry.
|
||||
expiry_time = attr.ib()
|
||||
ttl = attr.ib()
|
||||
expiry_time = attr.ib(type=float)
|
||||
ttl = attr.ib(type=float)
|
||||
key = attr.ib()
|
||||
value = attr.ib()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# This script builds the Docker image to run the PostgreSQL tests, and then runs
|
||||
# the tests.
|
||||
|
||||
@@ -44,7 +44,7 @@ from tests.server import FakeTransport
|
||||
try:
|
||||
import hiredis
|
||||
except ImportError:
|
||||
hiredis = None
|
||||
hiredis = None # type: ignore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -69,6 +69,7 @@ class TypingStreamTestCase(BaseStreamTestCase):
|
||||
self.assert_request_is_get_repl_stream_updates(request, "typing")
|
||||
|
||||
# The from token should be the token from the last RDATA we got.
|
||||
assert request.args is not None
|
||||
self.assertEqual(int(request.args[b"from_token"][0]), token)
|
||||
|
||||
self.test_handler.on_rdata.assert_called_once()
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
import os
|
||||
from binascii import unhexlify
|
||||
from typing import Tuple
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from twisted.internet.protocol import Factory
|
||||
from twisted.protocols.tls import TLSMemoryBIOFactory
|
||||
@@ -32,7 +32,7 @@ from tests.server import FakeChannel, FakeSite, FakeTransport, make_request
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
test_server_connection_factory = None
|
||||
test_server_connection_factory = None # type: Optional[TestServerTLSConnectionFactory]
|
||||
|
||||
|
||||
class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase):
|
||||
|
||||
@@ -2,7 +2,7 @@ import json
|
||||
import logging
|
||||
from collections import deque
|
||||
from io import SEEK_END, BytesIO
|
||||
from typing import Callable, Iterable, MutableMapping, Optional, Tuple, Union
|
||||
from typing import Callable, Dict, Iterable, MutableMapping, Optional, Tuple, Union
|
||||
|
||||
import attr
|
||||
from typing_extensions import Deque
|
||||
@@ -13,8 +13,11 @@ from twisted.internet._resolver import SimpleResolverComplexifier
|
||||
from twisted.internet.defer import Deferred, fail, succeed
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.internet.interfaces import (
|
||||
IHostnameResolver,
|
||||
IProtocol,
|
||||
IPullProducer,
|
||||
IPushProducer,
|
||||
IReactorPluggableNameResolver,
|
||||
IReactorTCP,
|
||||
IResolverSimple,
|
||||
ITransport,
|
||||
)
|
||||
@@ -45,11 +48,11 @@ class FakeChannel:
|
||||
wire).
|
||||
"""
|
||||
|
||||
site = attr.ib(type=Site)
|
||||
site = attr.ib(type=Union[Site, "FakeSite"])
|
||||
_reactor = attr.ib()
|
||||
result = attr.ib(type=dict, default=attr.Factory(dict))
|
||||
_ip = attr.ib(type=str, default="127.0.0.1")
|
||||
_producer = None
|
||||
_producer = None # type: Optional[Union[IPullProducer, IPushProducer]]
|
||||
|
||||
@property
|
||||
def json_body(self):
|
||||
@@ -159,7 +162,11 @@ class FakeChannel:
|
||||
|
||||
Any cookines found are added to the given dict
|
||||
"""
|
||||
for h in self.headers.getRawHeaders("Set-Cookie"):
|
||||
headers = self.headers.getRawHeaders("Set-Cookie")
|
||||
if not headers:
|
||||
return
|
||||
|
||||
for h in headers:
|
||||
parts = h.split(";")
|
||||
k, v = parts[0].split("=", maxsplit=1)
|
||||
cookies[k] = v
|
||||
@@ -311,8 +318,8 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
|
||||
|
||||
self._tcp_callbacks = {}
|
||||
self._udp = []
|
||||
lookups = self.lookups = {}
|
||||
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()
|
||||
lookups = self.lookups = {} # type: Dict[str, str]
|
||||
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]
|
||||
|
||||
@implementer(IResolverSimple)
|
||||
class FakeResolver:
|
||||
@@ -324,6 +331,9 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
|
||||
self.nameResolver = SimpleResolverComplexifier(FakeResolver())
|
||||
super().__init__()
|
||||
|
||||
def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
|
||||
raise NotImplementedError()
|
||||
|
||||
def listenUDP(self, port, protocol, interface="", maxPacketSize=8196):
|
||||
p = udp.Port(port, protocol, interface, maxPacketSize, self)
|
||||
p.startListening()
|
||||
@@ -621,7 +631,9 @@ class FakeTransport:
|
||||
self.disconnected = True
|
||||
|
||||
|
||||
def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
|
||||
def connect_client(
|
||||
reactor: ThreadedMemoryReactorClock, client_id: int
|
||||
) -> Tuple[IProtocol, AccumulatingProtocol]:
|
||||
"""
|
||||
Connect a client to a fake TCP transport.
|
||||
|
||||
|
||||
@@ -377,14 +377,11 @@ class StateStoreTestCase(tests.unittest.TestCase):
|
||||
#######################################################
|
||||
# deliberately remove e2 (room name) from the _state_group_cache
|
||||
|
||||
(
|
||||
is_all,
|
||||
known_absent,
|
||||
state_dict_ids,
|
||||
) = self.state_datastore._state_group_cache.get(group)
|
||||
cache_entry = self.state_datastore._state_group_cache.get(group)
|
||||
state_dict_ids = cache_entry.value
|
||||
|
||||
self.assertEqual(is_all, True)
|
||||
self.assertEqual(known_absent, set())
|
||||
self.assertEqual(cache_entry.full, True)
|
||||
self.assertEqual(cache_entry.known_absent, set())
|
||||
self.assertDictEqual(
|
||||
state_dict_ids,
|
||||
{
|
||||
@@ -403,14 +400,11 @@ class StateStoreTestCase(tests.unittest.TestCase):
|
||||
fetched_keys=((e1.type, e1.state_key),),
|
||||
)
|
||||
|
||||
(
|
||||
is_all,
|
||||
known_absent,
|
||||
state_dict_ids,
|
||||
) = self.state_datastore._state_group_cache.get(group)
|
||||
cache_entry = self.state_datastore._state_group_cache.get(group)
|
||||
state_dict_ids = cache_entry.value
|
||||
|
||||
self.assertEqual(is_all, False)
|
||||
self.assertEqual(known_absent, {(e1.type, e1.state_key)})
|
||||
self.assertEqual(cache_entry.full, False)
|
||||
self.assertEqual(cache_entry.known_absent, {(e1.type, e1.state_key)})
|
||||
self.assertDictEqual(state_dict_ids, {(e1.type, e1.state_key): e1.event_id})
|
||||
|
||||
############################################
|
||||
|
||||
@@ -27,7 +27,9 @@ class DictCacheTestCase(unittest.TestCase):
|
||||
key = "test_simple_cache_hit_full"
|
||||
|
||||
v = self.cache.get(key)
|
||||
self.assertEqual((False, set(), {}), v)
|
||||
self.assertIs(v.full, False)
|
||||
self.assertEqual(v.known_absent, set())
|
||||
self.assertEqual({}, v.value)
|
||||
|
||||
seq = self.cache.sequence
|
||||
test_value = {"test": "test_simple_cache_hit_full"}
|
||||
|
||||
Reference in New Issue
Block a user