1
0

Compare commits

..

16 Commits

Author SHA1 Message Date
David Baker 53104b5e45 initial_rooms not initial_receipts 2025-04-08 15:38:54 +01:00
David Baker 1118b5c4b4 Filter for rooms that are in initial_rooms
otherwise we send too many
2025-04-08 12:54:44 +01:00
David Baker 1a4c085470 re-use user receipts in the receipts extension 2025-04-08 11:28:40 +01:00
David Baker 5bcd19babd Merge branch 'develop' into dbkr/sss_notif_counts 2025-04-07 15:46:55 +01:00
David Baker 73cd0d0aa4 Add comment 2025-04-07 15:14:22 +01:00
David Baker c0749a8ac7 Return no notifications in case of no member event at all 2025-04-04 17:30:34 +01:00
David Baker 38fc56b2a1 Add type 2025-04-04 10:33:47 +01:00
David Baker 98a5eb9fd4 Import order 2025-04-04 10:18:08 +01:00
David Baker 002e8ccf41 Add test for notification counts in SSS 2025-04-03 18:20:47 +01:00
David Baker 44b487a1b0 Change test as rooms now appear when read receipts sent 2025-04-01 16:34:52 +01:00
David Baker f72ba26e15 More iteration on types 2025-03-31 15:29:40 +01:00
David Baker 1676fa787f Iterate on types 2025-03-31 15:26:11 +01:00
David Baker 9ba2c7030b More types 2025-03-28 15:50:47 +00:00
David Baker 4ea8507bbd Fix types 2025-03-28 15:13:35 +00:00
David Baker cb9d25ffed Here is the news at six o'clock 2025-03-28 12:36:05 +00:00
David Baker d39dc3ef27 Add support for sending notification counts in simplified sliding sync 2025-03-28 12:22:05 +00:00
62 changed files with 426 additions and 536 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
run: docker buildx inspect
- name: Install Cosign
uses: sigstore/cosign-installer@3454372f43399081ed03b604cb2d021dabca52bb # v3.8.2
uses: sigstore/cosign-installer@d7d6bc7722e3daa8354c50bcb52f4837da5e9b6a # v3.8.1
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+1 -1
View File
@@ -44,6 +44,6 @@ jobs:
- run: cargo fmt
continue-on-error: true
- uses: stefanzweifel/git-auto-commit-action@b863ae1933cb653a53c021fe36dbb774e1fb9403 # v5.2.0
- uses: stefanzweifel/git-auto-commit-action@e348103e9026cc0eee72ae06630dbe30c8bf7a79 # v5.1.0
with:
commit_message: "Attempt to fix linting"
+2 -2
View File
@@ -203,7 +203,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download all workflow run artifacts
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
uses: actions/download-artifact@95815c38cf2ff2164869cbab79da8d1f422bc89e # v4.2.1
- name: Build a tarball for the debs
# We need to merge all the debs uploads into one folder, then compress
# that.
@@ -213,7 +213,7 @@ jobs:
tar -cvJf debs.tar.xz debs
- name: Attach to release
# Pinned to work around https://github.com/softprops/action-gh-release/issues/445
uses: softprops/action-gh-release@c95fe1489396fe8a9eb87c0abf8aa5b2ef267fda # v0.1.15
uses: softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844 # v0.1.15
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
if: >
contains(github.event.issue.labels.*.name, 'X-Needs-Info')
steps:
- uses: actions/add-to-project@5b1a254a3546aef88e0a7724a77a623fa2e47c36 # main (v1.0.2 + 10 commits)
- uses: actions/add-to-project@280af8ae1f83a494cfad2cb10f02f6d13529caa9 # main (v1.0.2 + 10 commits)
id: add_project
with:
project-url: "https://github.com/orgs/matrix-org/projects/67"
-7
View File
@@ -1,10 +1,3 @@
# Synapse 1.128.0 (2025-04-08)
No significant changes since 1.128.0rc1.
# Synapse 1.128.0rc1 (2025-04-01)
### Features
Generated
+4 -4
View File
@@ -13,9 +13,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.98"
version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
[[package]]
name = "arc-swap"
@@ -316,9 +316,9 @@ dependencies = [
[[package]]
name = "pyo3-log"
version = "0.12.3"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079e412e909af5d6be7c04a7f29f6a2837a080410e1c529c9dee2c367383db4"
checksum = "4b78e4983ba15bc62833a0e0941d965bc03690163f1127864f1408db25063466"
dependencies = [
"arc-swap",
"log",
-1
View File
@@ -1 +0,0 @@
Add `passthrough_authorization_parameters` in OIDC configuration to allow to pass parameters to the authorization grant URL.
-1
View File
@@ -1 +0,0 @@
Add documentation for configuring [Pocket ID](https://github.com/pocket-id/pocket-id) as an OIDC provider.
+1
View File
@@ -0,0 +1 @@
Add support for sending notification counts and thread notification counts in simplified sliding sync mode.
-1
View File
@@ -1 +0,0 @@
In configure_workers_and_start.py, use the same absolute path of Python in the interpreter shebang, and invoke child Python processes with `sys.executable`.
-1
View File
@@ -1 +0,0 @@
Optimize the build of the workers image.
-1
View File
@@ -1 +0,0 @@
In start_for_complement.sh, replace some external program calls with shell builtins.
-1
View File
@@ -1 +0,0 @@
When generating container scripts from templates, don't add a leading newline so that their shebangs may be handled correctly.
-1
View File
@@ -1 +0,0 @@
Fix typo in docs about the `push` config option. Contributed by @HarHarLinks.
-1
View File
@@ -1 +0,0 @@
Fix `force_tracing_for_users` config when using delegated auth.
-1
View File
@@ -1 +0,0 @@
Fix the token introspection cache logging access tokens when MAS integration is in use.
-1
View File
@@ -1 +0,0 @@
Add cache to storage functions used to auth requests when using delegated auth.
-1
View File
@@ -1 +0,0 @@
Stop caching introspection failures when delegating auth to MAS.
-1
View File
@@ -1 +0,0 @@
Fix `ExternalIDReuse` exception after migrating to MAS on workers with a high traffic.
-1
View File
@@ -1 +0,0 @@
Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0.
-1
View File
@@ -1 +0,0 @@
Add support for handling `GET /devices/` on workers.
-1
View File
@@ -1 +0,0 @@
Allow `/rooms/` admin API to be run on workers.
-1
View File
@@ -1 +0,0 @@
Fix longstanding bug where Synapse would immediately retry a failing push endpoint when a new event is received, ignoring any backoff timers.
-1
View File
@@ -1 +0,0 @@
Minor performance improvements to the notifier.
-1
View File
@@ -1 +0,0 @@
Slight performance increase when using the ratelimiter.
-1
View File
@@ -1 +0,0 @@
Allow client & media admin apis to coexist.
-6
View File
@@ -1,9 +1,3 @@
matrix-synapse-py3 (1.128.0) stable; urgency=medium
* New Synapse release 1.128.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 08 Apr 2025 14:09:54 +0100
matrix-synapse-py3 (1.128.0~rc1) stable; urgency=medium
* Update Poetry to 2.1.1.
+24 -28
View File
@@ -3,37 +3,18 @@
ARG SYNAPSE_VERSION=latest
ARG FROM=matrixdotorg/synapse:$SYNAPSE_VERSION
ARG DEBIAN_VERSION=bookworm
ARG PYTHON_VERSION=3.12
# first of all, we create a base image with dependencies which we can copy into the
# first of all, we create a base image with an nginx which we can copy into the
# target image. For repeated rebuilds, this is much faster than apt installing
# each time.
FROM ghcr.io/astral-sh/uv:python${PYTHON_VERSION}-${DEBIAN_VERSION} AS deps_base
# Tell apt to keep downloaded package files, as we're using cache mounts.
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
FROM docker.io/library/debian:${DEBIAN_VERSION}-slim AS deps_base
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && \
DEBIAN_FRONTEND=noninteractive apt-get install -yqq --no-install-recommends \
nginx-light
RUN \
# remove default page
rm /etc/nginx/sites-enabled/default && \
# have nginx log to stderr/out
ln -sf /dev/stdout /var/log/nginx/access.log && \
ln -sf /dev/stderr /var/log/nginx/error.log
# --link-mode=copy silences a warning as uv isn't able to do hardlinks between its cache
# (mounted as --mount=type=cache) and the target directory.
RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install --link-mode=copy --prefix="/uv/usr/local" supervisor~=4.2
RUN mkdir -p /uv/etc/supervisor/conf.d
redis-server nginx-light
# Similarly, a base to copy the redis server from.
#
@@ -46,16 +27,31 @@ FROM docker.io/library/redis:7-${DEBIAN_VERSION} AS redis_base
# now build the final image, based on the the regular Synapse docker image
FROM $FROM
# Copy over dependencies
# Install supervisord with uv pip instead of apt, to avoid installing a second
# copy of python.
# --link-mode=copy silences a warning as uv isn't able to do hardlinks between its cache
# (mounted as --mount=type=cache) and the target directory.
RUN \
--mount=type=bind,from=ghcr.io/astral-sh/uv:0.6.8,source=/uv,target=/uv \
--mount=type=cache,target=/root/.cache/uv \
/uv pip install --link-mode=copy --prefix="/usr/local" supervisor~=4.2
RUN mkdir -p /etc/supervisor/conf.d
# Copy over redis and nginx
COPY --from=redis_base /usr/local/bin/redis-server /usr/local/bin
COPY --from=deps_base /uv /
COPY --from=deps_base /usr/sbin/nginx /usr/sbin
COPY --from=deps_base /usr/share/nginx /usr/share/nginx
COPY --from=deps_base /usr/lib/nginx /usr/lib/nginx
COPY --from=deps_base /etc/nginx /etc/nginx
COPY --from=deps_base /var/log/nginx /var/log/nginx
# chown to allow non-root user to write to http-*-temp-path dirs
COPY --from=deps_base --chown=www-data:root /var/lib/nginx /var/lib/nginx
RUN rm /etc/nginx/sites-enabled/default
RUN mkdir /var/log/nginx /var/lib/nginx
RUN chown www-data /var/lib/nginx
# have nginx log to stderr/out
RUN ln -sf /dev/stdout /var/log/nginx/access.log
RUN ln -sf /dev/stderr /var/log/nginx/error.log
# Copy Synapse worker, nginx and supervisord configuration template files
COPY ./docker/conf-workers/* /conf/
@@ -74,4 +70,4 @@ FROM $FROM
# Replace the healthcheck with one which checks *all* the workers. The script
# is generated by configure_workers_and_start.py.
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD ["/healthcheck.sh"]
CMD /bin/sh /healthcheck.sh
+1 -1
View File
@@ -58,4 +58,4 @@ ENTRYPOINT ["/start_for_complement.sh"]
# Update the healthcheck to have a shorter check interval
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
CMD ["/healthcheck.sh"]
CMD /bin/sh /healthcheck.sh
@@ -9,7 +9,7 @@ echo " Args: $*"
echo " Env: SYNAPSE_COMPLEMENT_DATABASE=$SYNAPSE_COMPLEMENT_DATABASE SYNAPSE_COMPLEMENT_USE_WORKERS=$SYNAPSE_COMPLEMENT_USE_WORKERS SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=$SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR"
function log {
d=$(printf '%(%Y-%m-%d %H:%M:%S)T,%.3s\n' ${EPOCHREALTIME/./ })
d=$(date +"%Y-%m-%d %H:%M:%S,%3N")
echo "$d $*"
}
@@ -103,11 +103,12 @@ fi
# Note that both the key and certificate are in PEM format (not DER).
# First generate a configuration file to set up a Subject Alternative Name.
echo "\
cat > /conf/server.tls.conf <<EOF
.include /etc/ssl/openssl.cnf
[SAN]
subjectAltName=DNS:${SERVER_NAME}" > /conf/server.tls.conf
subjectAltName=DNS:${SERVER_NAME}
EOF
# Generate an RSA key
openssl genrsa -out /conf/server.tls.key 2048
@@ -122,8 +123,8 @@ openssl x509 -req -in /conf/server.tls.csr \
-out /conf/server.tls.crt -extfile /conf/server.tls.conf -extensions SAN
# Assert that we have a Subject Alternative Name in the certificate.
# (the test will exit with 1 here if there isn't a SAN in the certificate.)
[[ $(openssl x509 -in /conf/server.tls.crt -noout -text) == *DNS:* ]]
# (grep will exit with 1 here if there isn't a SAN in the certificate.)
openssl x509 -in /conf/server.tls.crt -noout -text | grep DNS:
export SYNAPSE_TLS_CERT=/conf/server.tls.crt
export SYNAPSE_TLS_KEY=/conf/server.tls.key
+3 -6
View File
@@ -1,4 +1,4 @@
#!/usr/local/bin/python
#!/usr/bin/env python
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
@@ -376,11 +376,9 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
#
# We use append mode in case the files have already been written to by something else
# (for instance, as part of the instructions in a dockerfile).
exists = os.path.isfile(dst)
with open(dst, "a") as outfile:
# In case the existing file doesn't end with a newline
if exists:
outfile.write("\n")
outfile.write("\n")
outfile.write(rendered)
@@ -606,7 +604,7 @@ def generate_base_homeserver_config() -> None:
# start.py already does this for us, so just call that.
# note that this script is copied in in the official, monolith dockerfile
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
subprocess.run([sys.executable, "/start.py", "migrate_config"], check=True)
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
def parse_worker_types(
@@ -1000,7 +998,6 @@ def generate_worker_files(
"/healthcheck.sh",
healthcheck_urls=healthcheck_urls,
)
os.chmod("/healthcheck.sh", 0o755)
# Ensure the logging directory exists
log_dir = data_dir + "/logs"
-27
View File
@@ -23,7 +23,6 @@ such as [Github][github-idp].
[auth0]: https://auth0.com/
[authentik]: https://goauthentik.io/
[lemonldap]: https://lemonldap-ng.org/
[pocket-id]: https://pocket-id.org/
[okta]: https://www.okta.com/
[dex-idp]: https://github.com/dexidp/dex
[keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
@@ -625,32 +624,6 @@ oidc_providers:
Note that the fields `client_id` and `client_secret` are taken from the CURL response above.
### Pocket ID
[Pocket ID][pocket-id] is a simple OIDC provider that allows users to authenticate with their passkeys.
1. Go to `OIDC Clients`
2. Click on `Add OIDC Client`
3. Add a name, for example `Synapse`
4. Add `"https://auth.example.org/_synapse/client/oidc/callback` to `Callback URLs` # Replace `auth.example.org` with your domain
5. Click on `Save`
6. Note down your `Client ID` and `Client secret`, these will be used later
Synapse config:
```yaml
oidc_providers:
- idp_id: pocket_id
idp_name: Pocket ID
issuer: "https://auth.example.org/" # Replace with your domain
client_id: "your-client-id" # Replace with the "Client ID" you noted down before
client_secret: "your-client-secret" # Replace with the "Client secret" you noted down before
scopes: ["openid", "profile"]
user_mapping_provider:
config:
localpart_template: "{{ user.preferred_username }}"
display_name_template: "{{ user.name }}"
```
### Shibboleth with OIDC Plugin
[Shibboleth](https://www.shibboleth.net/) is an open Standard IdP solution widely used by Universities.
@@ -3672,9 +3672,6 @@ Options for each entry include:
* `additional_authorization_parameters`: String to string dictionary that will be passed as
additional parameters to the authorization grant URL.
* `passthrough_authorization_parameters`: List of parameters that will be passed through from the redirect endpoint
to the authorization grant URL.
* `allow_existing_users`: set to true to allow a user logging in via OIDC to
match a pre-existing account instead of failing. This could be used if
switching from password logins to OIDC. Defaults to false.
@@ -3801,7 +3798,6 @@ oidc_providers:
jwks_uri: "https://accounts.example.com/.well-known/jwks.json"
additional_authorization_parameters:
acr_values: 2fa
passthrough_authorization_parameters: ["login_hint"]
skip_verification: true
enable_registration: true
user_mapping_provider:
@@ -4018,7 +4014,7 @@ This option has a number of sub-options. They are as follows:
* `include_content`: Clients requesting push notifications can either have the body of
the message sent in the notification poke along with other details
like the sender, or just the event ID and room ID (`event_id_only`).
If clients choose to have the body sent, this option controls whether the
If clients choose the to have the body sent, this option controls whether the
notification request includes the content of the event (other details
like the sender are still included). If `event_id_only` is enabled, it
has no effect.
-2
View File
@@ -249,7 +249,6 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
^/_matrix/client/(r0|v3|unstable)/capabilities$
^/_matrix/client/(r0|v3|unstable)/notifications$
^/_synapse/admin/v1/rooms/
# Encryption requests
^/_matrix/client/(r0|v3|unstable)/keys/query$
@@ -281,7 +280,6 @@ Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
^/_matrix/client/unstable/org.matrix.msc4140/delayed_events
^/_matrix/client/(api/v1|r0|v3|unstable)/devices/
# Account data requests
^/_matrix/client/(r0|v3|unstable)/.*/tags
Generated
+9 -10
View File
@@ -2053,19 +2053,18 @@ tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"]
[[package]]
name = "pyopenssl"
version = "25.0.0"
version = "24.3.0"
description = "Python wrapper module around the OpenSSL library"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "pyOpenSSL-25.0.0-py3-none-any.whl", hash = "sha256:424c247065e46e76a37411b9ab1782541c23bb658bf003772c3405fbaa128e90"},
{file = "pyopenssl-25.0.0.tar.gz", hash = "sha256:cd2cef799efa3936bb08e8ccb9433a575722b9dd986023f1cabc4ae64e9dac16"},
{file = "pyOpenSSL-24.3.0-py3-none-any.whl", hash = "sha256:e474f5a473cd7f92221cc04976e48f4d11502804657a08a989fb3be5514c904a"},
{file = "pyopenssl-24.3.0.tar.gz", hash = "sha256:49f7a019577d834746bc55c5fce6ecbcec0f2b4ec5ce1cf43a9a173b8138bb36"},
]
[package.dependencies]
cryptography = ">=41.0.5,<45"
typing-extensions = {version = ">=4.9", markers = "python_version < \"3.13\" and python_version >= \"3.8\""}
[package.extras]
docs = ["sphinx (!=5.2.0,!=5.2.0.post0,!=7.2.5)", "sphinx_rtd_theme"]
@@ -2957,14 +2956,14 @@ files = [
[[package]]
name = "types-jsonschema"
version = "4.23.0.20241208"
version = "4.23.0.20240813"
description = "Typing stubs for jsonschema"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "types_jsonschema-4.23.0.20241208-py3-none-any.whl", hash = "sha256:87934bd9231c99d8eff94cacfc06ba668f7973577a9bd9e1f9de957c5737313e"},
{file = "types_jsonschema-4.23.0.20241208.tar.gz", hash = "sha256:e8b15ad01f290ecf6aea53f93fbdf7d4730e4600313e89e8a7f95622f7e87b7c"},
{file = "types-jsonschema-4.23.0.20240813.tar.gz", hash = "sha256:c93f48206f209a5bc4608d295ac39f172fb98b9e24159ce577dbd25ddb79a1c0"},
{file = "types_jsonschema-4.23.0.20240813-py3-none-any.whl", hash = "sha256:be283e23f0b87547316c2ee6b0fd36d95ea30e921db06478029e10b5b6aa6ac3"},
]
[package.dependencies]
@@ -3008,14 +3007,14 @@ files = [
[[package]]
name = "types-psycopg2"
version = "2.9.21.20250318"
version = "2.9.21.20250121"
description = "Typing stubs for psycopg2"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_psycopg2-2.9.21.20250318-py3-none-any.whl", hash = "sha256:7296d111ad950bbd2fc979a1ab0572acae69047f922280e77db657c00d2c79c0"},
{file = "types_psycopg2-2.9.21.20250318.tar.gz", hash = "sha256:eb6eac5bfb16adfd5f16b818918b9e26a40ede147e0f2bbffdf53a6ef7025a87"},
{file = "types_psycopg2-2.9.21.20250121-py3-none-any.whl", hash = "sha256:b890dc6f5a08b6433f0ff73a4ec9a834deedad3e914f2a4a6fd43df021f745f1"},
{file = "types_psycopg2-2.9.21.20250121.tar.gz", hash = "sha256:2b0e2cd0f3747af1ae25a7027898716d80209604770ef3cbf350fe055b9c349b"},
]
[[package]]
+1 -1
View File
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.128.0"
version = "1.128.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
+3 -62
View File
@@ -45,11 +45,10 @@ from synapse.api.errors import (
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
from synapse.rest.admin.experimental_features import ExperimentalFeature
@@ -178,7 +177,6 @@ class MSC3861DelegatedAuth(BaseAuth):
self._http_client = hs.get_proxied_http_client()
self._hostname = hs.hostname
self._admin_token: Callable[[], Optional[str]] = self._config.admin_token
self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
# # Token Introspection Cache
# This remembers what users/devices are represented by which access tokens,
@@ -203,8 +201,6 @@ class MSC3861DelegatedAuth(BaseAuth):
self._clock,
"token_introspection",
timeout_ms=120_000,
# don't log because the keys are access tokens
enable_logging=False,
)
self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata](
@@ -279,9 +275,7 @@ class MSC3861DelegatedAuth(BaseAuth):
metadata = await self._issuer_metadata.get()
return metadata.get("introspection_endpoint")
async def _introspect_token(
self, token: str, cache_context: ResponseCacheContext[str]
) -> IntrospectionResult:
async def _introspect_token(self, token: str) -> IntrospectionResult:
"""
Send a token to the introspection endpoint and returns the introspection response
@@ -297,8 +291,6 @@ class MSC3861DelegatedAuth(BaseAuth):
Returns:
The introspection response
"""
# By default, we shouldn't cache the result unless we know it's valid
cache_context.should_cache = False
introspection_endpoint = await self._introspection_endpoint()
raw_headers: Dict[str, str] = {
"Content-Type": "application/x-www-form-urlencoded",
@@ -356,8 +348,6 @@ class MSC3861DelegatedAuth(BaseAuth):
"The introspection endpoint returned an invalid JSON response."
)
# We had a valid response, so we can cache it
cache_context.should_cache = True
return IntrospectionResult(
IntrospectionToken(**resp), retrieved_at_ms=self._clock.time_msec()
)
@@ -371,55 +361,6 @@ class MSC3861DelegatedAuth(BaseAuth):
allow_guest: bool = False,
allow_expired: bool = False,
allow_locked: bool = False,
) -> Requester:
"""Get a registered user's ID.
Args:
request: An HTTP request with an access_token query parameter.
allow_guest: If False, will raise an AuthError if the user making the
request is a guest.
allow_expired: If True, allow the request through even if the account
is expired, or session token lifetime has ended. Note that
/login will deliver access tokens regardless of expiration.
Returns:
Resolves to the requester
Raises:
InvalidClientCredentialsError if no user by that token exists or the token
is invalid.
AuthError if access is denied for the user in the access token
"""
parent_span = active_span()
with start_active_span("get_user_by_req"):
requester = await self._wrapped_get_user_by_req(
request, allow_guest, allow_expired, allow_locked
)
if parent_span:
if requester.authenticated_entity in self._force_tracing_for_users:
# request tracing is enabled for this user, so we need to force it
# tracing on for the parent span (which will be the servlet span).
#
# It's too late for the get_user_by_req span to inherit the setting,
# so we also force it on for that.
force_tracing()
force_tracing(parent_span)
parent_span.set_tag(
"authenticated_entity", requester.authenticated_entity
)
parent_span.set_tag("user_id", requester.user.to_string())
if requester.device_id is not None:
parent_span.set_tag("device_id", requester.device_id)
if requester.app_service is not None:
parent_span.set_tag("appservice_id", requester.app_service.id)
return requester
async def _wrapped_get_user_by_req(
self,
request: SynapseRequest,
allow_guest: bool = False,
allow_expired: bool = False,
allow_locked: bool = False,
) -> Requester:
access_token = self.get_access_token_from_request(request)
@@ -488,7 +429,7 @@ class MSC3861DelegatedAuth(BaseAuth):
try:
introspection_result = await self._introspection_cache.wrap(
token, self._introspect_token, token, cache_context=True
token, self._introspect_token, token
)
except Exception:
logger.exception("Failed to introspect token")
+11 -8
View File
@@ -20,7 +20,8 @@
#
#
from typing import Dict, Hashable, Optional, Tuple
from collections import OrderedDict
from typing import Hashable, Optional, Tuple
from synapse.api.errors import LimitExceededError
from synapse.config.ratelimiting import RatelimitSettings
@@ -79,14 +80,12 @@ class Ratelimiter:
self.store = store
self._limiter_name = cfg.key
# A dictionary representing the token buckets tracked by this rate
# An ordered dictionary representing the token buckets tracked by this rate
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
# * The number of tokens currently in the bucket,
# * The time point when the bucket was last completely empty, and
# * The rate_hz (leak rate) of this particular bucket.
self.actions: Dict[Hashable, Tuple[float, float, float]] = {}
self.clock.looping_call(self._prune_message_counts, 60 * 1000)
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
def _get_key(
self, requester: Optional[Requester], key: Optional[Hashable]
@@ -170,6 +169,9 @@ class Ratelimiter:
rate_hz = rate_hz if rate_hz is not None else self.rate_hz
burst_count = burst_count if burst_count is not None else self.burst_count
# Remove any expired entries
self._prune_message_counts(time_now_s)
# Check if there is an existing count entry for this key
action_count, time_start, _ = self._get_action_counts(key, time_now_s)
@@ -244,12 +246,13 @@ class Ratelimiter:
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
self.actions[key] = (action_count + n_actions, time_start, rate_hz)
def _prune_message_counts(self) -> None:
def _prune_message_counts(self, time_now_s: float) -> None:
"""Remove message count entries that have not exceeded their defined
rate_hz limit
"""
time_now_s = self.clock.time()
Args:
time_now_s: The current time
"""
# We create a copy of the key list here as the dictionary is modified during
# the loop
for key in list(self.actions.keys()):
+7 -17
View File
@@ -21,7 +21,7 @@
#
import logging
import sys
from typing import Dict, List, cast
from typing import Dict, List
from twisted.web.resource import Resource
@@ -51,8 +51,8 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
from synapse.rest import ClientRestResource
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -190,11 +190,7 @@ class GenericWorkerServer(HomeServer):
resources.update(build_synapse_client_resource_tree(self))
resources["/.well-known"] = well_known_resource(self)
admin_res = resources.get("/_synapse/admin")
if admin_res is not None:
admin.register_servlets(self, cast(JsonResource, admin_res))
else:
resources["/_synapse/admin"] = AdminRestResource(self)
elif name == "federation":
resources[FEDERATION_PREFIX] = TransportLayerServer(self)
elif name == "media":
@@ -203,21 +199,15 @@ class GenericWorkerServer(HomeServer):
# We need to serve the admin servlets for media on the
# worker.
admin_res = resources.get("/_synapse/admin")
if admin_res is not None:
register_servlets_for_media_repo(
self, cast(JsonResource, admin_res)
)
else:
admin_resource = JsonResource(self, canonical_json=False)
register_servlets_for_media_repo(self, admin_resource)
resources["/_synapse/admin"] = admin_resource
admin_resource = JsonResource(self, canonical_json=False)
register_servlets_for_media_repo(self, admin_resource)
resources.update(
{
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
"/_synapse/admin": admin_resource,
}
)
-6
View File
@@ -356,9 +356,6 @@ def _parse_oidc_config_dict(
additional_authorization_parameters=oidc_config.get(
"additional_authorization_parameters", {}
),
passthrough_authorization_parameters=oidc_config.get(
"passthrough_authorization_parameters", []
),
)
@@ -504,6 +501,3 @@ class OidcProviderConfig:
# Additional parameters that will be passed to the authorization grant URL
additional_authorization_parameters: Mapping[str, str]
# Allow query parameters to the redirect endpoint that will be passed to the authorization grant URL
passthrough_authorization_parameters: Collection[str]
-2
View File
@@ -163,8 +163,6 @@ class DeviceWorkerHandler:
raise errors.NotFoundError()
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
device = dict(device)
_update_device_from_client_ips(device, ips)
set_tag("device", str(device))
+1 -11
View File
@@ -467,10 +467,6 @@ class OidcProvider:
self._sso_handler.register_identity_provider(self)
self.passthrough_authorization_parameters = (
provider.passthrough_authorization_parameters
)
def _validate_metadata(self, m: OpenIDProviderMetadata) -> None:
"""Verifies the provider metadata.
@@ -1009,6 +1005,7 @@ class OidcProvider:
when everything is done (or None for UI Auth)
ui_auth_session_id: The session ID of the ongoing UI Auth (or
None if this is a login).
Returns:
The redirect URL to the authorization endpoint.
@@ -1081,13 +1078,6 @@ class OidcProvider:
)
)
# add passthrough additional authorization parameters
passthrough_authorization_parameters = self.passthrough_authorization_parameters
for parameter in passthrough_authorization_parameters:
parameter_value = parse_string(request, parameter)
if parameter_value:
additional_authorization_parameters.update({parameter: parameter_value})
authorization_endpoint = metadata.get("authorization_endpoint")
return prepare_grant_uri(
authorization_endpoint,
+45 -7
View File
@@ -15,7 +15,17 @@
import itertools
import logging
from itertools import chain
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)
from prometheus_client import Histogram
from typing_extensions import assert_never
@@ -38,6 +48,7 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.stream import PaginateFunction
@@ -245,11 +256,31 @@ class SlidingSyncHandler:
to_token=to_token,
)
# fetch the user's receipts between the two points: these will be factor
# in deciding whether to send the room, since it may have changed their
# notification counts
receipts = await self.store.get_linearized_receipts_for_user_in_rooms(
user_id=user_id,
room_ids=interested_rooms.relevant_room_map.keys(),
from_key=from_token.stream_token.receipt_key if from_token else None,
to_key=to_token.receipt_key,
)
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = self.room_lists.filter_relevant_rooms_to_send(
sync_config.user,
previous_connection_state,
from_token.stream_token if from_token else None,
to_token,
interested_rooms.relevant_room_map,
receipts,
)
lists = interested_rooms.lists
relevant_room_map = interested_rooms.relevant_room_map
all_rooms = interested_rooms.all_rooms
room_membership_for_user_map = interested_rooms.room_membership_for_user_map
relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -272,6 +303,7 @@ class SlidingSyncHandler:
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
room_receipts=receipts[room_id] if room_id in receipts else None,
)
# Filter out empty room results during incremental sync
@@ -296,6 +328,7 @@ class SlidingSyncHandler:
actual_room_response_map=rooms,
from_token=from_token,
to_token=to_token,
user_receipts=receipts,
)
if has_lists or has_room_subscriptions:
@@ -543,6 +576,7 @@ class SlidingSyncHandler:
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
room_receipts: Optional[Sequence[ReceiptInRoom]],
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
@@ -560,6 +594,8 @@ class SlidingSyncHandler:
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
is_dm: Whether the room is a DM room
room_receipts: Any read receipts from the in question in that room between
from_token and to_token
"""
user = sync_config.user
@@ -1312,6 +1348,11 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
unread_notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
)
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
@@ -1329,11 +1370,8 @@ class SlidingSyncHandler:
bump_stamp=bump_stamp,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
notif_counts=unread_notifs,
room_receipts=room_receipts,
)
@trace
+19 -22
View File
@@ -80,6 +80,7 @@ class SlidingSyncExtensionHandler:
actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
user_receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.
@@ -95,6 +96,7 @@ class SlidingSyncExtensionHandler:
Sliding Sync response.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
user_receipts: Map of room ID to list of the syncing user's receipts in the room.
"""
if sync_config.extensions is None:
@@ -142,6 +144,7 @@ class SlidingSyncExtensionHandler:
receipts_request=sync_config.extensions.receipts,
to_token=to_token,
from_token=from_token,
user_receipts=user_receipts,
)
typing_coro = None
@@ -619,6 +622,7 @@ class SlidingSyncExtensionHandler:
receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
user_receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
"""Handle Receipts extension (MSC3960)
@@ -635,6 +639,7 @@ class SlidingSyncExtensionHandler:
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
user_receipts: Map of room ID to list of the syncing user's receipts in the room.
"""
# Skip if the extension is not enabled
if not receipts_request.enabled:
@@ -726,15 +731,6 @@ class SlidingSyncExtensionHandler:
)
if initial_rooms:
# We also always send down receipts for the current user.
user_receipts = (
await self.store.get_linearized_receipts_for_user_in_rooms(
user_id=sync_config.user.to_string(),
room_ids=initial_rooms,
to_key=to_token.receipt_key,
)
)
# For rooms we haven't previously sent down, we could send all receipts
# from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response
@@ -752,22 +748,23 @@ class SlidingSyncExtensionHandler:
# Combine the receipts for a room and add them to
# `fetched_receipts`
for room_id in initial_receipts.keys() | user_receipts.keys():
receipt_content = ReceiptInRoom.merge_to_content(
list(
itertools.chain(
initial_receipts.get(room_id, []),
user_receipts.get(room_id, []),
if room_id in initial_rooms:
receipt_content = ReceiptInRoom.merge_to_content(
list(
itertools.chain(
initial_receipts.get(room_id, []),
user_receipts.get(room_id, []),
)
)
)
)
fetched_receipts.append(
{
"room_id": room_id,
"type": EduTypes.RECEIPT,
"content": receipt_content,
}
)
fetched_receipts.append(
{
"room_id": room_id,
"type": EduTypes.RECEIPT,
"content": receipt_content,
}
)
fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()
+14 -24
View File
@@ -24,6 +24,7 @@ from typing import (
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
@@ -44,6 +45,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import start_active_span, trace
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
@@ -102,10 +104,6 @@ class SlidingSyncInterestedRooms:
lists: A mapping from list name to the list result for the response
relevant_room_map: A map from rooms that match the sync request to
their room sync config.
relevant_rooms_to_send_map: Subset of `relevant_room_map` that
includes the rooms that *may* have relevant updates. Rooms not
in this map will definitely not have room updates (though
extensions may have updates in these rooms).
newly_joined_rooms: The set of rooms that were joined in the token range
and the user is still joined to at the end of this range.
newly_left_rooms: The set of rooms that we left in the token range
@@ -115,7 +113,6 @@ class SlidingSyncInterestedRooms:
lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
relevant_room_map: Mapping[str, RoomSyncConfig]
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
all_rooms: Set[str]
room_membership_for_user_map: Mapping[str, RoomsForUserType]
@@ -128,7 +125,6 @@ class SlidingSyncInterestedRooms:
return SlidingSyncInterestedRooms(
lists={},
relevant_room_map={},
relevant_rooms_to_send_map={},
all_rooms=set(),
room_membership_for_user_map={},
newly_joined_rooms=set(),
@@ -547,16 +543,9 @@ class SlidingSyncRoomLists:
relevant_room_map[room_id] = room_sync_config
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)
return SlidingSyncInterestedRooms(
lists=lists,
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
@@ -735,16 +724,9 @@ class SlidingSyncRoomLists:
relevant_room_map[room_id] = room_sync_config
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)
return SlidingSyncInterestedRooms(
lists=lists,
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
@@ -752,18 +734,21 @@ class SlidingSyncRoomLists:
dm_room_ids=dm_room_ids,
)
async def _filter_relevant_rooms_to_send(
def filter_relevant_rooms_to_send(
self,
user_id: UserID,
previous_connection_state: PerConnectionState,
from_token: Optional[StreamToken],
relevant_room_map: Dict[str, RoomSyncConfig],
) -> Dict[str, RoomSyncConfig]:
to_token: StreamToken,
relevant_room_map: Mapping[str, RoomSyncConfig],
receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> Mapping[str, RoomSyncConfig]:
"""Filters the `relevant_room_map` down to those rooms that may have
updates we need to fetch and return."""
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] = relevant_room_map
if relevant_room_map:
with start_active_span("filter_relevant_rooms_to_send"):
if from_token:
@@ -814,6 +799,11 @@ class SlidingSyncRoomLists:
)
)
rooms_should_send.update(rooms_that_have_updates)
# Any rooms with receipts should be considered for sending as their
# notification counts may have changed.
rooms_should_send.update(receipts.keys())
relevant_rooms_to_send_map = {
room_id: room_sync_config
for room_id, room_sync_config in relevant_room_map.items()
+29 -30
View File
@@ -66,6 +66,7 @@ from synapse.types import (
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
@@ -519,22 +520,20 @@ class Notifier:
users = users or []
rooms = rooms or []
user_streams: Set[_NotifierUserStream] = set()
with Measure(self.clock, "on_new_event"):
user_streams: Set[_NotifierUserStream] = set()
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
# Only calculate which user streams to wake up if there are, in fact,
# any user streams registered.
if self.user_to_user_stream or self.room_to_user_streams:
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -566,25 +565,25 @@ class Notifier:
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
if listeners:
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
if user_streams:
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication()
self.notify_replication()
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of ephemeral events")
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception(
"Error notifying application services of ephemeral events"
)
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
-6
View File
@@ -205,12 +205,6 @@ class HttpPusher(Pusher):
if self._is_processing:
return
# Check if we are trying, but failing, to contact the pusher. If so, we
# don't try and start processing immediately and instead wait for the
# retry loop to try again later (which is controlled by the timer).
if self.failing_since and self.timed_call and self.timed_call.active():
return
run_as_background_process("httppush.process", self._process)
async def _process(self) -> None:
+2 -3
View File
@@ -275,9 +275,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
"""
Register all the admin servlets.
"""
RoomRestServlet(hs).register(http_server)
# Admin servlets below may not work on workers.
# Admin servlets aren't registered on workers.
if hs.config.worker.worker_app is not None:
return
@@ -285,6 +283,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BlockRoomRestServlet(hs).register(http_server)
ListRoomRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
RoomRestServlet(hs).register(http_server)
RoomRestV2Servlet(hs).register(http_server)
RoomMembersRestServlet(hs).register(http_server)
DeleteRoomStatusByDeleteIdRestServlet(hs).register(http_server)
+2 -18
View File
@@ -143,11 +143,11 @@ class DeviceRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_handler = handler
self.auth_handler = hs.get_auth_handler()
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
self._msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
self._is_main_process = hs.config.worker.worker_app is None
async def on_GET(
self, request: SynapseRequest, device_id: str
@@ -179,14 +179,6 @@ class DeviceRestServlet(RestServlet):
async def on_DELETE(
self, request: SynapseRequest, device_id: str
) -> Tuple[int, JsonDict]:
# Can only be run on main process, as changes to device lists must
# happen on main.
if not self._is_main_process:
error_message = "DELETE on /devices/ must be routed to main process"
logger.error(error_message)
raise SynapseError(500, error_message)
assert isinstance(self.device_handler, DeviceHandler)
requester = await self.auth.get_user_by_req(request)
try:
@@ -231,14 +223,6 @@ class DeviceRestServlet(RestServlet):
async def on_PUT(
self, request: SynapseRequest, device_id: str
) -> Tuple[int, JsonDict]:
# Can only be run on main process, as changes to device lists must
# happen on main.
if not self._is_main_process:
error_message = "PUT on /devices/ must be routed to main process"
logger.error(error_message)
raise SynapseError(500, error_message)
assert isinstance(self.device_handler, DeviceHandler)
requester = await self.auth.get_user_by_req(request, allow_guest=True)
body = parse_and_validate_json_object_from_request(request, self.PutBody)
@@ -601,9 +585,9 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
):
DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
DeviceRestServlet(hs).register(http_server)
if hs.config.worker.worker_app is None:
DeviceRestServlet(hs).register(http_server)
if hs.config.experimental.msc2697_enabled:
DehydratedDeviceServlet(hs).register(http_server)
ClaimDehydratedDeviceServlet(hs).register(http_server)
+15 -5
View File
@@ -24,7 +24,7 @@ from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.errors import Codes, LimitExceededError, StoreError, SynapseError
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.ratelimiting import Ratelimiter
@@ -248,8 +248,9 @@ class SyncRestServlet(RestServlet):
await self._server_notices_sender.on_user_syncing(user.to_string())
# ignore the presence update if the ratelimit is exceeded but do not pause the request
allowed, _ = await self._presence_per_user_limiter.can_do_action(requester)
if not allowed:
try:
await self._presence_per_user_limiter.ratelimit(requester, pause=0.0)
except LimitExceededError:
affect_presence = False
logger.debug("User set_presence ratelimit exceeded; ignoring it.")
else:
@@ -1065,10 +1066,19 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
"notification_count": room_result.notif_counts.main_timeline.notify_count,
"highlight_count": room_result.notif_counts.main_timeline.highlight_count,
}
if len(room_result.notif_counts.threads) > 0:
serialized_rooms[room_id]["unread_thread_notifications"] = {
thread_id: {
"notification_count": counts.notify_count,
"highlight_count": counts.highlight_count,
}
for thread_id, counts in room_result.notif_counts.threads.items()
}
if room_result.bump_stamp is not None:
serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp
+1 -8
View File
@@ -282,10 +282,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
"count_devices_by_users", count_devices_by_users_txn, user_ids
)
@cached()
async def get_device(
self, user_id: str, device_id: str
) -> Optional[Mapping[str, Any]]:
) -> Optional[Dict[str, Any]]:
"""Retrieve a device. Only returns devices that are not marked as
hidden.
@@ -1818,8 +1817,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
},
desc="store_device",
)
await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
if not inserted:
# if the device already exists, check if it's a real device, or
# if the device ID is reserved by something else
@@ -1885,9 +1882,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
values=device_ids,
keyvalues={"user_id": user_id},
)
self._invalidate_cache_and_stream_bulk(
txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
)
for batch in batch_iter(device_ids, 100):
await self.db_pool.runInteraction(
@@ -1921,7 +1915,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
updatevalues=updates,
desc="update_device",
)
await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
async def update_remote_device_list_cache_entry(
self, user_id: str, device_id: str, content: JsonDict, stream_id: str
@@ -547,13 +547,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# If the user has no receipts in the room, retrieve the stream ordering for
# the latest membership event from this user in this room (which we assume is
# a join).
# Sometimes (usually state resets) there can be no membership event either,
# so we allow None and return no notifications which is probably about
# the best we can do short of failing outright.
event_id = self.db_pool.simple_select_one_onecol_txn(
txn=txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
retcol="event_id",
allow_none=True,
)
if event_id is None:
return _EMPTY_ROOM_NOTIF_COUNTS
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)
return self._get_unread_counts_by_pos_txn(
+14 -3
View File
@@ -666,7 +666,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
return results
async def get_linearized_receipts_for_user_in_rooms(
self, user_id: str, room_ids: StrCollection, to_key: MultiWriterStreamToken
self,
user_id: str,
room_ids: StrCollection,
from_key: Optional[MultiWriterStreamToken] = None,
to_key: Optional[MultiWriterStreamToken] = None,
) -> Mapping[str, Sequence[ReceiptInRoom]]:
"""Fetch all receipts for the user in the given room.
@@ -685,11 +689,18 @@ class ReceiptsWorkerStore(SQLBaseStore):
sql = f"""
SELECT instance_name, stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE {clause} AND user_id = ? AND stream_id <= ?
WHERE {clause} AND user_id = ?
"""
args.append(user_id)
args.append(to_key.get_max_stream_pos())
if from_key is not None:
sql += " AND stream_id >= ?"
args.append(from_key.get_max_stream_pos())
if to_key is not None:
sql += " AND stream_id <= ?"
args.append(to_key.get_max_stream_pos())
txn.execute(sql, args)
+20 -35
View File
@@ -759,37 +759,17 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
external_id: id on that system
user_id: complete mxid that it is mapped to
"""
self._invalidate_cache_and_stream(
txn, self.get_user_by_external_id, (auth_provider, external_id)
)
# This INSERT ... ON CONFLICT DO NOTHING statement will cause a
# 'could not serialize access due to concurrent update'
# if the row is added concurrently by another transaction.
# This is exactly what we want, as it makes the transaction get retried
# in a new snapshot where we can check for a genuine conflict.
was_inserted = self.db_pool.simple_upsert_txn(
self.db_pool.simple_insert_txn(
txn,
table="user_external_ids",
keyvalues={"auth_provider": auth_provider, "external_id": external_id},
values={},
insertion_values={"user_id": user_id},
values={
"auth_provider": auth_provider,
"external_id": external_id,
"user_id": user_id,
},
)
if not was_inserted:
existing_id = self.db_pool.simple_select_one_onecol_txn(
txn,
table="user_external_ids",
keyvalues={"auth_provider": auth_provider, "user_id": user_id},
retcol="external_id",
allow_none=True,
)
if existing_id != external_id:
raise ExternalIDReuseException(
f"{user_id!r} has external id {existing_id!r} for {auth_provider} but trying to add {external_id!r}"
)
async def remove_user_external_id(
self, auth_provider: str, external_id: str, user_id: str
) -> None:
@@ -809,9 +789,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
},
desc="remove_user_external_id",
)
await self.invalidate_cache_and_stream(
"get_user_by_external_id", (auth_provider, external_id)
)
async def replace_user_external_id(
self,
@@ -832,20 +809,29 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
ExternalIDReuseException if the new external_id could not be mapped.
"""
def _replace_user_external_id_txn(
def _remove_user_external_ids_txn(
txn: LoggingTransaction,
user_id: str,
) -> None:
"""Remove all mappings from external user ids to a mxid
If these mappings are not found, this method does nothing.
Args:
user_id: complete mxid that it is mapped to
"""
self.db_pool.simple_delete_txn(
txn,
table="user_external_ids",
keyvalues={"user_id": user_id},
)
for auth_provider, external_id in record_external_ids:
self._invalidate_cache_and_stream(
txn, self.get_user_by_external_id, (auth_provider, external_id)
)
def _replace_user_external_id_txn(
txn: LoggingTransaction,
) -> None:
_remove_user_external_ids_txn(txn, user_id)
for auth_provider, external_id in record_external_ids:
self._record_user_external_id_txn(
txn,
auth_provider,
@@ -861,7 +847,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
except self.database_engine.module.IntegrityError:
raise ExternalIDReuseException()
@cached()
async def get_user_by_external_id(
self, auth_provider: str, external_id: str
) -> Optional[str]:
+12 -8
View File
@@ -1622,11 +1622,14 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
sql = """
UPDATE room_memberships
SET participant = true
WHERE event_id IN (
SELECT event_id FROM local_current_membership
WHERE user_id = ? AND room_id = ?
WHERE (user_id, room_id) IN (
SELECT user_id, room_id
FROM room_memberships
WHERE user_id = ?
AND room_id = ?
ORDER BY event_stream_ordering DESC
LIMIT 1
)
AND NOT participant
"""
txn.execute(sql, (user_id, room_id))
@@ -1648,10 +1651,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
) -> bool:
sql = """
SELECT participant
FROM local_current_membership AS l
INNER JOIN room_memberships AS r USING (event_id)
WHERE l.user_id = ?
AND l.room_id = ?
FROM room_memberships
WHERE user_id = ?
AND room_id = ?
ORDER BY event_stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (user_id, room_id))
res = txn.fetchone()
+9 -6
View File
@@ -40,6 +40,8 @@ import attr
from synapse._pydantic_compat import Extra
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.types import (
DeviceListUpdates,
JsonDict,
@@ -163,10 +165,10 @@ class SlidingSyncResult:
own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
`m.invited_member_count`)
notification_count: The total number of unread notifications for this room. (same
as sync v2)
highlight_count: The number of unread notifications for this room with the highlight
flag set. (same as sync v2)
notif_counts: An object containing the number of unread notifications for both
the main thread and any other threads.
room_receipts: A sequence of any read receipts from the user in question in
the room, used to calculate whether the notif_counts could have changed
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -197,8 +199,8 @@ class SlidingSyncResult:
bump_stamp: Optional[int]
joined_count: Optional[int]
invited_count: Optional[int]
notification_count: int
highlight_count: int
notif_counts: RoomNotifCounts
room_receipts: Optional[Sequence[ReceiptInRoom]]
def __bool__(self) -> bool:
return (
@@ -215,6 +217,7 @@ class SlidingSyncResult:
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
or bool(self.room_receipts)
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
+10 -23
View File
@@ -101,13 +101,7 @@ class ResponseCache(Generic[KV]):
used rather than trying to compute a new response.
"""
def __init__(
self,
clock: Clock,
name: str,
timeout_ms: float = 0,
enable_logging: bool = True,
):
def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
self._result_cache: Dict[KV, ResponseCacheEntry] = {}
self.clock = clock
@@ -115,7 +109,6 @@ class ResponseCache(Generic[KV]):
self._name = name
self._metrics = register_cache("response_cache", name, self, resizable=False)
self._enable_logging = enable_logging
def size(self) -> int:
return len(self._result_cache)
@@ -253,12 +246,9 @@ class ResponseCache(Generic[KV]):
"""
entry = self._get(key)
if not entry:
if self._enable_logging:
logger.debug(
"[%s]: no cached result for [%s], calculating new one",
self._name,
key,
)
logger.debug(
"[%s]: no cached result for [%s], calculating new one", self._name, key
)
context = ResponseCacheContext(cache_key=key)
if cache_context:
kwargs["cache_context"] = context
@@ -279,15 +269,12 @@ class ResponseCache(Generic[KV]):
return await make_deferred_yieldable(entry.result.observe())
result = entry.result.observe()
if self._enable_logging:
if result.called:
logger.info(
"[%s]: using completed cached result for [%s]", self._name, key
)
else:
logger.info(
"[%s]: using incomplete cached result for [%s]", self._name, key
)
if result.called:
logger.info("[%s]: using completed cached result for [%s]", self._name, key)
else:
logger.info(
"[%s]: using incomplete cached result for [%s]", self._name, key
)
span_context = entry.opentracing_span_context
with start_active_span_follows_from(
+3 -1
View File
@@ -220,7 +220,9 @@ class TestRatelimiter(unittest.HomeserverTestCase):
self.assertIn("test_id_1", limiter.actions)
self.reactor.advance(60)
self.get_success_or_raise(
limiter.can_do_action(None, key="test_id_2", _time_now_s=10)
)
self.assertNotIn("test_id_1", limiter.actions)
-26
View File
@@ -484,32 +484,6 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.assertEqual(code_verifier, "")
self.assertEqual(redirect, "http://client/redirect")
@override_config(
{
"oidc_config": {
**DEFAULT_CONFIG,
"passthrough_authorization_parameters": ["additional_parameter"],
}
}
)
def test_passthrough_parameters(self) -> None:
"""The redirect request has additional parameters, one is authorized, one is not"""
req = Mock(spec=["cookies", "args"])
req.cookies = []
req.args = {}
req.args[b"additional_parameter"] = ["a_value".encode("utf-8")]
req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")]
url = urlparse(
self.get_success(
self.provider.handle_redirect_request(req, b"http://client/redirect")
)
)
params = parse_qs(url.query)
self.assertEqual(params["additional_parameter"], ["a_value"])
self.assertNotIn("not_authorized_parameters", params)
@override_config({"oidc_config": DEFAULT_CONFIG})
def test_redirect_request_with_code_challenge(self) -> None:
"""The redirect request has the right arguments & generates a valid session cookie."""
-78
View File
@@ -1167,81 +1167,3 @@ class HTTPPusherTests(HomeserverTestCase):
self.assertEqual(
self.push_attempts[0][2]["notification"]["counts"]["unread"], 1
)
def test_push_backoff(self) -> None:
"""
The HTTP pusher will backoff correctly if it fails to contact the pusher.
"""
# Register the user who gets notified
user_id = self.register_user("user", "pass")
access_token = self.login("user", "pass")
# Register the user who sends the message
other_user_id = self.register_user("otheruser", "pass")
other_access_token = self.login("otheruser", "pass")
# Register the pusher
user_tuple = self.get_success(
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
device_display_name="pushy push",
pushkey="a@example.com",
lang=None,
data={"url": "http://example.com/_matrix/push/v1/notify"},
)
)
# Create a room with the other user
room = self.helper.create_room_as(user_id, tok=access_token)
self.helper.join(room=room, user=other_user_id, tok=other_access_token)
# The other user sends some messages
self.helper.send(room, body="Message 1", tok=other_access_token)
# One push was attempted to be sent
self.assertEqual(len(self.push_attempts), 1)
self.assertEqual(
self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify"
)
self.assertEqual(
self.push_attempts[0][2]["notification"]["content"]["body"], "Message 1"
)
self.push_attempts[0][0].callback({})
self.pump()
# Send another message, this time it fails
self.helper.send(room, body="Message 2", tok=other_access_token)
self.assertEqual(len(self.push_attempts), 2)
self.push_attempts[1][0].errback(Exception("couldn't connect"))
self.pump()
# Sending yet another message doesn't trigger a push immediately
self.helper.send(room, body="Message 3", tok=other_access_token)
self.pump()
self.assertEqual(len(self.push_attempts), 2)
# .. but waiting for a bit will cause more pushes
self.reactor.advance(10)
self.assertEqual(len(self.push_attempts), 3)
self.assertEqual(
self.push_attempts[2][2]["notification"]["content"]["body"], "Message 2"
)
self.push_attempts[2][0].callback({})
self.pump()
self.assertEqual(len(self.push_attempts), 4)
self.assertEqual(
self.push_attempts[3][2]["notification"]["content"]["body"], "Message 3"
)
self.push_attempts[3][0].callback({})
@@ -433,8 +433,11 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
set(),
exact=True,
)
# The room should be be in user1's sync because they sent a read receipt...
self.assertIn(room_id1, response_body["rooms"])
# but there should be no timeline events
# No events in the timeline since they were sent before the `from_token`
self.assertNotIn(room_id1, response_body["rooms"])
self.assertNotIn("timeline", response_body["rooms"][room_id1])
# Check room3:
#
@@ -0,0 +1,143 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from http import HTTPStatus
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, ReceiptTypes, RelationTypes
from synapse.rest.client import login, receipts, room, sync
from synapse.server import HomeServer
from synapse.util import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
class SlidingSyncNotificationCountsTestCase(SlidingSyncBase):
    """Tests that simplified sliding sync reports unread notification and
    highlight counts, both for the main timeline and per-thread.

    Setup creates one room with two users and marks it fully read for
    user1, so every test starts from zero counts.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
        sync.register_servlets,
        receipts.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        # Keep a handle on the main datastore before the base class runs.
        self.store = hs.get_datastores().main
        super().prepare(reactor, clock, hs)

    def setUp(self) -> None:
        super().setUp()

        # Two users: user2 creates the room, user1 joins it.
        self.user1_id = self.register_user("user1", "pass")
        self.user1_tok = self.login(self.user1_id, "pass")
        self.user2_id = self.register_user("user2", "pass")
        self.user2_tok = self.login(self.user2_id, "pass")

        # Create room1
        self.room_id1 = self.helper.create_room_as(self.user2_id, tok=self.user2_tok)
        self.helper.join(self.room_id1, self.user1_id, tok=self.user1_tok)
        # NOTE(review): user2 created the room so is already joined; this
        # extra join looks redundant but is harmless — confirm intent.
        self.helper.join(self.room_id1, self.user2_id, tok=self.user2_tok)

        # A minimal sync request: no lists, one room subscription.
        self.sync_req = {
            "lists": {},
            "room_subscriptions": {
                self.room_id1: {
                    "required_state": [],
                    "timeline_limit": 1,
                },
            },
        }
        initial_sync_resp, self.user1_start_token = self.do_sync(
            self.sync_req, tok=self.user1_tok
        )

        # send a read receipt to make sure the counts are 0
        latest_event_id = initial_sync_resp["rooms"][self.room_id1]["timeline"][0][
            "event_id"
        ]
        receipt_channel = self.make_request(
            "POST",
            f"/rooms/{self.room_id1}/receipt/{ReceiptTypes.READ}/{latest_event_id}",
            {},
            access_token=self.user1_tok,
        )
        self.assertEqual(receipt_channel.code, 200, receipt_channel.json_body)

    def test_main_thread_notification_count(self) -> None:
        """A message from another user bumps the main-thread notification count."""
        # send an event from user 2
        self.helper.send(self.room_id1, body="new event", tok=self.user2_tok)

        # user 1 syncs
        sync_resp, _ = self.do_sync(
            self.sync_req, tok=self.user1_tok, since=self.user1_start_token
        )

        # notification count should now be 1
        self.assertEqual(sync_resp["rooms"][self.room_id1]["notification_count"], 1)

    def test_main_thread_highlight_count(self) -> None:
        """A message mentioning the syncing user also bumps the highlight count."""
        # The body contains user1's localpart, which presumably matches the
        # default mention push rule — TODO confirm against push rule config.
        self.helper.send(self.room_id1, body="Hello user1", tok=self.user2_tok)

        sync_resp, _ = self.do_sync(
            self.sync_req, tok=self.user1_tok, since=self.user1_start_token
        )

        # A single event should have incremented both counters.
        self.assertEqual(sync_resp["rooms"][self.room_id1]["notification_count"], 1)
        self.assertEqual(sync_resp["rooms"][self.room_id1]["highlight_count"], 1)

    def test_thread_notification_count(self) -> None:
        """A threaded reply is counted under `unread_thread_notifications`,
        keyed by the thread root's event ID.
        """
        root_response = self.helper.send(
            self.room_id1, body="Thread root", tok=self.user2_tok
        )
        thread_root_id = root_response["event_id"]

        # Sync past the thread root so only the threaded reply is "new".
        _, since_token = self.do_sync(
            self.sync_req, tok=self.user1_tok, since=self.user1_start_token
        )

        reply_content = {
            "msgtype": "m.text",
            "body": "threaded response",
            "m.relates_to": {
                "event_id": thread_root_id,
                "rel_type": RelationTypes.THREAD,
            },
        }
        self.helper.send_event(
            self.room_id1,
            EventTypes.Message,
            reply_content,
            None,
            self.user2_tok,
            HTTPStatus.OK,
            custom_headers=None,
        )

        sync_resp, _ = self.do_sync(
            self.sync_req, tok=self.user1_tok, since=since_token
        )

        thread_counts = sync_resp["rooms"][self.room_id1][
            "unread_thread_notifications"
        ][thread_root_id]
        self.assertEqual(thread_counts["notification_count"], 1)
        self.assertEqual(thread_counts["highlight_count"], 0)