Compare commits
3 Commits
erikj/tree
...
v1.63.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fccd72f42 | ||
|
|
097afd0e0b | ||
|
|
6faaf76a32 |
27
.github/workflows/tests.yml
vendored
27
.github/workflows/tests.yml
vendored
@@ -328,6 +328,9 @@ jobs:
|
||||
- arrangement: monolith
|
||||
database: Postgres
|
||||
|
||||
- arrangement: workers
|
||||
database: Postgres
|
||||
|
||||
steps:
|
||||
- name: Run actions/checkout@v2 for synapse
|
||||
uses: actions/checkout@v2
|
||||
@@ -343,30 +346,6 @@ jobs:
|
||||
shell: bash
|
||||
name: Run Complement Tests
|
||||
|
||||
# XXX When complement with workers is stable, move this back into the standard
|
||||
# "complement" matrix above.
|
||||
#
|
||||
# See https://github.com/matrix-org/synapse/issues/13161
|
||||
complement-workers:
|
||||
if: "${{ !failure() && !cancelled() }}"
|
||||
needs: linting-done
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Run actions/checkout@v2 for synapse
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
path: synapse
|
||||
|
||||
- name: Prepare Complement's Prerequisites
|
||||
run: synapse/.ci/scripts/setup_complement_prerequisites.sh
|
||||
|
||||
- run: |
|
||||
set -o pipefail
|
||||
POSTGRES=1 WORKERS=1 COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
|
||||
shell: bash
|
||||
name: Run Complement Tests
|
||||
|
||||
# a job which marks all the other jobs as complete, thus allowing PRs to be merged.
|
||||
tests-done:
|
||||
if: ${{ always() }}
|
||||
|
||||
16
CHANGES.md
16
CHANGES.md
@@ -1,7 +1,11 @@
|
||||
Synapse vNext
|
||||
=============
|
||||
Synapse 1.63.0 (2022-07-19)
|
||||
===========================
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Clarify that homeserver server names are included in the reported data when the `report_stats` config option is enabled. ([\#13321](https://github.com/matrix-org/synapse/issues/13321))
|
||||
|
||||
As of this release, Synapse no longer allows the tasks of verifying email address ownership, and password reset confirmation, to be delegated to an identity server. For more information, see the [upgrade notes](https://matrix-org.github.io/synapse/v1.64/upgrade.html#upgrading-to-v1640).
|
||||
|
||||
Synapse 1.63.0rc1 (2022-07-12)
|
||||
==============================
|
||||
@@ -11,7 +15,7 @@ Features
|
||||
|
||||
- Add a rate limit for local users sending invites. ([\#13125](https://github.com/matrix-org/synapse/issues/13125))
|
||||
- Implement [MSC3827](https://github.com/matrix-org/matrix-spec-proposals/pull/3827): Filtering of `/publicRooms` by room type. ([\#13031](https://github.com/matrix-org/synapse/issues/13031))
|
||||
- Improve validation logic in Synapse's REST endpoints. ([\#13148](https://github.com/matrix-org/synapse/issues/13148))
|
||||
- Improve validation logic in the account data REST endpoints. ([\#13148](https://github.com/matrix-org/synapse/issues/13148))
|
||||
|
||||
|
||||
Bugfixes
|
||||
@@ -39,7 +43,7 @@ Improved Documentation
|
||||
- Add an explanation of the `--report-stats` argument to the docs. ([\#13029](https://github.com/matrix-org/synapse/issues/13029))
|
||||
- Add a helpful example bash script to the contrib directory for creating multiple worker configuration files of the same type. Contributed by @villepeh. ([\#13032](https://github.com/matrix-org/synapse/issues/13032))
|
||||
- Add missing links to config options. ([\#13166](https://github.com/matrix-org/synapse/issues/13166))
|
||||
- Add documentation for anonymised homeserver statistics collection. ([\#13086](https://github.com/matrix-org/synapse/issues/13086))
|
||||
- Add documentation for homeserver usage statistics collection. ([\#13086](https://github.com/matrix-org/synapse/issues/13086))
|
||||
- Add documentation for the existing `databases` option in the homeserver configuration manual. ([\#13212](https://github.com/matrix-org/synapse/issues/13212))
|
||||
- Clean up references to sample configuration and redirect users to the configuration manual instead. ([\#13077](https://github.com/matrix-org/synapse/issues/13077), [\#13139](https://github.com/matrix-org/synapse/issues/13139))
|
||||
- Document how the Synapse team does reviews. ([\#13132](https://github.com/matrix-org/synapse/issues/13132))
|
||||
@@ -78,6 +82,7 @@ Internal Changes
|
||||
- More aggressively rotate push actions. ([\#13211](https://github.com/matrix-org/synapse/issues/13211))
|
||||
- Add `max_line_length` setting for Python files to the `.editorconfig`. Contributed by @sumnerevans @ Beeper. ([\#13228](https://github.com/matrix-org/synapse/issues/13228))
|
||||
|
||||
|
||||
Synapse 1.62.0 (2022-07-05)
|
||||
===========================
|
||||
|
||||
@@ -85,6 +90,7 @@ No significant changes since 1.62.0rc3.
|
||||
|
||||
Authors of spam-checker plugins should consult the [upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.62/docs/upgrade.md#upgrading-to-v1620) to learn about the enriched signatures for spam checker callbacks, which are supported with this release of Synapse.
|
||||
|
||||
|
||||
Synapse 1.62.0rc3 (2022-07-04)
|
||||
==============================
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Remove code which incorrectly attempted to reconcile state with remote servers when processing incoming events.
|
||||
@@ -1 +0,0 @@
|
||||
Drop tables used for groups/communities.
|
||||
@@ -1 +0,0 @@
|
||||
Make the AS login method call `Auth.get_user_by_req` for checking the AS token.
|
||||
@@ -1 +0,0 @@
|
||||
Add prometheus counters for ephemeral events and to device messages pushed to app services. Contributed by Brad @ Beeper.
|
||||
@@ -1 +0,0 @@
|
||||
Drop support for delegating email verification to an external server.
|
||||
@@ -1 +0,0 @@
|
||||
Refactor receipts servlet logic to avoid duplicated code.
|
||||
@@ -1 +0,0 @@
|
||||
Add a `room_type` field in the responses for the list room and room details admin API. Contributed by @andrewdoh.
|
||||
@@ -1 +0,0 @@
|
||||
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
|
||||
@@ -1 +0,0 @@
|
||||
Remove unused database table `event_reference_hashes`.
|
||||
@@ -1 +0,0 @@
|
||||
Add support for room version 10.
|
||||
@@ -1 +0,0 @@
|
||||
Further reduce queries used sending events when creating new rooms. Contributed by Nick @ Beeper (@fizzadar).
|
||||
@@ -1 +0,0 @@
|
||||
Provide an example of using the Admin API. Contributed by @jejo86.
|
||||
@@ -1 +0,0 @@
|
||||
Move the documentation for how URL previews work to the URL preview module.
|
||||
@@ -1 +0,0 @@
|
||||
Drop support for calling `/_matrix/client/v3/account/3pid/bind` without an `id_access_token`, which was not permitted by the spec. Contributed by @Vetchu.
|
||||
@@ -1 +0,0 @@
|
||||
Call the v2 identity service `/3pid/unbind` endpoint, rather than v1.
|
||||
@@ -1 +0,0 @@
|
||||
Use an asynchronous cache wrapper for the get event cache. Contributed by Nick @ Beeper (@fizzadar).
|
||||
@@ -1 +0,0 @@
|
||||
Optimise federation sender and appservice pusher event stream processing queries. Contributed by Nick @ Beeper (@fizzadar).
|
||||
@@ -1 +0,0 @@
|
||||
Preparatory work for a per-room rate limiter on joins.
|
||||
@@ -1 +0,0 @@
|
||||
Preparatory work for a per-room rate limiter on joins.
|
||||
@@ -1 +0,0 @@
|
||||
Preparatory work for a per-room rate limiter on joins.
|
||||
@@ -1 +0,0 @@
|
||||
Log the stack when waiting for an entire room to be un-partial stated.
|
||||
@@ -1 +0,0 @@
|
||||
Clean-up tests for notifications.
|
||||
@@ -1 +0,0 @@
|
||||
Move the documentation for how URL previews work to the URL preview module.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug introduced in Synapse 1.15.0 where adding a user through the Synapse Admin API with a phone number would fail if the "enable_email_notifs" and "email_notifs_for_new_users" options were enabled. Contributed by @thomasweston12.
|
||||
@@ -1 +0,0 @@
|
||||
Do not fail build if complement with workers fails.
|
||||
@@ -1 +0,0 @@
|
||||
Don't pull out state in `compute_event_context` for unconflicted state.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug introduced in Synapse 1.40 where a user invited to a restricted room would be briefly unable to join.
|
||||
@@ -1 +0,0 @@
|
||||
Don't pull out state in `compute_event_context` for unconflicted state.
|
||||
@@ -1 +0,0 @@
|
||||
Fix long-standing bug where in rare instances Synapse could store the incorrect state for a room after a state resolution.
|
||||
@@ -1 +0,0 @@
|
||||
Reduce the rebuild time for the complement-synapse docker image.
|
||||
@@ -1 +0,0 @@
|
||||
Update locked version of `frozendict` to 2.3.2, which has a fix for a memory leak.
|
||||
@@ -1 +0,0 @@
|
||||
Make `DictionaryCache` expire full entries if they haven't been queried in a while, even if specific keys have been queried recently.
|
||||
8
debian/changelog
vendored
8
debian/changelog
vendored
@@ -1,3 +1,11 @@
|
||||
matrix-synapse-py3 (1.63.0) stable; urgency=medium
|
||||
|
||||
* Clarify that homeserver server names are included in the data reported
|
||||
by opt-in server stats reporting (`report_stats` homeserver config option).
|
||||
* New Synapse release 1.63.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 19 Jul 2022 14:42:24 +0200
|
||||
|
||||
matrix-synapse-py3 (1.63.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.63.0rc1.
|
||||
|
||||
2
debian/matrix-synapse-py3.postinst
vendored
2
debian/matrix-synapse-py3.postinst
vendored
@@ -31,7 +31,7 @@ EOF
|
||||
# This file is autogenerated, and will be recreated on upgrade if it is deleted.
|
||||
# Any changes you make will be preserved.
|
||||
|
||||
# Whether to report anonymized homeserver usage statistics.
|
||||
# Whether to report homeserver usage statistics.
|
||||
report_stats: false
|
||||
EOF
|
||||
fi
|
||||
|
||||
12
debian/po/templates.pot
vendored
12
debian/po/templates.pot
vendored
@@ -37,7 +37,7 @@ msgstr ""
|
||||
#. Type: boolean
|
||||
#. Description
|
||||
#: ../templates:2001
|
||||
msgid "Report anonymous statistics?"
|
||||
msgid "Report homeserver usage statistics?"
|
||||
msgstr ""
|
||||
|
||||
#. Type: boolean
|
||||
@@ -45,11 +45,11 @@ msgstr ""
|
||||
#: ../templates:2001
|
||||
msgid ""
|
||||
"Developers of Matrix and Synapse really appreciate helping the project out "
|
||||
"by reporting anonymized usage statistics from this homeserver. Only very "
|
||||
"basic aggregate data (e.g. number of users) will be reported, but it helps "
|
||||
"track the growth of the Matrix community, and helps in making Matrix a "
|
||||
"success, as well as to convince other networks that they should peer with "
|
||||
"Matrix."
|
||||
"by reporting homeserver usage statistics from this homeserver. Your "
|
||||
"homeserver's server name, along with very basic aggregate data (e.g. "
|
||||
"number of users) will be reported. But it helps track the growth of the "
|
||||
"Matrix community, and helps in making Matrix a success, as well as to "
|
||||
"convince other networks that they should peer with Matrix."
|
||||
msgstr ""
|
||||
|
||||
#. Type: boolean
|
||||
|
||||
13
debian/templates
vendored
13
debian/templates
vendored
@@ -10,12 +10,13 @@ _Description: Name of the server:
|
||||
Template: matrix-synapse/report-stats
|
||||
Type: boolean
|
||||
Default: false
|
||||
_Description: Report anonymous statistics?
|
||||
_Description: Report homeserver usage statistics?
|
||||
Developers of Matrix and Synapse really appreciate helping the
|
||||
project out by reporting anonymized usage statistics from this
|
||||
homeserver. Only very basic aggregate data (e.g. number of users)
|
||||
will be reported, but it helps track the growth of the Matrix
|
||||
community, and helps in making Matrix a success, as well as to
|
||||
convince other networks that they should peer with Matrix.
|
||||
project out by reporting homeserver usage statistics from this
|
||||
homeserver. Your homeserver's server name, along with very basic
|
||||
aggregate data (e.g. number of users) will be reported. But it
|
||||
helps track the growth of the Matrix community, and helps in
|
||||
making Matrix a success, as well as to convince other networks
|
||||
that they should peer with Matrix.
|
||||
.
|
||||
Thank you.
|
||||
|
||||
@@ -4,58 +4,42 @@
|
||||
#
|
||||
# Instructions for building this image from those it depends on is detailed in this guide:
|
||||
# https://github.com/matrix-org/synapse/blob/develop/docker/README-testing.md#testing-with-postgresql-and-single-or-multi-process-synapse
|
||||
|
||||
ARG SYNAPSE_VERSION=latest
|
||||
|
||||
# first of all, we create a base image with a postgres server and database,
|
||||
# which we can copy into the target image. For repeated rebuilds, this is
|
||||
# much faster than apt installing postgres each time.
|
||||
#
|
||||
# This trick only works because (a) the Synapse image happens to have all the
|
||||
# shared libraries that postgres wants, (b) we use a postgres image based on
|
||||
# the same debian version as Synapse's docker image (so the versions of the
|
||||
# shared libraries match).
|
||||
|
||||
FROM postgres:13-bullseye AS postgres_base
|
||||
# initialise the database cluster in /var/lib/postgresql
|
||||
RUN gosu postgres initdb --locale=C --encoding=UTF-8 --auth-host password
|
||||
|
||||
# Configure a password and create a database for Synapse
|
||||
RUN echo "ALTER USER postgres PASSWORD 'somesecret'" | gosu postgres postgres --single
|
||||
RUN echo "CREATE DATABASE synapse" | gosu postgres postgres --single
|
||||
|
||||
# now build the final image, based on the Synapse image.
|
||||
|
||||
FROM matrixdotorg/synapse-workers:$SYNAPSE_VERSION
|
||||
# copy the postgres installation over from the image we built above
|
||||
RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
|
||||
COPY --from=postgres_base /var/lib/postgresql /var/lib/postgresql
|
||||
COPY --from=postgres_base /usr/lib/postgresql /usr/lib/postgresql
|
||||
COPY --from=postgres_base /usr/share/postgresql /usr/share/postgresql
|
||||
RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
|
||||
ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
|
||||
ENV PGDATA=/var/lib/postgresql/data
|
||||
|
||||
# Extend the shared homeserver config to disable rate-limiting,
|
||||
# set Complement's static shared secret, enable registration, amongst other
|
||||
# tweaks to get Synapse ready for testing.
|
||||
# To do this, we copy the old template out of the way and then include it
|
||||
# with Jinja2.
|
||||
RUN mv /conf/shared.yaml.j2 /conf/shared-orig.yaml.j2
|
||||
COPY conf/workers-shared-extra.yaml.j2 /conf/shared.yaml.j2
|
||||
# Install postgresql
|
||||
RUN apt-get update && \
|
||||
DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -yqq postgresql-13
|
||||
|
||||
WORKDIR /data
|
||||
# Configure a user and create a database for Synapse
|
||||
RUN pg_ctlcluster 13 main start && su postgres -c "echo \
|
||||
\"ALTER USER postgres PASSWORD 'somesecret'; \
|
||||
CREATE DATABASE synapse \
|
||||
ENCODING 'UTF8' \
|
||||
LC_COLLATE='C' \
|
||||
LC_CTYPE='C' \
|
||||
template=template0;\" | psql" && pg_ctlcluster 13 main stop
|
||||
|
||||
COPY conf/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
|
||||
# Extend the shared homeserver config to disable rate-limiting,
|
||||
# set Complement's static shared secret, enable registration, amongst other
|
||||
# tweaks to get Synapse ready for testing.
|
||||
# To do this, we copy the old template out of the way and then include it
|
||||
# with Jinja2.
|
||||
RUN mv /conf/shared.yaml.j2 /conf/shared-orig.yaml.j2
|
||||
COPY conf/workers-shared-extra.yaml.j2 /conf/shared.yaml.j2
|
||||
|
||||
# Copy the entrypoint
|
||||
COPY conf/start_for_complement.sh /
|
||||
WORKDIR /data
|
||||
|
||||
# Expose nginx's listener ports
|
||||
EXPOSE 8008 8448
|
||||
COPY conf/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
|
||||
|
||||
ENTRYPOINT ["/start_for_complement.sh"]
|
||||
# Copy the entrypoint
|
||||
COPY conf/start_for_complement.sh /
|
||||
|
||||
# Update the healthcheck to have a shorter check interval
|
||||
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
|
||||
CMD /bin/sh /healthcheck.sh
|
||||
# Expose nginx's listener ports
|
||||
EXPOSE 8008 8448
|
||||
|
||||
ENTRYPOINT ["/start_for_complement.sh"]
|
||||
|
||||
# Update the healthcheck to have a shorter check interval
|
||||
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
|
||||
CMD /bin/sh /healthcheck.sh
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[program:postgres]
|
||||
command=/usr/local/bin/prefix-log gosu postgres postgres
|
||||
command=/usr/local/bin/prefix-log /usr/bin/pg_ctlcluster 13 main start --foreground
|
||||
|
||||
# Only start if START_POSTGRES=1
|
||||
autostart=%(ENV_START_POSTGRES)s
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
- [Application Services](application_services.md)
|
||||
- [Server Notices](server_notices.md)
|
||||
- [Consent Tracking](consent_tracking.md)
|
||||
- [URL Previews](development/url_previews.md)
|
||||
- [User Directory](user_directory.md)
|
||||
- [Message Retention Policies](message_retention_policies.md)
|
||||
- [Pluggable Modules](modules/index.md)
|
||||
@@ -68,7 +69,7 @@
|
||||
- [Federation](usage/administration/admin_api/federation.md)
|
||||
- [Manhole](manhole.md)
|
||||
- [Monitoring](metrics-howto.md)
|
||||
- [Reporting Anonymised Statistics](usage/administration/monitoring/reporting_anonymised_statistics.md)
|
||||
- [Reporting Homeserver Usage Statistics](usage/administration/monitoring/reporting_homeserver_usage_statistics.md)
|
||||
- [Understanding Synapse Through Grafana Graphs](usage/administration/understanding_synapse_through_grafana_graphs.md)
|
||||
- [Useful SQL for Admins](usage/administration/useful_sql_for_admins.md)
|
||||
- [Database Maintenance Tools](usage/administration/database_maintenance_tools.md)
|
||||
|
||||
@@ -59,7 +59,6 @@ The following fields are possible in the JSON response body:
|
||||
- `guest_access` - Whether guests can join the room. One of: ["can_join", "forbidden"].
|
||||
- `history_visibility` - Who can see the room history. One of: ["invited", "joined", "shared", "world_readable"].
|
||||
- `state_events` - Total number of state_events of a room. Complexity of the room.
|
||||
- `room_type` - The type of the room taken from the room's creation event; for example "m.space" if the room is a space. If the room does not define a type, the value will be `null`.
|
||||
* `offset` - The current pagination offset in rooms. This parameter should be
|
||||
used instead of `next_token` for room offset as `next_token` is
|
||||
not intended to be parsed.
|
||||
@@ -102,8 +101,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 93534,
|
||||
"room_type": "m.space"
|
||||
"state_events": 93534
|
||||
},
|
||||
... (8 hidden items) ...
|
||||
{
|
||||
@@ -120,8 +118,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 8345,
|
||||
"room_type": null
|
||||
"state_events": 8345
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
@@ -154,8 +151,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 8,
|
||||
"room_type": null
|
||||
"state_events": 8
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
@@ -188,8 +184,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 93534,
|
||||
"room_type": null
|
||||
"state_events": 93534
|
||||
},
|
||||
... (98 hidden items) ...
|
||||
{
|
||||
@@ -206,8 +201,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 8345,
|
||||
"room_type": "m.space"
|
||||
"state_events": 8345
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
@@ -244,9 +238,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 93534,
|
||||
"room_type": "m.space"
|
||||
|
||||
"state_events": 93534
|
||||
},
|
||||
... (48 hidden items) ...
|
||||
{
|
||||
@@ -263,9 +255,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 8345,
|
||||
"room_type": null
|
||||
|
||||
"state_events": 8345
|
||||
}
|
||||
],
|
||||
"offset": 100,
|
||||
@@ -300,8 +290,6 @@ The following fields are possible in the JSON response body:
|
||||
* `guest_access` - Whether guests can join the room. One of: ["can_join", "forbidden"].
|
||||
* `history_visibility` - Who can see the room history. One of: ["invited", "joined", "shared", "world_readable"].
|
||||
* `state_events` - Total number of state_events of a room. Complexity of the room.
|
||||
* `room_type` - The type of the room taken from the room's creation event; for example "m.space" if the room is a space.
|
||||
If the room does not define a type, the value will be `null`.
|
||||
|
||||
The API is:
|
||||
|
||||
@@ -329,8 +317,7 @@ A response body like the following is returned:
|
||||
"join_rules": "invite",
|
||||
"guest_access": null,
|
||||
"history_visibility": "shared",
|
||||
"state_events": 93534,
|
||||
"room_type": "m.space"
|
||||
"state_events": 93534
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -544,7 +544,7 @@ Gets a list of all local media that a specific `user_id` has created.
|
||||
These are media that the user has uploaded themselves
|
||||
([local media](../media_repository.md#local-media)), as well as
|
||||
[URL preview images](../media_repository.md#url-previews) requested by the user if the
|
||||
[feature is enabled](../usage/configuration/config_documentation.md#url_preview_enabled).
|
||||
[feature is enabled](../development/url_previews.md).
|
||||
|
||||
By default, the response is ordered by descending creation date and ascending media ID.
|
||||
The newest media is on top. You can change the order with parameters
|
||||
|
||||
61
docs/development/url_previews.md
Normal file
61
docs/development/url_previews.md
Normal file
@@ -0,0 +1,61 @@
|
||||
URL Previews
|
||||
============
|
||||
|
||||
The `GET /_matrix/media/r0/preview_url` endpoint provides a generic preview API
|
||||
for URLs which outputs [Open Graph](https://ogp.me/) responses (with some Matrix
|
||||
specific additions).
|
||||
|
||||
This does have trade-offs compared to other designs:
|
||||
|
||||
* Pros:
|
||||
* Simple and flexible; can be used by any clients at any point
|
||||
* Cons:
|
||||
* If each homeserver provides one of these independently, all the HSes in a
|
||||
room may needlessly DoS the target URI
|
||||
* The URL metadata must be stored somewhere, rather than just using Matrix
|
||||
itself to store the media.
|
||||
* Matrix cannot be used to distribute the metadata between homeservers.
|
||||
|
||||
When Synapse is asked to preview a URL it does the following:
|
||||
|
||||
1. Checks against a URL blacklist (defined as `url_preview_url_blacklist` in the
|
||||
config).
|
||||
2. Checks the in-memory cache by URLs and returns the result if it exists. (This
|
||||
is also used to de-duplicate processing of multiple in-flight requests at once.)
|
||||
3. Kicks off a background process to generate a preview:
|
||||
1. Checks the database cache by URL and timestamp and returns the result if it
|
||||
has not expired and was successful (a 2xx return code).
|
||||
2. Checks if the URL matches an [oEmbed](https://oembed.com/) pattern. If it
|
||||
does, update the URL to download.
|
||||
3. Downloads the URL and stores it into a file via the media storage provider
|
||||
and saves the local media metadata.
|
||||
4. If the media is an image:
|
||||
1. Generates thumbnails.
|
||||
2. Generates an Open Graph response based on image properties.
|
||||
5. If the media is HTML:
|
||||
1. Decodes the HTML via the stored file.
|
||||
2. Generates an Open Graph response from the HTML.
|
||||
3. If a JSON oEmbed URL was found in the HTML via autodiscovery:
|
||||
1. Downloads the URL and stores it into a file via the media storage provider
|
||||
and saves the local media metadata.
|
||||
2. Convert the oEmbed response to an Open Graph response.
|
||||
3. Override any Open Graph data from the HTML with data from oEmbed.
|
||||
4. If an image exists in the Open Graph response:
|
||||
1. Downloads the URL and stores it into a file via the media storage
|
||||
provider and saves the local media metadata.
|
||||
2. Generates thumbnails.
|
||||
3. Updates the Open Graph response based on image properties.
|
||||
6. If the media is JSON and an oEmbed URL was found:
|
||||
1. Convert the oEmbed response to an Open Graph response.
|
||||
2. If a thumbnail or image is in the oEmbed response:
|
||||
1. Downloads the URL and stores it into a file via the media storage
|
||||
provider and saves the local media metadata.
|
||||
2. Generates thumbnails.
|
||||
3. Updates the Open Graph response based on image properties.
|
||||
7. Stores the result in the database cache.
|
||||
4. Returns the result.
|
||||
|
||||
The in-memory cache expires after 1 hour.
|
||||
|
||||
Expired entries in the database cache (and their associated media files) are
|
||||
deleted every 10 seconds. The default expiration time is 1 hour from download.
|
||||
@@ -7,7 +7,8 @@ The media repository
|
||||
users.
|
||||
* caches avatars, attachments and their thumbnails for media uploaded by remote
|
||||
users.
|
||||
* caches resources and thumbnails used for URL previews.
|
||||
* caches resources and thumbnails used for
|
||||
[URL previews](development/url_previews.md).
|
||||
|
||||
All media in Matrix can be identified by a unique
|
||||
[MXC URI](https://spec.matrix.org/latest/client-server-api/#matrix-content-mxc-uris),
|
||||
@@ -58,6 +59,8 @@ remote_thumbnail/matrix.org/aa/bb/cccccccccccccccccccc/128-96-image-jpeg
|
||||
Note that `remote_thumbnail/` does not have an `s`.
|
||||
|
||||
## URL Previews
|
||||
See [URL Previews](development/url_previews.md) for documentation on the URL preview
|
||||
process.
|
||||
|
||||
When generating previews for URLs, Synapse may download and cache various
|
||||
resources, including images. These resources are assigned temporary media IDs
|
||||
|
||||
@@ -89,21 +89,6 @@ process, for example:
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
```
|
||||
|
||||
# Upgrading to v1.64.0
|
||||
|
||||
## Delegation of email validation no longer supported
|
||||
|
||||
As of this version, Synapse no longer allows the tasks of verifying email address
|
||||
ownership, and password reset confirmation, to be delegated to an identity server.
|
||||
|
||||
To continue to allow users to add email addresses to their homeserver accounts,
|
||||
and perform password resets, make sure that Synapse is configured with a
|
||||
working email server in the `email` configuration section (including, at a
|
||||
minimum, a `notif_from` setting.)
|
||||
|
||||
Specifying an `email` setting under `account_threepid_delegates` will now cause
|
||||
an error at startup.
|
||||
|
||||
# Upgrading to v1.62.0
|
||||
|
||||
## New signatures for spam checker callbacks
|
||||
|
||||
@@ -18,11 +18,6 @@ already on your `$PATH` depending on how Synapse was installed.
|
||||
Finding your user's `access_token` is client-dependent, but will usually be shown in the client's settings.
|
||||
|
||||
## Making an Admin API request
|
||||
For security reasons, we [recommend](reverse_proxy.md#synapse-administration-endpoints)
|
||||
that the Admin API (`/_synapse/admin/...`) should be hidden from public view using a
|
||||
reverse proxy. This means you should typically query the Admin API from a terminal on
|
||||
the machine which runs Synapse.
|
||||
|
||||
Once you have your `access_token`, you will need to authenticate each request to an Admin API endpoint by
|
||||
providing the token as either a query parameter or a request header. To add it as a request header in cURL:
|
||||
|
||||
@@ -30,17 +25,5 @@ providing the token as either a query parameter or a request header. To add it a
|
||||
curl --header "Authorization: Bearer <access_token>" <the_rest_of_your_API_request>
|
||||
```
|
||||
|
||||
For example, suppose we want to
|
||||
[query the account](user_admin_api.md#query-user-account) of the user
|
||||
`@foo:bar.com`. We need an admin access token (e.g.
|
||||
`syt_AjfVef2_L33JNpafeif_0feKJfeaf0CQpoZk`), and we need to know which port
|
||||
Synapse's [`client` listener](config_documentation.md#listeners) is listening
|
||||
on (e.g. `8008`). Then we can use the following command to request the account
|
||||
information from the Admin API.
|
||||
|
||||
```sh
|
||||
curl --header "Authorization: Bearer syt_AjfVef2_L33JNpafeif_0feKJfeaf0CQpoZk" -X GET http://127.0.0.1:8008/_synapse/admin/v2/users/@foo:bar.com
|
||||
```
|
||||
|
||||
For more details on access tokens in Matrix, please refer to the complete
|
||||
[matrix spec documentation](https://matrix.org/docs/spec/client_server/r0.6.1#using-access-tokens).
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
# Reporting Anonymised Statistics
|
||||
# Reporting Homeserver Usage Statistics
|
||||
|
||||
When generating your Synapse configuration file, you are asked whether you
|
||||
would like to report anonymised statistics to Matrix.org. These statistics
|
||||
would like to report usage statistics to Matrix.org. These statistics
|
||||
provide the foundation a glimpse into the number of Synapse homeservers
|
||||
participating in the network, as well as statistics such as the number of
|
||||
rooms being created and messages being sent. This feature is sometimes
|
||||
affectionately called "phone-home" stats. Reporting
|
||||
affectionately called "phone home" stats. Reporting
|
||||
[is optional](../../configuration/config_documentation.md#report_stats)
|
||||
and the reporting endpoint
|
||||
[can be configured](../../configuration/config_documentation.md#report_stats_endpoint),
|
||||
@@ -21,9 +21,9 @@ The following statistics are sent to the configured reporting endpoint:
|
||||
|
||||
| Statistic Name | Type | Description |
|
||||
|----------------------------|--------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `homeserver` | string | The homeserver's server name. |
|
||||
| `memory_rss` | int | The memory usage of the process (in kilobytes on Unix-based systems, bytes on MacOS). |
|
||||
| `cpu_average` | int | CPU time in % of a single core (not % of all cores). |
|
||||
| `homeserver` | string | The homeserver's server name. |
|
||||
| `server_context` | string | An arbitrary string used to group statistics from a set of homeservers. |
|
||||
| `timestamp` | int | The current time, represented as the number of seconds since the epoch. |
|
||||
| `uptime_seconds` | int | The number of seconds since the homeserver was last started. |
|
||||
@@ -2168,26 +2168,30 @@ default_identity_server: https://matrix.org
|
||||
---
|
||||
### `account_threepid_delegates`
|
||||
|
||||
Delegate verification of phone numbers to an identity server.
|
||||
Handle threepid (email/phone etc) registration and password resets through a set of
|
||||
*trusted* identity servers. Note that this allows the configured identity server to
|
||||
reset passwords for accounts!
|
||||
|
||||
When a user wishes to add a phone number to their account, we need to verify that they
|
||||
actually own that phone number, which requires sending them a text message (SMS).
|
||||
Currently Synapse does not support sending those texts itself and instead delegates the
|
||||
task to an identity server. The base URI for the identity server to be used is
|
||||
specified by the `account_threepid_delegates.msisdn` option.
|
||||
Be aware that if `email` is not set, and SMTP options have not been
|
||||
configured in the email config block, registration and user password resets via
|
||||
email will be globally disabled.
|
||||
|
||||
If this is left unspecified, Synapse will not allow users to add phone numbers to
|
||||
their account.
|
||||
Additionally, if `msisdn` is not set, registration and password resets via msisdn
|
||||
will be disabled regardless, and users will not be able to associate an msisdn
|
||||
identifier to their account. This is due to Synapse currently not supporting
|
||||
any method of sending SMS messages on its own.
|
||||
|
||||
(Servers handling the these requests must answer the `/requestToken` endpoints defined
|
||||
by the Matrix Identity Service API
|
||||
[specification](https://matrix.org/docs/spec/identity_service/latest).)
|
||||
To enable using an identity server for operations regarding a particular third-party
|
||||
identifier type, set the value to the URL of that identity server as shown in the
|
||||
examples below.
|
||||
|
||||
*Updated in Synapse 1.64.0*: No longer accepts an `email` option.
|
||||
Servers handling the these requests must answer the `/requestToken` endpoints defined
|
||||
by the Matrix Identity Service API [specification](https://matrix.org/docs/spec/identity_service/latest).
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
account_threepid_delegates:
|
||||
email: https://example.com # Delegate email sending to example.com
|
||||
msisdn: http://localhost:8090 # Delegate SMS sending to this local process
|
||||
```
|
||||
---
|
||||
@@ -2405,9 +2409,14 @@ metrics_flags:
|
||||
---
|
||||
### `report_stats`
|
||||
|
||||
Whether or not to report anonymized homeserver usage statistics. This is originally
|
||||
Whether or not to report homeserver usage statistics. This is originally
|
||||
set when generating the config. Set this option to true or false to change the current
|
||||
behavior.
|
||||
behavior. See
|
||||
[Reporting Homeserver Usage Statistics](../administration/monitoring/reporting_homeserver_usage_statistics.md)
|
||||
for information on what data is reported.
|
||||
|
||||
Statistics will be reported 5 minutes after Synapse starts, and then every 3 hours
|
||||
after that.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
@@ -2416,7 +2425,7 @@ report_stats: true
|
||||
---
|
||||
### `report_stats_endpoint`
|
||||
|
||||
The endpoint to report the anonymized homeserver usage statistics to.
|
||||
The endpoint to report homeserver usage statistics to.
|
||||
Defaults to https://matrix.org/report-usage-stats/push
|
||||
|
||||
Example configuration:
|
||||
|
||||
36
poetry.lock
generated
36
poetry.lock
generated
@@ -290,7 +290,7 @@ importlib-metadata = {version = "*", markers = "python_version < \"3.8\""}
|
||||
|
||||
[[package]]
|
||||
name = "frozendict"
|
||||
version = "2.3.2"
|
||||
version = "2.3.0"
|
||||
description = "A simple immutable dictionary"
|
||||
category = "main"
|
||||
optional = false
|
||||
@@ -1753,23 +1753,23 @@ flake8-comprehensions = [
|
||||
{file = "flake8_comprehensions-3.8.0-py3-none-any.whl", hash = "sha256:9406314803abe1193c064544ab14fdc43c58424c0882f6ff8a581eb73fc9bb58"},
|
||||
]
|
||||
frozendict = [
|
||||
{file = "frozendict-2.3.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:4fb171d1e84d17335365877e19d17440373b47ca74a73c06f65ac0b16d01e87f"},
|
||||
{file = "frozendict-2.3.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c0a3640e9d7533d164160b758351aa49d9e85bbe0bd76d219d4021e90ffa6a52"},
|
||||
{file = "frozendict-2.3.2-cp310-cp310-win_amd64.whl", hash = "sha256:87cfd00fafbc147d8cd2590d1109b7db8ac8d7d5bdaa708ba46caee132b55d4d"},
|
||||
{file = "frozendict-2.3.2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:fb09761e093cfabb2f179dbfdb2521e1ec5701df714d1eb5c51fa7849027be19"},
|
||||
{file = "frozendict-2.3.2-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:82176dc7adf01cf8f0193e909401939415a230a1853f4a672ec1629a06ceae18"},
|
||||
{file = "frozendict-2.3.2-cp36-cp36m-win_amd64.whl", hash = "sha256:c1c70826aa4a50fa283fe161834ac4a3ac7c753902c980bb8b595b0998a38ddb"},
|
||||
{file = "frozendict-2.3.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:1db5035ddbed995badd1a62c4102b5e207b5aeb24472df2c60aba79639d7996b"},
|
||||
{file = "frozendict-2.3.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4246fc4cb1413645ba4d3513939b90d979a5bae724be605a10b2b26ee12f839c"},
|
||||
{file = "frozendict-2.3.2-cp37-cp37m-win_amd64.whl", hash = "sha256:680cd42fb0a255da1ce45678ccbd7f69da750d5243809524ebe8f45b2eda6e6b"},
|
||||
{file = "frozendict-2.3.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6a7f3a181d6722c92a9fab12d0c5c2b006a18ca5666098531f316d1e1c8984e3"},
|
||||
{file = "frozendict-2.3.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b1cb866eabb3c1384a7fe88e1e1033e2b6623073589012ab637c552bf03f6364"},
|
||||
{file = "frozendict-2.3.2-cp38-cp38-win_amd64.whl", hash = "sha256:952c5e5e664578c5c2ce8489ee0ab6a1855da02b58ef593ee728fc10d672641a"},
|
||||
{file = "frozendict-2.3.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:608b77904cd0117cd816df605a80d0043a5326ee62529327d2136c792165a823"},
|
||||
{file = "frozendict-2.3.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0eed41fd326f0bcc779837d8d9e1374da1bc9857fe3b9f2910195bbd5fff3aeb"},
|
||||
{file = "frozendict-2.3.2-cp39-cp39-win_amd64.whl", hash = "sha256:bde28db6b5868dd3c45b3555f9d1dc5a1cca6d93591502fa5dcecce0dde6a335"},
|
||||
{file = "frozendict-2.3.2-py3-none-any.whl", hash = "sha256:6882a9bbe08ab9b5ff96ce11bdff3fe40b114b9813bc6801261e2a7b45e20012"},
|
||||
{file = "frozendict-2.3.2.tar.gz", hash = "sha256:7fac4542f0a13fbe704db4942f41ba3abffec5af8b100025973e59dff6a09d0d"},
|
||||
{file = "frozendict-2.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e18e2abd144a9433b0a8334582843b2aa0d3b9ac8b209aaa912ad365115fe2e1"},
|
||||
{file = "frozendict-2.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:96dc7a02e78da5725e5e642269bb7ae792e0c9f13f10f2e02689175ebbfedb35"},
|
||||
{file = "frozendict-2.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:752a6dcfaf9bb20a7ecab24980e4dbe041f154509c989207caf185522ef85461"},
|
||||
{file = "frozendict-2.3.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:5346d9fc1c936c76d33975a9a9f1a067342963105d9a403a99e787c939cc2bb2"},
|
||||
{file = "frozendict-2.3.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:60dd2253f1bacb63a7c486ec541a968af4f985ffb06602ee8954a3d39ec6bd2e"},
|
||||
{file = "frozendict-2.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:b2e044602ce17e5cd86724add46660fb9d80169545164e763300a3b839cb1b79"},
|
||||
{file = "frozendict-2.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:a27a69b1ac3591e4258325108aee62b53c0eeb6ad0a993ae68d3c7eaea980420"},
|
||||
{file = "frozendict-2.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4f45ef5f6b184d84744fff97b61f6b9a855e24d36b713ea2352fc723a047afa5"},
|
||||
{file = "frozendict-2.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2d3f5016650c0e9a192f5024e68fb4d63f670d0ee58b099ed3f5b4c62ea30ecb"},
|
||||
{file = "frozendict-2.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6cf605916f50aabaaba5624c81eb270200f6c2c466c46960237a125ec8fe3ae0"},
|
||||
{file = "frozendict-2.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f6da06e44904beae4412199d7e49be4f85c6cc168ab06b77c735ea7da5ce3454"},
|
||||
{file = "frozendict-2.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:1f34793fb409c4fa70ffd25bea87b01f3bd305fb1c6b09e7dff085b126302206"},
|
||||
{file = "frozendict-2.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fd72494a559bdcd28aa71f4aa81860269cd0b7c45fff3e2614a0a053ecfd2a13"},
|
||||
{file = "frozendict-2.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:00ea9166aa68cc5feed05986206fdbf35e838a09cb3feef998cf35978ff8a803"},
|
||||
{file = "frozendict-2.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:9ffaf440648b44e0bc694c1a4701801941378ba3ba6541e17750ae4b4aeeb116"},
|
||||
{file = "frozendict-2.3.0-py3-none-any.whl", hash = "sha256:8578fe06815fcdcc672bd5603eebc98361a5317c1c3a13b28c6c810f6ea3b323"},
|
||||
{file = "frozendict-2.3.0.tar.gz", hash = "sha256:da4231adefc5928e7810da2732269d3ad7b5616295b3e693746392a8205ea0b5"},
|
||||
]
|
||||
gitdb = [
|
||||
{file = "gitdb-4.0.9-py3-none-any.whl", hash = "sha256:8033ad4e853066ba6ca92050b9df2f89301b8fc8bf7e9324d412a63f8bf1a8fd"},
|
||||
|
||||
@@ -54,7 +54,7 @@ skip_gitignore = true
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.63.0rc1"
|
||||
version = "1.63.0"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "Apache-2.0"
|
||||
|
||||
@@ -33,7 +33,7 @@ def main() -> None:
|
||||
parser.add_argument(
|
||||
"--report-stats",
|
||||
action="store",
|
||||
help="Whether the generated config reports anonymized usage statistics",
|
||||
help="Whether the generated config reports homeserver usage statistics",
|
||||
choices=["yes", "no"],
|
||||
)
|
||||
|
||||
|
||||
@@ -166,6 +166,22 @@ IGNORED_TABLES = {
|
||||
"ui_auth_sessions",
|
||||
"ui_auth_sessions_credentials",
|
||||
"ui_auth_sessions_ips",
|
||||
# Groups/communities is no longer supported.
|
||||
"group_attestations_remote",
|
||||
"group_attestations_renewals",
|
||||
"group_invites",
|
||||
"group_roles",
|
||||
"group_room_categories",
|
||||
"group_rooms",
|
||||
"group_summary_roles",
|
||||
"group_summary_room_categories",
|
||||
"group_summary_rooms",
|
||||
"group_summary_users",
|
||||
"group_users",
|
||||
"groups",
|
||||
"local_group_membership",
|
||||
"local_group_updates",
|
||||
"remote_profile_cache",
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -27,33 +27,6 @@ class Ratelimiter:
|
||||
"""
|
||||
Ratelimit actions marked by arbitrary keys.
|
||||
|
||||
(Note that the source code speaks of "actions" and "burst_count" rather than
|
||||
"tokens" and a "bucket_size".)
|
||||
|
||||
This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
|
||||
containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
|
||||
permitted requests for that key. Each bucket starts empty, and gradually leaks
|
||||
tokens at a rate of `rate_hz`.
|
||||
|
||||
Upon an incoming request, we must determine:
|
||||
- the key that this request falls under (which bucket to inspect), and
|
||||
- the cost C of this request in tokens.
|
||||
Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
|
||||
the request is permitted and `cost` tokens are added to the bucket.
|
||||
Otherwise the request is denied, and the bucket continues to hold T tokens.
|
||||
|
||||
This means that the limiter enforces an average request frequency of `rate_hz`,
|
||||
while accumulating a buffer of up to `burst_count` requests which can be consumed
|
||||
instantaneously.
|
||||
|
||||
The tricky bit is the leaking. We do not want to have a periodic process which
|
||||
leaks every bucket! Instead, we track
|
||||
- the time point when the bucket was last completely empty, and
|
||||
- how many tokens have added to the bucket permitted since then.
|
||||
Then for each incoming request, we can calculate how many tokens have leaked
|
||||
since this time point, and use that to decide if we should accept or reject the
|
||||
request.
|
||||
|
||||
Args:
|
||||
clock: A homeserver clock, for retrieving the current time
|
||||
rate_hz: The long term number of actions that can be performed in a second.
|
||||
@@ -68,30 +41,14 @@ class Ratelimiter:
|
||||
self.burst_count = burst_count
|
||||
self.store = store
|
||||
|
||||
# An ordered dictionary representing the token buckets tracked by this rate
|
||||
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
|
||||
# * The number of tokens currently in the bucket,
|
||||
# * The time point when the bucket was last completely empty, and
|
||||
# * The rate_hz (leak rate) of this particular bucket.
|
||||
# A ordered dictionary keeping track of actions, when they were last
|
||||
# performed and how often. Each entry is a mapping from a key of arbitrary type
|
||||
# to a tuple representing:
|
||||
# * How many times an action has occurred since a point in time
|
||||
# * The point in time
|
||||
# * The rate_hz of this particular entry. This can vary per request
|
||||
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
|
||||
|
||||
def _get_key(
|
||||
self, requester: Optional[Requester], key: Optional[Hashable]
|
||||
) -> Hashable:
|
||||
"""Use the requester's MXID as a fallback key if no key is provided."""
|
||||
if key is None:
|
||||
if not requester:
|
||||
raise ValueError("Must supply at least one of `requester` or `key`")
|
||||
|
||||
key = requester.user.to_string()
|
||||
return key
|
||||
|
||||
def _get_action_counts(
|
||||
self, key: Hashable, time_now_s: float
|
||||
) -> Tuple[float, float, float]:
|
||||
"""Retrieve the action counts, with a fallback representing an empty bucket."""
|
||||
return self.actions.get(key, (0.0, time_now_s, 0.0))
|
||||
|
||||
async def can_do_action(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
@@ -131,7 +88,11 @@ class Ratelimiter:
|
||||
* The reactor timestamp for when the action can be performed next.
|
||||
-1 if rate_hz is less than or equal to zero
|
||||
"""
|
||||
key = self._get_key(requester, key)
|
||||
if key is None:
|
||||
if not requester:
|
||||
raise ValueError("Must supply at least one of `requester` or `key`")
|
||||
|
||||
key = requester.user.to_string()
|
||||
|
||||
if requester:
|
||||
# Disable rate limiting of users belonging to any AS that is configured
|
||||
@@ -160,7 +121,7 @@ class Ratelimiter:
|
||||
self._prune_message_counts(time_now_s)
|
||||
|
||||
# Check if there is an existing count entry for this key
|
||||
action_count, time_start, _ = self._get_action_counts(key, time_now_s)
|
||||
action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0))
|
||||
|
||||
# Check whether performing another action is allowed
|
||||
time_delta = time_now_s - time_start
|
||||
@@ -203,37 +164,6 @@ class Ratelimiter:
|
||||
|
||||
return allowed, time_allowed
|
||||
|
||||
def record_action(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
key: Optional[Hashable] = None,
|
||||
n_actions: int = 1,
|
||||
_time_now_s: Optional[float] = None,
|
||||
) -> None:
|
||||
"""Record that an action(s) took place, even if they violate the rate limit.
|
||||
|
||||
This is useful for tracking the frequency of events that happen across
|
||||
federation which we still want to impose local rate limits on. For instance, if
|
||||
we are alice.com monitoring a particular room, we cannot prevent bob.com
|
||||
from joining users to that room. However, we can track the number of recent
|
||||
joins in the room and refuse to serve new joins ourselves if there have been too
|
||||
many in the room across both homeservers.
|
||||
|
||||
Args:
|
||||
requester: The requester that is doing the action, if any.
|
||||
key: An arbitrary key used to classify an action. Defaults to the
|
||||
requester's user ID.
|
||||
n_actions: The number of times the user wants to do this action. If the user
|
||||
cannot do all of the actions, the user's action count is not incremented
|
||||
at all.
|
||||
_time_now_s: The current time. Optional, defaults to the current time according
|
||||
to self.clock. Only used by tests.
|
||||
"""
|
||||
key = self._get_key(requester, key)
|
||||
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
|
||||
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
|
||||
self.actions[key] = (action_count + n_actions, time_start, rate_hz)
|
||||
|
||||
def _prune_message_counts(self, time_now_s: float) -> None:
|
||||
"""Remove message count entries that have not exceeded their defined
|
||||
rate_hz limit
|
||||
|
||||
@@ -84,8 +84,6 @@ class RoomVersion:
|
||||
# MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
|
||||
# knocks and restricted join rules into the same join condition.
|
||||
msc3787_knock_restricted_join_rule: bool
|
||||
# MSC3667: Enforce integer power levels
|
||||
msc3667_int_only_power_levels: bool
|
||||
|
||||
|
||||
class RoomVersions:
|
||||
@@ -105,7 +103,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V2 = RoomVersion(
|
||||
"2",
|
||||
@@ -123,7 +120,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V3 = RoomVersion(
|
||||
"3",
|
||||
@@ -141,7 +137,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V4 = RoomVersion(
|
||||
"4",
|
||||
@@ -159,7 +154,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V5 = RoomVersion(
|
||||
"5",
|
||||
@@ -177,7 +171,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V6 = RoomVersion(
|
||||
"6",
|
||||
@@ -195,7 +188,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
MSC2176 = RoomVersion(
|
||||
"org.matrix.msc2176",
|
||||
@@ -213,7 +205,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V7 = RoomVersion(
|
||||
"7",
|
||||
@@ -231,7 +222,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V8 = RoomVersion(
|
||||
"8",
|
||||
@@ -249,7 +239,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V9 = RoomVersion(
|
||||
"9",
|
||||
@@ -267,7 +256,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
MSC2716v3 = RoomVersion(
|
||||
"org.matrix.msc2716v3",
|
||||
@@ -285,7 +273,6 @@ class RoomVersions:
|
||||
msc2716_historical=True,
|
||||
msc2716_redactions=True,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
MSC3787 = RoomVersion(
|
||||
"org.matrix.msc3787",
|
||||
@@ -303,25 +290,6 @@ class RoomVersions:
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=False,
|
||||
)
|
||||
V10 = RoomVersion(
|
||||
"10",
|
||||
RoomDisposition.STABLE,
|
||||
EventFormatVersions.V3,
|
||||
StateResolutionVersions.V2,
|
||||
enforce_key_validity=True,
|
||||
special_case_aliases_auth=False,
|
||||
strict_canonicaljson=True,
|
||||
limit_notifications_power_levels=True,
|
||||
msc2176_redaction_rules=False,
|
||||
msc3083_join_rules=True,
|
||||
msc3375_redaction_rules=True,
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -340,7 +308,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
|
||||
RoomVersions.V9,
|
||||
RoomVersions.MSC2716v3,
|
||||
RoomVersions.MSC3787,
|
||||
RoomVersions.V10,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ from synapse.app._base import (
|
||||
register_start,
|
||||
)
|
||||
from synapse.config._base import ConfigError, format_config_error
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
@@ -201,7 +202,7 @@ class SynapseHomeServer(HomeServer):
|
||||
}
|
||||
)
|
||||
|
||||
if self.config.email.can_verify_email:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
from synapse.rest.synapse.client.password_reset import (
|
||||
PasswordResetSubmitTokenResource,
|
||||
)
|
||||
|
||||
@@ -53,18 +53,6 @@ sent_events_counter = Counter(
|
||||
"synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
|
||||
)
|
||||
|
||||
sent_ephemeral_counter = Counter(
|
||||
"synapse_appservice_api_sent_ephemeral",
|
||||
"Number of ephemeral events sent to the AS",
|
||||
["service"],
|
||||
)
|
||||
|
||||
sent_todevice_counter = Counter(
|
||||
"synapse_appservice_api_sent_todevice",
|
||||
"Number of todevice messages sent to the AS",
|
||||
["service"],
|
||||
)
|
||||
|
||||
HOUR_IN_MS = 60 * 60 * 1000
|
||||
|
||||
|
||||
@@ -322,8 +310,6 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
)
|
||||
sent_transactions_counter.labels(service.id).inc()
|
||||
sent_events_counter.labels(service.id).inc(len(serialized_events))
|
||||
sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
|
||||
sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
|
||||
return True
|
||||
except CodeMessageException as e:
|
||||
logger.warning(
|
||||
|
||||
@@ -97,16 +97,16 @@ def format_config_error(e: ConfigError) -> Iterator[str]:
|
||||
# We split these messages out to allow packages to override with package
|
||||
# specific instructions.
|
||||
MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS = """\
|
||||
Please opt in or out of reporting anonymized homeserver usage statistics, by
|
||||
setting the `report_stats` key in your config file to either True or False.
|
||||
Please opt in or out of reporting homeserver usage statistics, by setting
|
||||
the `report_stats` key in your config file to either True or False.
|
||||
"""
|
||||
|
||||
MISSING_REPORT_STATS_SPIEL = """\
|
||||
We would really appreciate it if you could help our project out by reporting
|
||||
anonymized usage statistics from your homeserver. Only very basic aggregate
|
||||
data (e.g. number of users) will be reported, but it helps us to track the
|
||||
growth of the Matrix community, and helps us to make Matrix a success, as well
|
||||
as to convince other networks that they should peer with us.
|
||||
homeserver usage statistics from your homeserver. Your homeserver's server name,
|
||||
along with very basic aggregate data (e.g. number of users) will be reported. But
|
||||
it helps us to track the growth of the Matrix community, and helps us to make Matrix
|
||||
a success, as well as to convince other networks that they should peer with us.
|
||||
|
||||
Thank you.
|
||||
"""
|
||||
@@ -621,7 +621,7 @@ class RootConfig:
|
||||
generate_group.add_argument(
|
||||
"--report-stats",
|
||||
action="store",
|
||||
help="Whether the generated config reports anonymized usage statistics.",
|
||||
help="Whether the generated config reports homeserver usage statistics.",
|
||||
choices=["yes", "no"],
|
||||
)
|
||||
generate_group.add_argument(
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
import email.utils
|
||||
import logging
|
||||
import os
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
import attr
|
||||
@@ -130,22 +131,41 @@ class EmailConfig(Config):
|
||||
|
||||
self.email_enable_notifs = email_config.get("enable_notifs", False)
|
||||
|
||||
self.threepid_behaviour_email = (
|
||||
# Have Synapse handle the email sending if account_threepid_delegates.email
|
||||
# is not defined
|
||||
# msisdn is currently always remote while Synapse does not support any method of
|
||||
# sending SMS messages
|
||||
ThreepidBehaviour.REMOTE
|
||||
if self.root.registration.account_threepid_delegate_email
|
||||
else ThreepidBehaviour.LOCAL
|
||||
)
|
||||
|
||||
if config.get("trust_identity_server_for_password_resets"):
|
||||
raise ConfigError(
|
||||
'The config option "trust_identity_server_for_password_resets" '
|
||||
"is no longer supported. Please remove it from the config file."
|
||||
'has been replaced by "account_threepid_delegate". '
|
||||
"Please consult the configuration manual at docs/usage/configuration/config_documentation.md for "
|
||||
"details and update your config file."
|
||||
)
|
||||
|
||||
# If we have email config settings, assume that we can verify ownership of
|
||||
# email addresses.
|
||||
self.can_verify_email = email_config != {}
|
||||
self.local_threepid_handling_disabled_due_to_email_config = False
|
||||
if (
|
||||
self.threepid_behaviour_email == ThreepidBehaviour.LOCAL
|
||||
and email_config == {}
|
||||
):
|
||||
# We cannot warn the user this has happened here
|
||||
# Instead do so when a user attempts to reset their password
|
||||
self.local_threepid_handling_disabled_due_to_email_config = True
|
||||
|
||||
self.threepid_behaviour_email = ThreepidBehaviour.OFF
|
||||
|
||||
# Get lifetime of a validation token in milliseconds
|
||||
self.email_validation_token_lifetime = self.parse_duration(
|
||||
email_config.get("validation_token_lifetime", "1h")
|
||||
)
|
||||
|
||||
if self.can_verify_email:
|
||||
if self.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
missing = []
|
||||
if not self.email_notif_from:
|
||||
missing.append("email.notif_from")
|
||||
@@ -336,3 +356,18 @@ class EmailConfig(Config):
|
||||
"Config option email.invite_client_location must be a http or https URL",
|
||||
path=("email", "invite_client_location"),
|
||||
)
|
||||
|
||||
|
||||
class ThreepidBehaviour(Enum):
|
||||
"""
|
||||
Enum to define the behaviour of Synapse with regards to when it contacts an identity
|
||||
server for 3pid registration and password resets
|
||||
|
||||
REMOTE = use an external server to send tokens
|
||||
LOCAL = send tokens ourselves
|
||||
OFF = disable registration via 3pid and password resets
|
||||
"""
|
||||
|
||||
REMOTE = "remote"
|
||||
LOCAL = "local"
|
||||
OFF = "off"
|
||||
|
||||
@@ -20,13 +20,6 @@ from synapse.config._base import Config, ConfigError
|
||||
from synapse.types import JsonDict, RoomAlias, UserID
|
||||
from synapse.util.stringutils import random_string_with_symbols, strtobool
|
||||
|
||||
NO_EMAIL_DELEGATE_ERROR = """\
|
||||
Delegation of email verification to an identity server is no longer supported. To
|
||||
continue to allow users to add email addresses to their accounts, and use them for
|
||||
password resets, configure Synapse with an SMTP server via the `email` setting, and
|
||||
remove `account_threepid_delegates.email`.
|
||||
"""
|
||||
|
||||
|
||||
class RegistrationConfig(Config):
|
||||
section = "registration"
|
||||
@@ -58,9 +51,7 @@ class RegistrationConfig(Config):
|
||||
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
|
||||
|
||||
account_threepid_delegates = config.get("account_threepid_delegates") or {}
|
||||
if "email" in account_threepid_delegates:
|
||||
raise ConfigError(NO_EMAIL_DELEGATE_ERROR)
|
||||
# self.account_threepid_delegate_email = account_threepid_delegates.get("email")
|
||||
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
|
||||
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
|
||||
self.default_identity_server = config.get("default_identity_server")
|
||||
self.allow_guest_access = config.get("allow_guest_access", False)
|
||||
|
||||
@@ -740,32 +740,6 @@ def _check_power_levels(
|
||||
except Exception:
|
||||
raise SynapseError(400, "Not a valid power level: %s" % (v,))
|
||||
|
||||
# Reject events with stringy power levels if required by room version
|
||||
if (
|
||||
event.type == EventTypes.PowerLevels
|
||||
and room_version_obj.msc3667_int_only_power_levels
|
||||
):
|
||||
for k, v in event.content.items():
|
||||
if k in {
|
||||
"users_default",
|
||||
"events_default",
|
||||
"state_default",
|
||||
"ban",
|
||||
"redact",
|
||||
"kick",
|
||||
"invite",
|
||||
}:
|
||||
if not isinstance(v, int):
|
||||
raise SynapseError(400, f"{v!r} must be an integer.")
|
||||
if k in {"events", "notifications", "users"}:
|
||||
if not isinstance(v, dict) or not all(
|
||||
isinstance(v, int) for v in v.values()
|
||||
):
|
||||
raise SynapseError(
|
||||
400,
|
||||
f"{v!r} must be a dict wherein all the values are integers.",
|
||||
)
|
||||
|
||||
key = (event.type, event.state_key)
|
||||
current_state = auth_events.get(key)
|
||||
|
||||
|
||||
@@ -120,7 +120,7 @@ class EventBuilder:
|
||||
The signed and hashed event.
|
||||
"""
|
||||
if auth_event_ids is None:
|
||||
state_ids = await self._state.compute_state_after_events(
|
||||
state_ids = await self._state.get_current_state_ids(
|
||||
self.room_id, prev_event_ids
|
||||
)
|
||||
auth_event_ids = self._event_auth_handler.compute_auth_events(
|
||||
|
||||
@@ -351,11 +351,7 @@ class FederationSender(AbstractFederationSender):
|
||||
self._is_processing = True
|
||||
while True:
|
||||
last_token = await self.store.get_federation_out_pos("events")
|
||||
(
|
||||
next_token,
|
||||
events,
|
||||
event_to_received_ts,
|
||||
) = await self.store.get_all_new_events_stream(
|
||||
next_token, events = await self.store.get_all_new_events_stream(
|
||||
last_token, self._last_poked_id, limit=100
|
||||
)
|
||||
|
||||
@@ -480,7 +476,7 @@ class FederationSender(AbstractFederationSender):
|
||||
await self._send_pdu(event, sharded_destinations)
|
||||
|
||||
now = self.clock.time_msec()
|
||||
ts = event_to_received_ts[event.event_id]
|
||||
ts = await self.store.get_received_ts(event.event_id)
|
||||
assert ts is not None
|
||||
synapse.metrics.event_processing_lag_by_event.labels(
|
||||
"federation_sender"
|
||||
@@ -513,7 +509,7 @@ class FederationSender(AbstractFederationSender):
|
||||
|
||||
if events:
|
||||
now = self.clock.time_msec()
|
||||
ts = event_to_received_ts[events[-1].event_id]
|
||||
ts = await self.store.get_received_ts(events[-1].event_id)
|
||||
assert ts is not None
|
||||
|
||||
synapse.metrics.event_processing_lag.labels(
|
||||
|
||||
@@ -104,15 +104,14 @@ class ApplicationServicesHandler:
|
||||
with Measure(self.clock, "notify_interested_services"):
|
||||
self.is_processing = True
|
||||
try:
|
||||
limit = 100
|
||||
upper_bound = -1
|
||||
while upper_bound < self.current_max:
|
||||
last_token = await self.store.get_appservice_last_pos()
|
||||
(
|
||||
upper_bound,
|
||||
events,
|
||||
event_to_received_ts,
|
||||
) = await self.store.get_all_new_events_stream(
|
||||
last_token, self.current_max, limit=100, get_prev_content=True
|
||||
) = await self.store.get_new_events_for_appservice(
|
||||
self.current_max, limit
|
||||
)
|
||||
|
||||
events_by_room: Dict[str, List[EventBase]] = {}
|
||||
@@ -151,7 +150,7 @@ class ApplicationServicesHandler:
|
||||
)
|
||||
|
||||
now = self.clock.time_msec()
|
||||
ts = event_to_received_ts[event.event_id]
|
||||
ts = await self.store.get_received_ts(event.event_id)
|
||||
assert ts is not None
|
||||
|
||||
synapse.metrics.event_processing_lag_by_event.labels(
|
||||
@@ -188,7 +187,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
if events:
|
||||
now = self.clock.time_msec()
|
||||
ts = event_to_received_ts[events[-1].event_id]
|
||||
ts = await self.store.get_received_ts(events[-1].event_id)
|
||||
assert ts is not None
|
||||
|
||||
synapse.metrics.event_processing_lag.labels(
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import itertools
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
@@ -348,7 +347,7 @@ class FederationEventHandler:
|
||||
event.internal_metadata.send_on_behalf_of = origin
|
||||
|
||||
context = await self._state_handler.compute_event_context(event)
|
||||
await self._check_event_auth(origin, event, context)
|
||||
context = await self._check_event_auth(origin, event, context)
|
||||
if context.rejected:
|
||||
raise SynapseError(
|
||||
403, f"{event.membership} event was rejected", Codes.FORBIDDEN
|
||||
@@ -486,7 +485,7 @@ class FederationEventHandler:
|
||||
partial_state=partial_state,
|
||||
)
|
||||
|
||||
await self._check_event_auth(origin, event, context)
|
||||
context = await self._check_event_auth(origin, event, context)
|
||||
if context.rejected:
|
||||
raise SynapseError(400, "Join event was rejected")
|
||||
|
||||
@@ -1117,7 +1116,11 @@ class FederationEventHandler:
|
||||
state_ids_before_event=state_ids,
|
||||
)
|
||||
try:
|
||||
await self._check_event_auth(origin, event, context)
|
||||
context = await self._check_event_auth(
|
||||
origin,
|
||||
event,
|
||||
context,
|
||||
)
|
||||
except AuthError as e:
|
||||
# This happens only if we couldn't find the auth events. We'll already have
|
||||
# logged a warning, so now we just convert to a FederationError.
|
||||
@@ -1492,8 +1495,11 @@ class FederationEventHandler:
|
||||
)
|
||||
|
||||
async def _check_event_auth(
|
||||
self, origin: str, event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
self,
|
||||
origin: str,
|
||||
event: EventBase,
|
||||
context: EventContext,
|
||||
) -> EventContext:
|
||||
"""
|
||||
Checks whether an event should be rejected (for failing auth checks).
|
||||
|
||||
@@ -1503,6 +1509,9 @@ class FederationEventHandler:
|
||||
context:
|
||||
The event context.
|
||||
|
||||
Returns:
|
||||
The updated context object.
|
||||
|
||||
Raises:
|
||||
AuthError if we were unable to find copies of the event's auth events.
|
||||
(Most other failures just cause us to set `context.rejected`.)
|
||||
@@ -1517,7 +1526,7 @@ class FederationEventHandler:
|
||||
logger.warning("While validating received event %r: %s", event, e)
|
||||
# TODO: use a different rejected reason here?
|
||||
context.rejected = RejectedReason.AUTH_ERROR
|
||||
return
|
||||
return context
|
||||
|
||||
# next, check that we have all of the event's auth events.
|
||||
#
|
||||
@@ -1529,9 +1538,6 @@ class FederationEventHandler:
|
||||
)
|
||||
|
||||
# ... and check that the event passes auth at those auth events.
|
||||
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
|
||||
# 4. Passes authorization rules based on the event’s auth events,
|
||||
# otherwise it is rejected.
|
||||
try:
|
||||
await check_state_independent_auth_rules(self._store, event)
|
||||
check_state_dependent_auth_rules(event, claimed_auth_events)
|
||||
@@ -1540,90 +1546,55 @@ class FederationEventHandler:
|
||||
"While checking auth of %r against auth_events: %s", event, e
|
||||
)
|
||||
context.rejected = RejectedReason.AUTH_ERROR
|
||||
return
|
||||
return context
|
||||
|
||||
# now check the auth rules pass against the room state before the event
|
||||
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
|
||||
# 5. Passes authorization rules based on the state before the event,
|
||||
# otherwise it is rejected.
|
||||
#
|
||||
# ... however, if we only have partial state for the room, then there is a good
|
||||
# chance that we'll be missing some of the state needed to auth the new event.
|
||||
# So, we state-resolve the auth events that we are given against the state that
|
||||
# we know about, which ensures things like bans are applied. (Note that we'll
|
||||
# already have checked we have all the auth events, in
|
||||
# _load_or_fetch_auth_events_for_event above)
|
||||
if context.partial_state:
|
||||
room_version = await self._store.get_room_version_id(event.room_id)
|
||||
|
||||
local_state_id_map = await context.get_prev_state_ids()
|
||||
claimed_auth_events_id_map = {
|
||||
(ev.type, ev.state_key): ev.event_id for ev in claimed_auth_events
|
||||
}
|
||||
|
||||
state_for_auth_id_map = (
|
||||
await self._state_resolution_handler.resolve_events_with_store(
|
||||
event.room_id,
|
||||
room_version,
|
||||
[local_state_id_map, claimed_auth_events_id_map],
|
||||
event_map=None,
|
||||
state_res_store=StateResolutionStore(self._store),
|
||||
)
|
||||
)
|
||||
else:
|
||||
event_types = event_auth.auth_types_for_event(event.room_version, event)
|
||||
state_for_auth_id_map = await context.get_prev_state_ids(
|
||||
StateFilter.from_types(event_types)
|
||||
)
|
||||
|
||||
calculated_auth_event_ids = self._event_auth_handler.compute_auth_events(
|
||||
event, state_for_auth_id_map, for_verification=True
|
||||
# now check auth against what we think the auth events *should* be.
|
||||
event_types = event_auth.auth_types_for_event(event.room_version, event)
|
||||
prev_state_ids = await context.get_prev_state_ids(
|
||||
StateFilter.from_types(event_types)
|
||||
)
|
||||
|
||||
# if those are the same, we're done here.
|
||||
if collections.Counter(event.auth_event_ids()) == collections.Counter(
|
||||
calculated_auth_event_ids
|
||||
):
|
||||
return
|
||||
|
||||
# otherwise, re-run the auth checks based on what we calculated.
|
||||
calculated_auth_events = await self._store.get_events_as_list(
|
||||
calculated_auth_event_ids
|
||||
auth_events_ids = self._event_auth_handler.compute_auth_events(
|
||||
event, prev_state_ids, for_verification=True
|
||||
)
|
||||
|
||||
# log the differences
|
||||
|
||||
claimed_auth_event_map = {(e.type, e.state_key): e for e in claimed_auth_events}
|
||||
auth_events_x = await self._store.get_events(auth_events_ids)
|
||||
calculated_auth_event_map = {
|
||||
(e.type, e.state_key): e for e in calculated_auth_events
|
||||
(e.type, e.state_key): e for e in auth_events_x.values()
|
||||
}
|
||||
logger.info(
|
||||
"event's auth_events are different to our calculated auth_events. "
|
||||
"Claimed but not calculated: %s. Calculated but not claimed: %s",
|
||||
[
|
||||
ev
|
||||
for k, ev in claimed_auth_event_map.items()
|
||||
if k not in calculated_auth_event_map
|
||||
or calculated_auth_event_map[k].event_id != ev.event_id
|
||||
],
|
||||
[
|
||||
ev
|
||||
for k, ev in calculated_auth_event_map.items()
|
||||
if k not in claimed_auth_event_map
|
||||
or claimed_auth_event_map[k].event_id != ev.event_id
|
||||
],
|
||||
)
|
||||
|
||||
try:
|
||||
check_state_dependent_auth_rules(event, calculated_auth_events)
|
||||
except AuthError as e:
|
||||
logger.warning(
|
||||
"While checking auth of %r against room state before the event: %s",
|
||||
updated_auth_events = await self._update_auth_events_for_auth(
|
||||
event,
|
||||
e,
|
||||
calculated_auth_event_map=calculated_auth_event_map,
|
||||
)
|
||||
except Exception:
|
||||
# We don't really mind if the above fails, so lets not fail
|
||||
# processing if it does. However, it really shouldn't fail so
|
||||
# let's still log as an exception since we'll still want to fix
|
||||
# any bugs.
|
||||
logger.exception(
|
||||
"Failed to double check auth events for %s with remote. "
|
||||
"Ignoring failure and continuing processing of event.",
|
||||
event.event_id,
|
||||
)
|
||||
updated_auth_events = None
|
||||
|
||||
if updated_auth_events:
|
||||
context = await self._update_context_for_auth_events(
|
||||
event, context, updated_auth_events
|
||||
)
|
||||
auth_events_for_auth = updated_auth_events
|
||||
else:
|
||||
auth_events_for_auth = calculated_auth_event_map
|
||||
|
||||
try:
|
||||
check_state_dependent_auth_rules(event, auth_events_for_auth.values())
|
||||
except AuthError as e:
|
||||
logger.warning("Failed auth resolution for %r because %s", event, e)
|
||||
context.rejected = RejectedReason.AUTH_ERROR
|
||||
|
||||
return context
|
||||
|
||||
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
|
||||
if event.type != EventTypes.GuestAccess:
|
||||
return
|
||||
@@ -1733,6 +1704,93 @@ class FederationEventHandler:
|
||||
soft_failed_event_counter.inc()
|
||||
event.internal_metadata.soft_failed = True
|
||||
|
||||
async def _update_auth_events_for_auth(
|
||||
self,
|
||||
event: EventBase,
|
||||
calculated_auth_event_map: StateMap[EventBase],
|
||||
) -> Optional[StateMap[EventBase]]:
|
||||
"""Helper for _check_event_auth. See there for docs.
|
||||
|
||||
Checks whether a given event has the expected auth events. If it
|
||||
doesn't then we talk to the remote server to compare state to see if
|
||||
we can come to a consensus (e.g. if one server missed some valid
|
||||
state).
|
||||
|
||||
This attempts to resolve any potential divergence of state between
|
||||
servers, but is not essential and so failures should not block further
|
||||
processing of the event.
|
||||
|
||||
Args:
|
||||
event:
|
||||
|
||||
calculated_auth_event_map:
|
||||
Our calculated auth_events based on the state of the room
|
||||
at the event's position in the DAG.
|
||||
|
||||
Returns:
|
||||
updated auth event map, or None if no changes are needed.
|
||||
|
||||
"""
|
||||
assert not event.internal_metadata.outlier
|
||||
|
||||
# check for events which are in the event's claimed auth_events, but not
|
||||
# in our calculated event map.
|
||||
event_auth_events = set(event.auth_event_ids())
|
||||
different_auth = event_auth_events.difference(
|
||||
e.event_id for e in calculated_auth_event_map.values()
|
||||
)
|
||||
|
||||
if not different_auth:
|
||||
return None
|
||||
|
||||
logger.info(
|
||||
"auth_events refers to events which are not in our calculated auth "
|
||||
"chain: %s",
|
||||
different_auth,
|
||||
)
|
||||
|
||||
# XXX: currently this checks for redactions but I'm not convinced that is
|
||||
# necessary?
|
||||
different_events = await self._store.get_events_as_list(different_auth)
|
||||
|
||||
# double-check they're all in the same room - we should already have checked
|
||||
# this but it doesn't hurt to check again.
|
||||
for d in different_events:
|
||||
assert (
|
||||
d.room_id == event.room_id
|
||||
), f"Event {event.event_id} refers to auth_event {d.event_id} which is in a different room"
|
||||
|
||||
# now we state-resolve between our own idea of the auth events, and the remote's
|
||||
# idea of them.
|
||||
|
||||
local_state = calculated_auth_event_map.values()
|
||||
remote_auth_events = dict(calculated_auth_event_map)
|
||||
remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
|
||||
remote_state = remote_auth_events.values()
|
||||
|
||||
room_version = await self._store.get_room_version_id(event.room_id)
|
||||
new_state = await self._state_handler.resolve_events(
|
||||
room_version, (local_state, remote_state), event
|
||||
)
|
||||
different_state = {
|
||||
(d.type, d.state_key): d
|
||||
for d in new_state.values()
|
||||
if calculated_auth_event_map.get((d.type, d.state_key)) != d
|
||||
}
|
||||
if not different_state:
|
||||
logger.info("State res returned no new state")
|
||||
return None
|
||||
|
||||
logger.info(
|
||||
"After state res: updating auth_events with new state %s",
|
||||
different_state.values(),
|
||||
)
|
||||
|
||||
# take a copy of calculated_auth_event_map before we modify it.
|
||||
auth_events = dict(calculated_auth_event_map)
|
||||
auth_events.update(different_state)
|
||||
return auth_events
|
||||
|
||||
async def _load_or_fetch_auth_events_for_event(
|
||||
self, destination: str, event: EventBase
|
||||
) -> Collection[EventBase]:
|
||||
@@ -1830,6 +1888,61 @@ class FederationEventHandler:
|
||||
|
||||
await self._auth_and_persist_outliers(room_id, remote_auth_events)
|
||||
|
||||
async def _update_context_for_auth_events(
|
||||
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
|
||||
) -> EventContext:
|
||||
"""Update the state_ids in an event context after auth event resolution,
|
||||
storing the changes as a new state group.
|
||||
|
||||
Args:
|
||||
event: The event we're handling the context for
|
||||
|
||||
context: initial event context
|
||||
|
||||
auth_events: Events to update in the event context.
|
||||
|
||||
Returns:
|
||||
new event context
|
||||
"""
|
||||
# exclude the state key of the new event from the current_state in the context.
|
||||
if event.is_state():
|
||||
event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
|
||||
else:
|
||||
event_key = None
|
||||
state_updates = {
|
||||
k: a.event_id for k, a in auth_events.items() if k != event_key
|
||||
}
|
||||
|
||||
current_state_ids = await context.get_current_state_ids()
|
||||
current_state_ids = dict(current_state_ids) # type: ignore
|
||||
|
||||
current_state_ids.update(state_updates)
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids()
|
||||
prev_state_ids = dict(prev_state_ids)
|
||||
|
||||
prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
|
||||
|
||||
# create a new state group as a delta from the existing one.
|
||||
prev_group = context.state_group
|
||||
state_group = await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=prev_group,
|
||||
delta_ids=state_updates,
|
||||
current_state_ids=current_state_ids,
|
||||
)
|
||||
|
||||
return EventContext.with_state(
|
||||
storage=self._storage_controllers,
|
||||
state_group=state_group,
|
||||
state_group_before_event=context.state_group_before_event,
|
||||
state_delta_due_to_event=state_updates,
|
||||
prev_group=prev_group,
|
||||
delta_ids=state_updates,
|
||||
partial_state=context.partial_state,
|
||||
)
|
||||
|
||||
async def _run_push_actions_and_persist_event(
|
||||
self, event: EventBase, context: EventContext, backfilled: bool = False
|
||||
) -> None:
|
||||
|
||||
@@ -26,6 +26,7 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.http.site import SynapseRequest
|
||||
@@ -162,7 +163,8 @@ class IdentityHandler:
|
||||
sid: str,
|
||||
mxid: str,
|
||||
id_server: str,
|
||||
id_access_token: str,
|
||||
id_access_token: Optional[str] = None,
|
||||
use_v2: bool = True,
|
||||
) -> JsonDict:
|
||||
"""Bind a 3PID to an identity server
|
||||
|
||||
@@ -172,7 +174,8 @@ class IdentityHandler:
|
||||
mxid: The MXID to bind the 3PID to
|
||||
id_server: The domain of the identity server to query
|
||||
id_access_token: The access token to authenticate to the identity
|
||||
server with
|
||||
server with, if necessary. Required if use_v2 is true
|
||||
use_v2: Whether to use v2 Identity Service API endpoints. Defaults to True
|
||||
|
||||
Raises:
|
||||
SynapseError: On any of the following conditions
|
||||
@@ -184,15 +187,24 @@ class IdentityHandler:
|
||||
"""
|
||||
logger.debug("Proxying threepid bind request for %s to %s", mxid, id_server)
|
||||
|
||||
# If an id_access_token is not supplied, force usage of v1
|
||||
if id_access_token is None:
|
||||
use_v2 = False
|
||||
|
||||
if not valid_id_server_location(id_server):
|
||||
raise SynapseError(
|
||||
400,
|
||||
"id_server must be a valid hostname with optional port and path components",
|
||||
)
|
||||
|
||||
# Decide which API endpoint URLs to use
|
||||
headers = {}
|
||||
bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
|
||||
bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
|
||||
headers = {"Authorization": create_id_access_token_header(id_access_token)}
|
||||
if use_v2:
|
||||
bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
|
||||
headers["Authorization"] = create_id_access_token_header(id_access_token) # type: ignore
|
||||
else:
|
||||
bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
|
||||
|
||||
try:
|
||||
# Use the blacklisting http client as this call is only to identity servers
|
||||
@@ -211,14 +223,21 @@ class IdentityHandler:
|
||||
|
||||
return data
|
||||
except HttpResponseException as e:
|
||||
logger.error("3PID bind failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
if e.code != 404 or not use_v2:
|
||||
logger.error("3PID bind failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
except CodeMessageException as e:
|
||||
data = json_decoder.decode(e.msg) # XXX WAT?
|
||||
return data
|
||||
|
||||
logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
|
||||
res = await self.bind_threepid(
|
||||
client_secret, sid, mxid, id_server, id_access_token, use_v2=False
|
||||
)
|
||||
return res
|
||||
|
||||
async def try_unbind_threepid(self, mxid: str, threepid: dict) -> bool:
|
||||
"""Attempt to remove a 3PID from an identity server, or if one is not provided, all
|
||||
identity servers we're aware the binding is present on
|
||||
@@ -281,8 +300,8 @@ class IdentityHandler:
|
||||
"id_server must be a valid hostname with optional port and path components",
|
||||
)
|
||||
|
||||
url = "https://%s/_matrix/identity/v2/3pid/unbind" % (id_server,)
|
||||
url_bytes = b"/_matrix/identity/v2/3pid/unbind"
|
||||
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
|
||||
url_bytes = b"/_matrix/identity/api/v1/3pid/unbind"
|
||||
|
||||
content = {
|
||||
"mxid": mxid,
|
||||
@@ -415,6 +434,48 @@ class IdentityHandler:
|
||||
|
||||
return session_id
|
||||
|
||||
async def requestEmailToken(
|
||||
self,
|
||||
id_server: str,
|
||||
email: str,
|
||||
client_secret: str,
|
||||
send_attempt: int,
|
||||
next_link: Optional[str] = None,
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Request an external server send an email on our behalf for the purposes of threepid
|
||||
validation.
|
||||
|
||||
Args:
|
||||
id_server: The identity server to proxy to
|
||||
email: The email to send the message to
|
||||
client_secret: The unique client_secret sends by the user
|
||||
send_attempt: Which attempt this is
|
||||
next_link: A link to redirect the user to once they submit the token
|
||||
|
||||
Returns:
|
||||
The json response body from the server
|
||||
"""
|
||||
params = {
|
||||
"email": email,
|
||||
"client_secret": client_secret,
|
||||
"send_attempt": send_attempt,
|
||||
}
|
||||
if next_link:
|
||||
params["next_link"] = next_link
|
||||
|
||||
try:
|
||||
data = await self.http_client.post_json_get_json(
|
||||
id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
|
||||
params,
|
||||
)
|
||||
return data
|
||||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
async def requestMsisdnToken(
|
||||
self,
|
||||
id_server: str,
|
||||
@@ -488,7 +549,18 @@ class IdentityHandler:
|
||||
validation_session = None
|
||||
|
||||
# Try to validate as email
|
||||
if self.hs.config.email.can_verify_email:
|
||||
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
# Remote emails will only be used if a valid identity server is provided.
|
||||
assert (
|
||||
self.hs.config.registration.account_threepid_delegate_email is not None
|
||||
)
|
||||
|
||||
# Ask our delegated email identity server
|
||||
validation_session = await self.threepid_from_creds(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
threepid_creds,
|
||||
)
|
||||
elif self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
# Get a validated session matching these details
|
||||
validation_session = await self.store.get_threepid_validation_session(
|
||||
"email", client_secret, sid=sid, validated=True
|
||||
|
||||
@@ -1444,12 +1444,7 @@ class EventCreationHandler:
|
||||
if state_entry.state_group in self._external_cache_joined_hosts_updates:
|
||||
return
|
||||
|
||||
state = await state_entry.get_state(
|
||||
self._storage_controllers.state, StateFilter.all()
|
||||
)
|
||||
joined_hosts = await self.store.get_joined_hosts(
|
||||
event.room_id, state, state_entry
|
||||
)
|
||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||
|
||||
# Note that the expiry times must be larger than the expiry time in
|
||||
# _external_cache_joined_hosts_updates.
|
||||
|
||||
@@ -889,11 +889,7 @@ class RoomCreationHandler:
|
||||
# override any attempt to set room versions via the creation_content
|
||||
creation_content["room_version"] = room_version.identifier
|
||||
|
||||
(
|
||||
last_stream_id,
|
||||
last_sent_event_id,
|
||||
depth,
|
||||
) = await self._send_events_for_new_room(
|
||||
last_stream_id = await self._send_events_for_new_room(
|
||||
requester,
|
||||
room_id,
|
||||
preset_config=preset_config,
|
||||
@@ -909,7 +905,7 @@ class RoomCreationHandler:
|
||||
if "name" in config:
|
||||
name = config["name"]
|
||||
(
|
||||
name_event,
|
||||
_,
|
||||
last_stream_id,
|
||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
requester,
|
||||
@@ -921,16 +917,12 @@ class RoomCreationHandler:
|
||||
"content": {"name": name},
|
||||
},
|
||||
ratelimit=False,
|
||||
prev_event_ids=[last_sent_event_id],
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = name_event.event_id
|
||||
depth += 1
|
||||
|
||||
if "topic" in config:
|
||||
topic = config["topic"]
|
||||
(
|
||||
topic_event,
|
||||
_,
|
||||
last_stream_id,
|
||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
requester,
|
||||
@@ -942,11 +934,7 @@ class RoomCreationHandler:
|
||||
"content": {"topic": topic},
|
||||
},
|
||||
ratelimit=False,
|
||||
prev_event_ids=[last_sent_event_id],
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = topic_event.event_id
|
||||
depth += 1
|
||||
|
||||
# we avoid dropping the lock between invites, as otherwise joins can
|
||||
# start coming in and making the createRoom slow.
|
||||
@@ -961,7 +949,7 @@ class RoomCreationHandler:
|
||||
|
||||
for invitee in invite_list:
|
||||
(
|
||||
member_event_id,
|
||||
_,
|
||||
last_stream_id,
|
||||
) = await self.room_member_handler.update_membership_locked(
|
||||
requester,
|
||||
@@ -971,11 +959,7 @@ class RoomCreationHandler:
|
||||
ratelimit=False,
|
||||
content=content,
|
||||
new_room=True,
|
||||
prev_event_ids=[last_sent_event_id],
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = member_event_id
|
||||
depth += 1
|
||||
|
||||
for invite_3pid in invite_3pid_list:
|
||||
id_server = invite_3pid["id_server"]
|
||||
@@ -984,10 +968,7 @@ class RoomCreationHandler:
|
||||
medium = invite_3pid["medium"]
|
||||
# Note that do_3pid_invite can raise a ShadowBanError, but this was
|
||||
# handled above by emptying invite_3pid_list.
|
||||
(
|
||||
member_event_id,
|
||||
last_stream_id,
|
||||
) = await self.hs.get_room_member_handler().do_3pid_invite(
|
||||
last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite(
|
||||
room_id,
|
||||
requester.user,
|
||||
medium,
|
||||
@@ -996,11 +977,7 @@ class RoomCreationHandler:
|
||||
requester,
|
||||
txn_id=None,
|
||||
id_access_token=id_access_token,
|
||||
prev_event_ids=[last_sent_event_id],
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = member_event_id
|
||||
depth += 1
|
||||
|
||||
result = {"room_id": room_id}
|
||||
|
||||
@@ -1028,22 +1005,20 @@ class RoomCreationHandler:
|
||||
power_level_content_override: Optional[JsonDict] = None,
|
||||
creator_join_profile: Optional[JsonDict] = None,
|
||||
ratelimit: bool = True,
|
||||
) -> Tuple[int, str, int]:
|
||||
) -> int:
|
||||
"""Sends the initial events into a new room.
|
||||
|
||||
`power_level_content_override` doesn't apply when initial state has
|
||||
power level state event content.
|
||||
|
||||
Returns:
|
||||
A tuple containing the stream ID, event ID and depth of the last
|
||||
event sent to the room.
|
||||
The stream_id of the last event persisted.
|
||||
"""
|
||||
|
||||
creator_id = creator.user.to_string()
|
||||
|
||||
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
|
||||
|
||||
depth = 1
|
||||
last_sent_event_id: Optional[str] = None
|
||||
|
||||
def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
|
||||
@@ -1056,7 +1031,6 @@ class RoomCreationHandler:
|
||||
|
||||
async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
|
||||
nonlocal last_sent_event_id
|
||||
nonlocal depth
|
||||
|
||||
event = create(etype, content, **kwargs)
|
||||
logger.debug("Sending %s in new room", etype)
|
||||
@@ -1073,11 +1047,9 @@ class RoomCreationHandler:
|
||||
# Note: we don't pass state_event_ids here because this triggers
|
||||
# an additional query per event to look them up from the events table.
|
||||
prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
|
||||
depth=depth,
|
||||
)
|
||||
|
||||
last_sent_event_id = sent_event.event_id
|
||||
depth += 1
|
||||
|
||||
return last_stream_id
|
||||
|
||||
@@ -1103,7 +1075,6 @@ class RoomCreationHandler:
|
||||
content=creator_join_profile,
|
||||
new_room=True,
|
||||
prev_event_ids=[last_sent_event_id],
|
||||
depth=depth,
|
||||
)
|
||||
last_sent_event_id = member_event_id
|
||||
|
||||
@@ -1197,7 +1168,7 @@ class RoomCreationHandler:
|
||||
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
|
||||
)
|
||||
|
||||
return last_sent_stream_id, last_sent_event_id, depth
|
||||
return last_sent_stream_id
|
||||
|
||||
def _generate_room_id(self) -> str:
|
||||
"""Generates a random room ID.
|
||||
|
||||
@@ -285,7 +285,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
allow_no_prev_events: bool = False,
|
||||
prev_event_ids: Optional[List[str]] = None,
|
||||
state_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
txn_id: Optional[str] = None,
|
||||
ratelimit: bool = True,
|
||||
content: Optional[dict] = None,
|
||||
@@ -316,9 +315,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
prev_events are set so we need to set them ourself via this argument.
|
||||
This should normally be left as None, which will cause the auth_event_ids
|
||||
to be calculated based on the room state at the prev_events.
|
||||
depth: Override the depth used to order the event in the DAG.
|
||||
Should normally be set to None, which will cause the depth to be calculated
|
||||
based on the prev_events.
|
||||
|
||||
txn_id:
|
||||
ratelimit:
|
||||
@@ -374,7 +370,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
allow_no_prev_events=allow_no_prev_events,
|
||||
prev_event_ids=prev_event_ids,
|
||||
state_event_ids=state_event_ids,
|
||||
depth=depth,
|
||||
require_consent=require_consent,
|
||||
outlier=outlier,
|
||||
historical=historical,
|
||||
@@ -471,7 +466,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
allow_no_prev_events: bool = False,
|
||||
prev_event_ids: Optional[List[str]] = None,
|
||||
state_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
) -> Tuple[str, int]:
|
||||
"""Update a user's membership in a room.
|
||||
|
||||
@@ -507,9 +501,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
prev_events are set so we need to set them ourself via this argument.
|
||||
This should normally be left as None, which will cause the auth_event_ids
|
||||
to be calculated based on the room state at the prev_events.
|
||||
depth: Override the depth used to order the event in the DAG.
|
||||
Should normally be set to None, which will cause the depth to be calculated
|
||||
based on the prev_events.
|
||||
|
||||
Returns:
|
||||
A tuple of the new event ID and stream ID.
|
||||
@@ -549,7 +540,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
allow_no_prev_events=allow_no_prev_events,
|
||||
prev_event_ids=prev_event_ids,
|
||||
state_event_ids=state_event_ids,
|
||||
depth=depth,
|
||||
)
|
||||
|
||||
return result
|
||||
@@ -572,7 +562,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
allow_no_prev_events: bool = False,
|
||||
prev_event_ids: Optional[List[str]] = None,
|
||||
state_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
) -> Tuple[str, int]:
|
||||
"""Helper for update_membership.
|
||||
|
||||
@@ -610,9 +599,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
prev_events are set so we need to set them ourself via this argument.
|
||||
This should normally be left as None, which will cause the auth_event_ids
|
||||
to be calculated based on the room state at the prev_events.
|
||||
depth: Override the depth used to order the event in the DAG.
|
||||
Should normally be set to None, which will cause the depth to be calculated
|
||||
based on the prev_events.
|
||||
|
||||
Returns:
|
||||
A tuple of the new event ID and stream ID.
|
||||
@@ -746,7 +732,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
allow_no_prev_events=allow_no_prev_events,
|
||||
prev_event_ids=prev_event_ids,
|
||||
state_event_ids=state_event_ids,
|
||||
depth=depth,
|
||||
content=content,
|
||||
require_consent=require_consent,
|
||||
outlier=outlier,
|
||||
@@ -755,14 +740,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
latest_event_ids = await self.store.get_prev_events_for_room(room_id)
|
||||
|
||||
state_before_join = await self.state_handler.compute_state_after_events(
|
||||
room_id, latest_event_ids
|
||||
current_state_ids = await self.state_handler.get_current_state_ids(
|
||||
room_id, latest_event_ids=latest_event_ids
|
||||
)
|
||||
|
||||
# TODO: Refactor into dictionary of explicitly allowed transitions
|
||||
# between old and new state, with specific error messages for some
|
||||
# transitions and generic otherwise
|
||||
old_state_id = state_before_join.get((EventTypes.Member, target.to_string()))
|
||||
old_state_id = current_state_ids.get((EventTypes.Member, target.to_string()))
|
||||
if old_state_id:
|
||||
old_state = await self.store.get_event(old_state_id, allow_none=True)
|
||||
old_membership = old_state.content.get("membership") if old_state else None
|
||||
@@ -813,11 +798,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
if action == "kick":
|
||||
raise AuthError(403, "The target user is not in the room")
|
||||
|
||||
is_host_in_room = await self._is_host_in_room(state_before_join)
|
||||
is_host_in_room = await self._is_host_in_room(current_state_ids)
|
||||
|
||||
if effective_membership_state == Membership.JOIN:
|
||||
if requester.is_guest:
|
||||
guest_can_join = await self._can_guest_join(state_before_join)
|
||||
guest_can_join = await self._can_guest_join(current_state_ids)
|
||||
if not guest_can_join:
|
||||
# This should be an auth check, but guests are a local concept,
|
||||
# so don't really fit into the general auth process.
|
||||
@@ -855,12 +840,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
# Check if a remote join should be performed.
|
||||
remote_join, remote_room_hosts = await self._should_perform_remote_join(
|
||||
target.to_string(),
|
||||
room_id,
|
||||
remote_room_hosts,
|
||||
content,
|
||||
is_host_in_room,
|
||||
state_before_join,
|
||||
target.to_string(), room_id, remote_room_hosts, content, is_host_in_room
|
||||
)
|
||||
if remote_join:
|
||||
if ratelimit:
|
||||
@@ -987,7 +967,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
ratelimit=ratelimit,
|
||||
prev_event_ids=latest_event_ids,
|
||||
state_event_ids=state_event_ids,
|
||||
depth=depth,
|
||||
content=content,
|
||||
require_consent=require_consent,
|
||||
outlier=outlier,
|
||||
@@ -1000,7 +979,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
remote_room_hosts: List[str],
|
||||
content: JsonDict,
|
||||
is_host_in_room: bool,
|
||||
state_before_join: StateMap[str],
|
||||
) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Check whether the server should do a remote join (as opposed to a local
|
||||
@@ -1020,8 +998,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
content: The content to use as the event body of the join. This may
|
||||
be modified.
|
||||
is_host_in_room: True if the host is in the room.
|
||||
state_before_join: The state before the join event (i.e. the resolution of
|
||||
the states after its parent events).
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
@@ -1038,17 +1014,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
# If the host is in the room, but not one of the authorised hosts
|
||||
# for restricted join rules, a remote join must be used.
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
current_state_ids = await self._storage_controllers.state.get_current_state_ids(
|
||||
room_id
|
||||
)
|
||||
|
||||
# If restricted join rules are not being used, a local join can always
|
||||
# be used.
|
||||
if not await self.event_auth_handler.has_restricted_join_rules(
|
||||
state_before_join, room_version
|
||||
current_state_ids, room_version
|
||||
):
|
||||
return False, []
|
||||
|
||||
# If the user is invited to the room or already joined, the join
|
||||
# event can always be issued locally.
|
||||
prev_member_event_id = state_before_join.get((EventTypes.Member, user_id), None)
|
||||
prev_member_event_id = current_state_ids.get((EventTypes.Member, user_id), None)
|
||||
prev_member_event = None
|
||||
if prev_member_event_id:
|
||||
prev_member_event = await self.store.get_event(prev_member_event_id)
|
||||
@@ -1063,10 +1042,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
#
|
||||
# If not, generate a new list of remote hosts based on which
|
||||
# can issue invites.
|
||||
event_map = await self.store.get_events(state_before_join.values())
|
||||
event_map = await self.store.get_events(current_state_ids.values())
|
||||
current_state = {
|
||||
state_key: event_map[event_id]
|
||||
for state_key, event_id in state_before_join.items()
|
||||
for state_key, event_id in current_state_ids.items()
|
||||
}
|
||||
allowed_servers = get_servers_from_users(
|
||||
get_users_which_can_issue_invite(current_state)
|
||||
@@ -1080,7 +1059,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
# Ensure the member should be allowed access via membership in a room.
|
||||
await self.event_auth_handler.check_restricted_join_rules(
|
||||
state_before_join, room_version, user_id, prev_member_event
|
||||
current_state_ids, room_version, user_id, prev_member_event
|
||||
)
|
||||
|
||||
# If this is going to be a local join, additional information must
|
||||
@@ -1090,7 +1069,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
EventContentFields.AUTHORISING_USER
|
||||
] = await self.event_auth_handler.get_user_which_could_invite(
|
||||
room_id,
|
||||
state_before_join,
|
||||
current_state_ids,
|
||||
)
|
||||
|
||||
return False, []
|
||||
@@ -1343,9 +1322,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
requester: Requester,
|
||||
txn_id: Optional[str],
|
||||
id_access_token: Optional[str] = None,
|
||||
prev_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
) -> Tuple[str, int]:
|
||||
) -> int:
|
||||
"""Invite a 3PID to a room.
|
||||
|
||||
Args:
|
||||
@@ -1358,13 +1335,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
txn_id: The transaction ID this is part of, or None if this is not
|
||||
part of a transaction.
|
||||
id_access_token: The optional identity server access token.
|
||||
depth: Override the depth used to order the event in the DAG.
|
||||
prev_event_ids: The event IDs to use as the prev events
|
||||
Should normally be set to None, which will cause the depth to be calculated
|
||||
based on the prev_events.
|
||||
|
||||
Returns:
|
||||
Tuple of event ID and stream ordering position
|
||||
The new stream ID.
|
||||
|
||||
Raises:
|
||||
ShadowBanError if the requester has been shadow-banned.
|
||||
@@ -1410,7 +1383,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
# We don't check the invite against the spamchecker(s) here (through
|
||||
# user_may_invite) because we'll do it further down the line anyway (in
|
||||
# update_membership_locked).
|
||||
event_id, stream_id = await self.update_membership(
|
||||
_, stream_id = await self.update_membership(
|
||||
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
|
||||
)
|
||||
else:
|
||||
@@ -1429,7 +1402,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
additional_fields=spam_check[1],
|
||||
)
|
||||
|
||||
event, stream_id = await self._make_and_store_3pid_invite(
|
||||
stream_id = await self._make_and_store_3pid_invite(
|
||||
requester,
|
||||
id_server,
|
||||
medium,
|
||||
@@ -1438,12 +1411,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
inviter,
|
||||
txn_id=txn_id,
|
||||
id_access_token=id_access_token,
|
||||
prev_event_ids=prev_event_ids,
|
||||
depth=depth,
|
||||
)
|
||||
event_id = event.event_id
|
||||
|
||||
return event_id, stream_id
|
||||
return stream_id
|
||||
|
||||
async def _make_and_store_3pid_invite(
|
||||
self,
|
||||
@@ -1455,9 +1425,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
user: UserID,
|
||||
txn_id: Optional[str],
|
||||
id_access_token: Optional[str] = None,
|
||||
prev_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
) -> Tuple[EventBase, int]:
|
||||
) -> int:
|
||||
room_state = await self._storage_controllers.state.get_current_state(
|
||||
room_id,
|
||||
StateFilter.from_types(
|
||||
@@ -1550,10 +1518,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
},
|
||||
ratelimit=False,
|
||||
txn_id=txn_id,
|
||||
prev_event_ids=prev_event_ids,
|
||||
depth=depth,
|
||||
)
|
||||
return event, stream_id
|
||||
return stream_id
|
||||
|
||||
async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool:
|
||||
# Have we just created the room, and is this about to be the very
|
||||
|
||||
@@ -19,6 +19,7 @@ from twisted.web.client import PartialDownloadError
|
||||
|
||||
from synapse.api.constants import LoginType
|
||||
from synapse.api.errors import Codes, LoginError, SynapseError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -152,7 +153,7 @@ class _BaseThreepidAuthChecker:
|
||||
|
||||
logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
|
||||
|
||||
# msisdns are currently always verified via the IS
|
||||
# msisdns are currently always ThreepidBehaviour.REMOTE
|
||||
if medium == "msisdn":
|
||||
if not self.hs.config.registration.account_threepid_delegate_msisdn:
|
||||
raise SynapseError(
|
||||
@@ -163,7 +164,18 @@ class _BaseThreepidAuthChecker:
|
||||
threepid_creds,
|
||||
)
|
||||
elif medium == "email":
|
||||
if self.hs.config.email.can_verify_email:
|
||||
if (
|
||||
self.hs.config.email.threepid_behaviour_email
|
||||
== ThreepidBehaviour.REMOTE
|
||||
):
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
threepid = await identity_handler.threepid_from_creds(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
threepid_creds,
|
||||
)
|
||||
elif (
|
||||
self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
|
||||
):
|
||||
threepid = None
|
||||
row = await self.store.get_threepid_validation_session(
|
||||
medium,
|
||||
@@ -215,7 +227,10 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
|
||||
_BaseThreepidAuthChecker.__init__(self, hs)
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
return self.hs.config.email.can_verify_email
|
||||
return self.hs.config.email.threepid_behaviour_email in (
|
||||
ThreepidBehaviour.REMOTE,
|
||||
ThreepidBehaviour.LOCAL,
|
||||
)
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
return await self._check_threepid("email", authdict)
|
||||
|
||||
@@ -228,7 +228,6 @@ class Notifier:
|
||||
|
||||
# Called when there are new things to stream over replication
|
||||
self.replication_callbacks: List[Callable[[], None]] = []
|
||||
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []
|
||||
|
||||
self._federation_client = hs.get_federation_http_client()
|
||||
|
||||
@@ -281,19 +280,6 @@ class Notifier:
|
||||
"""
|
||||
self.replication_callbacks.append(cb)
|
||||
|
||||
def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
|
||||
"""Add a callback that will be called when a user joins a room.
|
||||
|
||||
This only fires on genuine membership changes, e.g. "invite" -> "join".
|
||||
Membership transitions like "join" -> "join" (for e.g. displayname changes) do
|
||||
not trigger the callback.
|
||||
|
||||
When called, the callback receives two arguments: the event ID and the room ID.
|
||||
It should *not* return a Deferred - if it needs to do any asynchronous work, a
|
||||
background thread should be started and wrapped with run_as_background_process.
|
||||
"""
|
||||
self._new_join_in_room_callbacks.append(cb)
|
||||
|
||||
async def on_new_room_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
@@ -737,10 +723,6 @@ class Notifier:
|
||||
for cb in self.replication_callbacks:
|
||||
cb()
|
||||
|
||||
def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
|
||||
for cb in self._new_join_in_room_callbacks:
|
||||
cb(event_id, room_id)
|
||||
|
||||
def notify_remote_server_up(self, server: str) -> None:
|
||||
"""Notify any replication that a remote server has come back up"""
|
||||
# We call federation_sender directly rather than registering as a
|
||||
|
||||
@@ -373,7 +373,6 @@ class UserRestServletV2(RestServlet):
|
||||
if (
|
||||
self.hs.config.email.email_enable_notifs
|
||||
and self.hs.config.email.email_notif_for_new_users
|
||||
and medium == "email"
|
||||
):
|
||||
await self.pusher_pool.add_pusher(
|
||||
user_id=user_id,
|
||||
|
||||
@@ -28,6 +28,7 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
ThreepidValidationError,
|
||||
)
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.handlers.ui_auth import UIAuthSessionDataConstants
|
||||
from synapse.http.server import HttpServer, finish_request, respond_with_html
|
||||
from synapse.http.servlet import (
|
||||
@@ -63,7 +64,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
||||
self.config = hs.config
|
||||
self.identity_handler = hs.get_identity_handler()
|
||||
|
||||
if self.config.email.can_verify_email:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
self.mailer = Mailer(
|
||||
hs=self.hs,
|
||||
app_name=self.config.email.email_app_name,
|
||||
@@ -72,10 +73,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"User password resets have been disabled due to lack of email config"
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"User password resets have been disabled due to lack of email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Email-based password resets have been disabled on this server"
|
||||
)
|
||||
@@ -127,21 +129,35 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
||||
|
||||
raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
|
||||
|
||||
# Send password reset emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_password_reset_mail,
|
||||
next_link,
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
|
||||
# Have the configured identity server handle the request
|
||||
ret = await self.identity_handler.requestEmailToken(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
next_link,
|
||||
)
|
||||
else:
|
||||
# Send password reset emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_password_reset_mail,
|
||||
next_link,
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
ret = {"sid": sid}
|
||||
|
||||
threepid_send_requests.labels(type="email", reason="password_reset").observe(
|
||||
send_attempt
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
return 200, {"sid": sid}
|
||||
return 200, ret
|
||||
|
||||
|
||||
class PasswordRestServlet(RestServlet):
|
||||
@@ -333,7 +349,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
self.identity_handler = hs.get_identity_handler()
|
||||
self.store = self.hs.get_datastores().main
|
||||
|
||||
if self.config.email.can_verify_email:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
self.mailer = Mailer(
|
||||
hs=self.hs,
|
||||
app_name=self.config.email.email_app_name,
|
||||
@@ -342,10 +358,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Adding an email to your account is disabled on this server"
|
||||
)
|
||||
@@ -396,20 +413,35 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
|
||||
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
|
||||
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_add_threepid_mail,
|
||||
next_link,
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
|
||||
# Have the configured identity server handle the request
|
||||
ret = await self.identity_handler.requestEmailToken(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
next_link,
|
||||
)
|
||||
else:
|
||||
# Send threepid validation emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_add_threepid_mail,
|
||||
next_link,
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
ret = {"sid": sid}
|
||||
|
||||
threepid_send_requests.labels(type="email", reason="add_threepid").observe(
|
||||
send_attempt
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
return 200, {"sid": sid}
|
||||
return 200, ret
|
||||
|
||||
|
||||
class MsisdnThreepidRequestTokenRestServlet(RestServlet):
|
||||
@@ -502,19 +534,26 @@ class AddThreepidEmailSubmitTokenServlet(RestServlet):
|
||||
self.config = hs.config
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
if self.config.email.can_verify_email:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
self._failure_email_template = (
|
||||
self.config.email.email_add_threepid_template_failure_html
|
||||
)
|
||||
|
||||
async def on_GET(self, request: Request) -> None:
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Adding an email to your account is disabled on this server"
|
||||
)
|
||||
elif self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"This homeserver is not validating threepids. Use an identity server "
|
||||
"instead.",
|
||||
)
|
||||
|
||||
sid = parse_string(request, "sid", required=True)
|
||||
token = parse_string(request, "token", required=True)
|
||||
@@ -704,12 +743,10 @@ class ThreepidBindRestServlet(RestServlet):
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
assert_params_in_dict(
|
||||
body, ["id_server", "sid", "id_access_token", "client_secret"]
|
||||
)
|
||||
assert_params_in_dict(body, ["id_server", "sid", "client_secret"])
|
||||
id_server = body["id_server"]
|
||||
sid = body["sid"]
|
||||
id_access_token = body["id_access_token"]
|
||||
id_access_token = body.get("id_access_token") # optional
|
||||
client_secret = body["client_secret"]
|
||||
assert_valid_client_secret(client_secret)
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ from typing import (
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from synapse.api.errors import Codes, InvalidClientTokenError, LoginError, SynapseError
|
||||
from synapse.api.errors import Codes, LoginError, SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.api.urls import CLIENT_API_PREFIX
|
||||
from synapse.appservice import ApplicationService
|
||||
@@ -172,13 +172,7 @@ class LoginRestServlet(RestServlet):
|
||||
|
||||
try:
|
||||
if login_submission["type"] == LoginRestServlet.APPSERVICE_TYPE:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
appservice = requester.app_service
|
||||
|
||||
if appservice is None:
|
||||
raise InvalidClientTokenError(
|
||||
"This login method is only valid for application services"
|
||||
)
|
||||
appservice = self.auth.get_appservice_by_req(request)
|
||||
|
||||
if appservice.is_rate_limited():
|
||||
await self._address_ratelimiter.ratelimit(
|
||||
|
||||
@@ -40,10 +40,6 @@ class ReadMarkerRestServlet(RestServlet):
|
||||
self.read_marker_handler = hs.get_read_marker_handler()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
|
||||
self._known_receipt_types = {ReceiptTypes.READ, ReceiptTypes.FULLY_READ}
|
||||
if hs.config.experimental.msc2285_enabled:
|
||||
self._known_receipt_types.add(ReceiptTypes.READ_PRIVATE)
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
@@ -53,7 +49,13 @@ class ReadMarkerRestServlet(RestServlet):
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
unrecognized_types = set(body.keys()) - self._known_receipt_types
|
||||
valid_receipt_types = {
|
||||
ReceiptTypes.READ,
|
||||
ReceiptTypes.FULLY_READ,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
}
|
||||
|
||||
unrecognized_types = set(body.keys()) - valid_receipt_types
|
||||
if unrecognized_types:
|
||||
# It's fine if there are unrecognized receipt types, but let's log
|
||||
# it to help debug clients that have typoed the receipt type.
|
||||
@@ -63,25 +65,31 @@ class ReadMarkerRestServlet(RestServlet):
|
||||
# types.
|
||||
logger.info("Ignoring unrecognized receipt types: %s", unrecognized_types)
|
||||
|
||||
for receipt_type in self._known_receipt_types:
|
||||
event_id = body.get(receipt_type, None)
|
||||
# TODO Add validation to reject non-string event IDs.
|
||||
if not event_id:
|
||||
continue
|
||||
read_event_id = body.get(ReceiptTypes.READ, None)
|
||||
if read_event_id:
|
||||
await self.receipts_handler.received_client_receipt(
|
||||
room_id,
|
||||
ReceiptTypes.READ,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=read_event_id,
|
||||
)
|
||||
|
||||
if receipt_type == ReceiptTypes.FULLY_READ:
|
||||
await self.read_marker_handler.received_client_read_marker(
|
||||
room_id,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=event_id,
|
||||
)
|
||||
else:
|
||||
await self.receipts_handler.received_client_receipt(
|
||||
room_id,
|
||||
receipt_type,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=event_id,
|
||||
)
|
||||
read_private_event_id = body.get(ReceiptTypes.READ_PRIVATE, None)
|
||||
if read_private_event_id and self.config.experimental.msc2285_enabled:
|
||||
await self.receipts_handler.received_client_receipt(
|
||||
room_id,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=read_private_event_id,
|
||||
)
|
||||
|
||||
read_marker_event_id = body.get(ReceiptTypes.FULLY_READ, None)
|
||||
if read_marker_event_id:
|
||||
await self.read_marker_handler.received_client_read_marker(
|
||||
room_id,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=read_marker_event_id,
|
||||
)
|
||||
|
||||
return 200, {}
|
||||
|
||||
|
||||
@@ -39,27 +39,31 @@ class ReceiptRestServlet(RestServlet):
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.receipts_handler = hs.get_receipts_handler()
|
||||
self.read_marker_handler = hs.get_read_marker_handler()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
|
||||
self._known_receipt_types = {ReceiptTypes.READ}
|
||||
if hs.config.experimental.msc2285_enabled:
|
||||
self._known_receipt_types.update(
|
||||
(ReceiptTypes.READ_PRIVATE, ReceiptTypes.FULLY_READ)
|
||||
)
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
if receipt_type not in self._known_receipt_types:
|
||||
if self.hs.config.experimental.msc2285_enabled and receipt_type not in [
|
||||
ReceiptTypes.READ,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
ReceiptTypes.FULLY_READ,
|
||||
]:
|
||||
raise SynapseError(
|
||||
400,
|
||||
f"Receipt type must be {', '.join(self._known_receipt_types)}",
|
||||
"Receipt type must be 'm.read', 'org.matrix.msc2285.read.private' or 'm.fully_read'",
|
||||
)
|
||||
elif (
|
||||
not self.hs.config.experimental.msc2285_enabled
|
||||
and receipt_type != ReceiptTypes.READ
|
||||
):
|
||||
raise SynapseError(400, "Receipt type must be 'm.read'")
|
||||
|
||||
parse_json_object_from_request(request, allow_empty_body=False)
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config import ConfigError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.ratelimiting import FederationRateLimitConfig
|
||||
from synapse.config.server import is_threepid_reserved
|
||||
@@ -73,7 +74,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
|
||||
self.identity_handler = hs.get_identity_handler()
|
||||
self.config = hs.config
|
||||
|
||||
if self.hs.config.email.can_verify_email:
|
||||
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
self.mailer = Mailer(
|
||||
hs=self.hs,
|
||||
app_name=self.config.email.email_app_name,
|
||||
@@ -82,10 +83,13 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
if not self.hs.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"Email registration has been disabled due to lack of email config"
|
||||
)
|
||||
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if (
|
||||
self.hs.config.email.local_threepid_handling_disabled_due_to_email_config
|
||||
):
|
||||
logger.warning(
|
||||
"Email registration has been disabled due to lack of email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Email-based registration has been disabled on this server"
|
||||
)
|
||||
@@ -134,21 +138,35 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
|
||||
|
||||
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
|
||||
|
||||
# Send registration emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_registration_mail,
|
||||
next_link,
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
|
||||
# Have the configured identity server handle the request
|
||||
ret = await self.identity_handler.requestEmailToken(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
next_link,
|
||||
)
|
||||
else:
|
||||
# Send registration emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_registration_mail,
|
||||
next_link,
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
ret = {"sid": sid}
|
||||
|
||||
threepid_send_requests.labels(type="email", reason="register").observe(
|
||||
send_attempt
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
return 200, {"sid": sid}
|
||||
return 200, ret
|
||||
|
||||
|
||||
class MsisdnRegisterRequestTokenRestServlet(RestServlet):
|
||||
@@ -242,7 +260,7 @@ class RegistrationSubmitTokenServlet(RestServlet):
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
if self.config.email.can_verify_email:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
self._failure_email_template = (
|
||||
self.config.email.email_registration_template_failure_html
|
||||
)
|
||||
@@ -252,10 +270,11 @@ class RegistrationSubmitTokenServlet(RestServlet):
|
||||
raise SynapseError(
|
||||
400, "This medium is currently not supported for registration"
|
||||
)
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"User registration via email has been disabled due to lack of email config"
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"User registration via email has been disabled due to lack of email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Email-based registration is disabled on this server"
|
||||
)
|
||||
|
||||
@@ -109,64 +109,10 @@ class MediaInfo:
|
||||
|
||||
class PreviewUrlResource(DirectServeJsonResource):
|
||||
"""
|
||||
The `GET /_matrix/media/r0/preview_url` endpoint provides a generic preview API
|
||||
for URLs which outputs Open Graph (https://ogp.me/) responses (with some Matrix
|
||||
specific additions).
|
||||
Generating URL previews is a complicated task which many potential pitfalls.
|
||||
|
||||
This does have trade-offs compared to other designs:
|
||||
|
||||
* Pros:
|
||||
* Simple and flexible; can be used by any clients at any point
|
||||
* Cons:
|
||||
* If each homeserver provides one of these independently, all the homeservers in a
|
||||
room may needlessly DoS the target URI
|
||||
* The URL metadata must be stored somewhere, rather than just using Matrix
|
||||
itself to store the media.
|
||||
* Matrix cannot be used to distribute the metadata between homeservers.
|
||||
|
||||
When Synapse is asked to preview a URL it does the following:
|
||||
|
||||
1. Checks against a URL blacklist (defined as `url_preview_url_blacklist` in the
|
||||
config).
|
||||
2. Checks the URL against an in-memory cache and returns the result if it exists. (This
|
||||
is also used to de-duplicate processing of multiple in-flight requests at once.)
|
||||
3. Kicks off a background process to generate a preview:
|
||||
1. Checks URL and timestamp against the database cache and returns the result if it
|
||||
has not expired and was successful (a 2xx return code).
|
||||
2. Checks if the URL matches an oEmbed (https://oembed.com/) pattern. If it
|
||||
does, update the URL to download.
|
||||
3. Downloads the URL and stores it into a file via the media storage provider
|
||||
and saves the local media metadata.
|
||||
4. If the media is an image:
|
||||
1. Generates thumbnails.
|
||||
2. Generates an Open Graph response based on image properties.
|
||||
5. If the media is HTML:
|
||||
1. Decodes the HTML via the stored file.
|
||||
2. Generates an Open Graph response from the HTML.
|
||||
3. If a JSON oEmbed URL was found in the HTML via autodiscovery:
|
||||
1. Downloads the URL and stores it into a file via the media storage provider
|
||||
and saves the local media metadata.
|
||||
2. Convert the oEmbed response to an Open Graph response.
|
||||
3. Override any Open Graph data from the HTML with data from oEmbed.
|
||||
4. If an image exists in the Open Graph response:
|
||||
1. Downloads the URL and stores it into a file via the media storage
|
||||
provider and saves the local media metadata.
|
||||
2. Generates thumbnails.
|
||||
3. Updates the Open Graph response based on image properties.
|
||||
6. If the media is JSON and an oEmbed URL was found:
|
||||
1. Convert the oEmbed response to an Open Graph response.
|
||||
2. If a thumbnail or image is in the oEmbed response:
|
||||
1. Downloads the URL and stores it into a file via the media storage
|
||||
provider and saves the local media metadata.
|
||||
2. Generates thumbnails.
|
||||
3. Updates the Open Graph response based on image properties.
|
||||
7. Stores the result in the database cache.
|
||||
4. Returns the result.
|
||||
|
||||
The in-memory cache expires after 1 hour.
|
||||
|
||||
Expired entries in the database cache (and their associated media files) are
|
||||
deleted every 10 seconds. The default expiration time is 1 hour from download.
|
||||
See docs/development/url_previews.md for discussion of the design and
|
||||
algorithm followed in this module.
|
||||
"""
|
||||
|
||||
isLeaf = True
|
||||
|
||||
@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Tuple
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.errors import ThreepidValidationError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.http.server import DirectServeHtmlResource
|
||||
from synapse.http.servlet import parse_string
|
||||
from synapse.util.stringutils import assert_valid_client_secret
|
||||
@@ -45,6 +46,9 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
self._local_threepid_handling_disabled_due_to_email_config = (
|
||||
hs.config.email.local_threepid_handling_disabled_due_to_email_config
|
||||
)
|
||||
self._confirmation_email_template = (
|
||||
hs.config.email.email_password_reset_template_confirmation_html
|
||||
)
|
||||
@@ -55,8 +59,8 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
|
||||
hs.config.email.email_password_reset_template_failure_html
|
||||
)
|
||||
|
||||
# This resource should only be mounted if email validation is enabled
|
||||
assert hs.config.email.can_verify_email
|
||||
# This resource should not be mounted if threepid behaviour is not LOCAL
|
||||
assert hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
|
||||
|
||||
async def _async_render_GET(self, request: Request) -> Tuple[int, bytes]:
|
||||
sid = parse_string(request, "sid", required=True)
|
||||
|
||||
@@ -24,12 +24,14 @@ from typing import (
|
||||
DefaultDict,
|
||||
Dict,
|
||||
FrozenSet,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
import attr
|
||||
@@ -45,7 +47,6 @@ from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServ
|
||||
from synapse.state import v1, v2
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.roommember import ProfileInfo
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import StateMap
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
@@ -53,7 +54,6 @@ from synapse.util.metrics import Measure, measure_func
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.controllers import StateStorageController
|
||||
from synapse.storage.databases.main import DataStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -83,23 +83,17 @@ def _gen_state_id() -> str:
|
||||
|
||||
|
||||
class _StateCacheEntry:
|
||||
__slots__ = ["_state", "state_group", "prev_group", "delta_ids"]
|
||||
__slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
state: Optional[StateMap[str]],
|
||||
state: StateMap[str],
|
||||
state_group: Optional[int],
|
||||
prev_group: Optional[int] = None,
|
||||
delta_ids: Optional[StateMap[str]] = None,
|
||||
):
|
||||
if state is None and state_group is None:
|
||||
raise Exception("Either state or state group must be not None")
|
||||
|
||||
# A map from (type, state_key) to event_id.
|
||||
#
|
||||
# This can be None if we have a `state_group` (as then we can fetch the
|
||||
# state from the DB.)
|
||||
self._state = frozendict(state) if state is not None else None
|
||||
self.state = frozendict(state)
|
||||
|
||||
# the ID of a state group if one and only one is involved.
|
||||
# otherwise, None otherwise?
|
||||
@@ -108,30 +102,20 @@ class _StateCacheEntry:
|
||||
self.prev_group = prev_group
|
||||
self.delta_ids = frozendict(delta_ids) if delta_ids is not None else None
|
||||
|
||||
async def get_state(
|
||||
self,
|
||||
state_storage: "StateStorageController",
|
||||
state_filter: Optional["StateFilter"] = None,
|
||||
) -> StateMap[str]:
|
||||
"""Get the state map for this entry, either from the in-memory state or
|
||||
looking up the state group in the DB.
|
||||
"""
|
||||
|
||||
if self._state is not None:
|
||||
return self._state
|
||||
|
||||
assert self.state_group is not None
|
||||
|
||||
return await state_storage.get_state_ids_for_group(
|
||||
self.state_group, state_filter
|
||||
)
|
||||
# The `state_id` is a unique ID we generate that can be used as ID for
|
||||
# this collection of state. Usually this would be the same as the
|
||||
# state group, but on worker instances we can't generate a new state
|
||||
# group each time we resolve state, so we generate a separate one that
|
||||
# isn't persisted and is used solely for caches.
|
||||
# `state_id` is either a state_group (and so an int) or a string. This
|
||||
# ensures we don't accidentally persist a state_id as a stateg_group
|
||||
if state_group:
|
||||
self.state_id: Union[str, int] = state_group
|
||||
else:
|
||||
self.state_id = _gen_state_id()
|
||||
|
||||
def __len__(self) -> int:
|
||||
# The len should is used to estimate how large this cache entry is, for
|
||||
# cache eviction purposes. This is why if `self.state` is None it's fine
|
||||
# to return 1.
|
||||
|
||||
return len(self._state) if self._state else 1
|
||||
return len(self.state)
|
||||
|
||||
|
||||
class StateHandler:
|
||||
@@ -153,28 +137,23 @@ class StateHandler:
|
||||
ReplicationUpdateCurrentStateRestServlet.make_client(hs)
|
||||
)
|
||||
|
||||
async def compute_state_after_events(
|
||||
async def get_current_state_ids(
|
||||
self,
|
||||
room_id: str,
|
||||
event_ids: Collection[str],
|
||||
latest_event_ids: Collection[str],
|
||||
) -> StateMap[str]:
|
||||
"""Fetch the state after each of the given event IDs. Resolve them and return.
|
||||
|
||||
This is typically used where `event_ids` is a collection of forward extremities
|
||||
in a room, intended to become the `prev_events` of a new event E. If so, the
|
||||
return value of this function represents the state before E.
|
||||
"""Get the current state, or the state at a set of events, for a room
|
||||
|
||||
Args:
|
||||
room_id: the room_id containing the given events.
|
||||
event_ids: the events whose state should be fetched and resolved.
|
||||
room_id:
|
||||
latest_event_ids: The forward extremities to resolve.
|
||||
|
||||
Returns:
|
||||
the state dict (a mapping from (event_type, state_key) -> event_id) which
|
||||
holds the resolution of the states after the given event IDs.
|
||||
the state dict, mapping from (event_type, state_key) -> event_id
|
||||
"""
|
||||
logger.debug("calling resolve_state_groups from compute_state_after_events")
|
||||
ret = await self.resolve_state_groups_for_events(room_id, event_ids)
|
||||
return await ret.get_state(self._state_storage_controller, StateFilter.all())
|
||||
logger.debug("calling resolve_state_groups from get_current_state_ids")
|
||||
ret = await self.resolve_state_groups_for_events(room_id, latest_event_ids)
|
||||
return ret.state
|
||||
|
||||
async def get_current_users_in_room(
|
||||
self, room_id: str, latest_event_ids: List[str]
|
||||
@@ -198,8 +177,7 @@ class StateHandler:
|
||||
|
||||
logger.debug("calling resolve_state_groups from get_current_users_in_room")
|
||||
entry = await self.resolve_state_groups_for_events(room_id, latest_event_ids)
|
||||
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
|
||||
return await self.store.get_joined_users_from_state(room_id, state, entry)
|
||||
return await self.store.get_joined_users_from_state(room_id, entry)
|
||||
|
||||
async def get_hosts_in_room_at_events(
|
||||
self, room_id: str, event_ids: Collection[str]
|
||||
@@ -214,8 +192,7 @@ class StateHandler:
|
||||
The hosts in the room at the given events
|
||||
"""
|
||||
entry = await self.resolve_state_groups_for_events(room_id, event_ids)
|
||||
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
|
||||
return await self.store.get_joined_hosts(room_id, state, entry)
|
||||
return await self.store.get_joined_hosts(room_id, entry)
|
||||
|
||||
async def compute_event_context(
|
||||
self,
|
||||
@@ -250,19 +227,10 @@ class StateHandler:
|
||||
#
|
||||
if state_ids_before_event:
|
||||
# if we're given the state before the event, then we use that
|
||||
state_group_before_event = None
|
||||
state_group_before_event_prev_group = None
|
||||
deltas_to_state_group_before_event = None
|
||||
|
||||
# .. though we need to get a state group for it.
|
||||
state_group_before_event = (
|
||||
await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=None,
|
||||
delta_ids=None,
|
||||
current_state_ids=state_ids_before_event,
|
||||
)
|
||||
)
|
||||
entry = None
|
||||
|
||||
else:
|
||||
# otherwise, we'll need to resolve the state across the prev_events.
|
||||
@@ -296,32 +264,36 @@ class StateHandler:
|
||||
await_full_state=False,
|
||||
)
|
||||
|
||||
state_ids_before_event = entry.state
|
||||
state_group_before_event = entry.state_group
|
||||
state_group_before_event_prev_group = entry.prev_group
|
||||
deltas_to_state_group_before_event = entry.delta_ids
|
||||
state_ids_before_event = None
|
||||
|
||||
# We make sure that we have a state group assigned to the state.
|
||||
if entry.state_group is None:
|
||||
# store_state_group requires us to have either a previous state group
|
||||
# (with deltas) or the complete state map. So, if we don't have a
|
||||
# previous state group, load the complete state map now.
|
||||
if state_group_before_event_prev_group is None:
|
||||
state_ids_before_event = await entry.get_state(
|
||||
self._state_storage_controller, StateFilter.all()
|
||||
)
|
||||
#
|
||||
# make sure that we have a state group at that point. If it's not a state event,
|
||||
# that will be the state group for the new event. If it *is* a state event,
|
||||
# it might get rejected (in which case we'll need to persist it with the
|
||||
# previous state group)
|
||||
#
|
||||
|
||||
state_group_before_event = (
|
||||
await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=state_group_before_event_prev_group,
|
||||
delta_ids=deltas_to_state_group_before_event,
|
||||
current_state_ids=state_ids_before_event,
|
||||
)
|
||||
if not state_group_before_event:
|
||||
state_group_before_event = (
|
||||
await self._state_storage_controller.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=state_group_before_event_prev_group,
|
||||
delta_ids=deltas_to_state_group_before_event,
|
||||
current_state_ids=state_ids_before_event,
|
||||
)
|
||||
)
|
||||
|
||||
# Assign the new state group to the cached state entry.
|
||||
#
|
||||
# Note that this can race in that we could generate multiple state
|
||||
# groups for the same state entry, but that is just inefficient
|
||||
# rather than dangerous.
|
||||
if entry and entry.state_group is None:
|
||||
entry.state_group = state_group_before_event
|
||||
else:
|
||||
state_group_before_event = entry.state_group
|
||||
|
||||
#
|
||||
# now if it's not a state event, we're done
|
||||
@@ -343,18 +315,13 @@ class StateHandler:
|
||||
#
|
||||
|
||||
key = (event.type, event.state_key)
|
||||
if key in state_ids_before_event:
|
||||
replaces = state_ids_before_event[key]
|
||||
if replaces != event.event_id:
|
||||
event.unsigned["replaces_state"] = replaces
|
||||
|
||||
if state_ids_before_event is not None:
|
||||
replaces = state_ids_before_event.get(key)
|
||||
else:
|
||||
replaces_state_map = await entry.get_state(
|
||||
self._state_storage_controller, StateFilter.from_types([key])
|
||||
)
|
||||
replaces = replaces_state_map.get(key)
|
||||
|
||||
if replaces and replaces != event.event_id:
|
||||
event.unsigned["replaces_state"] = replaces
|
||||
|
||||
state_ids_after_event = dict(state_ids_before_event)
|
||||
state_ids_after_event[key] = event.event_id
|
||||
delta_ids = {key: event.event_id}
|
||||
|
||||
state_group_after_event = (
|
||||
@@ -363,7 +330,7 @@ class StateHandler:
|
||||
event.room_id,
|
||||
prev_group=state_group_before_event,
|
||||
delta_ids=delta_ids,
|
||||
current_state_ids=None,
|
||||
current_state_ids=state_ids_after_event,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -405,6 +372,9 @@ class StateHandler:
|
||||
state_group_ids_set = set(state_group_ids)
|
||||
if len(state_group_ids_set) == 1:
|
||||
(state_group_id,) = state_group_ids_set
|
||||
state = await self._state_storage_controller.get_state_for_groups(
|
||||
state_group_ids_set
|
||||
)
|
||||
(
|
||||
prev_group,
|
||||
delta_ids,
|
||||
@@ -412,7 +382,7 @@ class StateHandler:
|
||||
state_group_id
|
||||
)
|
||||
return _StateCacheEntry(
|
||||
state=None,
|
||||
state=state[state_group_id],
|
||||
state_group=state_group_id,
|
||||
prev_group=prev_group,
|
||||
delta_ids=delta_ids,
|
||||
@@ -435,6 +405,31 @@ class StateHandler:
|
||||
)
|
||||
return result
|
||||
|
||||
async def resolve_events(
|
||||
self,
|
||||
room_version: str,
|
||||
state_sets: Collection[Iterable[EventBase]],
|
||||
event: EventBase,
|
||||
) -> StateMap[EventBase]:
|
||||
logger.info(
|
||||
"Resolving state for %s with %d groups", event.room_id, len(state_sets)
|
||||
)
|
||||
state_set_ids = [
|
||||
{(ev.type, ev.state_key): ev.event_id for ev in st} for st in state_sets
|
||||
]
|
||||
|
||||
state_map = {ev.event_id: ev for st in state_sets for ev in st}
|
||||
|
||||
new_state = await self._state_resolution_handler.resolve_events_with_store(
|
||||
event.room_id,
|
||||
room_version,
|
||||
state_set_ids,
|
||||
event_map=state_map,
|
||||
state_res_store=StateResolutionStore(self.store),
|
||||
)
|
||||
|
||||
return {key: state_map[ev_id] for key, ev_id in new_state.items()}
|
||||
|
||||
async def update_current_state(self, room_id: str) -> None:
|
||||
"""Recalculates the current state for a room, and persists it.
|
||||
|
||||
@@ -757,12 +752,6 @@ def _make_state_cache_entry(
|
||||
delta_ids: Optional[StateMap[str]] = None
|
||||
|
||||
for old_group, old_state in state_groups_ids.items():
|
||||
if old_state.keys() - new_state.keys():
|
||||
# Currently we don't support deltas that remove keys from the state
|
||||
# map, so we have to ignore this group as a candidate to base the
|
||||
# new group on.
|
||||
continue
|
||||
|
||||
n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v}
|
||||
if not delta_ids or len(n_delta_ids) < len(delta_ids):
|
||||
prev_group = old_group
|
||||
|
||||
@@ -43,6 +43,4 @@ class StorageControllers:
|
||||
|
||||
self.persistence = None
|
||||
if stores.persist_events:
|
||||
self.persistence = EventsPersistenceStorageController(
|
||||
hs, stores, self.state
|
||||
)
|
||||
self.persistence = EventsPersistenceStorageController(hs, stores)
|
||||
|
||||
@@ -48,11 +48,9 @@ from synapse.events.snapshot import EventContext
|
||||
from synapse.logging import opentracing
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.controllers.state import StateStorageController
|
||||
from synapse.storage.databases import Databases
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
@@ -310,12 +308,7 @@ class EventsPersistenceStorageController:
|
||||
current state and forward extremity changes.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: "HomeServer",
|
||||
stores: Databases,
|
||||
state_controller: StateStorageController,
|
||||
):
|
||||
def __init__(self, hs: "HomeServer", stores: Databases):
|
||||
# We ultimately want to split out the state store from the main store,
|
||||
# so we use separate variables here even though they point to the same
|
||||
# store for now.
|
||||
@@ -332,7 +325,6 @@ class EventsPersistenceStorageController:
|
||||
self._process_event_persist_queue_task
|
||||
)
|
||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||
self._state_controller = state_controller
|
||||
|
||||
async def _process_event_persist_queue_task(
|
||||
self,
|
||||
@@ -512,7 +504,7 @@ class EventsPersistenceStorageController:
|
||||
state_res_store=StateResolutionStore(self.main_store),
|
||||
)
|
||||
|
||||
return await res.get_state(self._state_controller, StateFilter.all())
|
||||
return res.state
|
||||
|
||||
async def _persist_event_batch(
|
||||
self, _room_id: str, task: _PersistEventsTask
|
||||
@@ -948,8 +940,7 @@ class EventsPersistenceStorageController:
|
||||
events_context,
|
||||
)
|
||||
|
||||
full_state = await res.get_state(self._state_controller)
|
||||
return full_state, None, new_latest_event_ids
|
||||
return res.state, None, new_latest_event_ids
|
||||
|
||||
async def _prune_extremities(
|
||||
self,
|
||||
|
||||
@@ -346,7 +346,7 @@ class StateStorageController:
|
||||
room_id: str,
|
||||
prev_group: Optional[int],
|
||||
delta_ids: Optional[StateMap[str]],
|
||||
current_state_ids: Optional[StateMap[str]],
|
||||
current_state_ids: StateMap[str],
|
||||
) -> int:
|
||||
"""Store a new set of state, returning a newly assigned state group.
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.background_updates import BackgroundUpdater
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.types import Connection, Cursor
|
||||
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
|
||||
from synapse.util.async_helpers import delay_cancellation
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -818,14 +818,12 @@ class DatabasePool:
|
||||
)
|
||||
|
||||
for after_callback, after_args, after_kwargs in after_callbacks:
|
||||
await maybe_awaitable(after_callback(*after_args, **after_kwargs))
|
||||
after_callback(*after_args, **after_kwargs)
|
||||
|
||||
return cast(R, result)
|
||||
except Exception:
|
||||
for exception_callback, after_args, after_kwargs in exception_callbacks:
|
||||
await maybe_awaitable(
|
||||
exception_callback(*after_args, **after_kwargs)
|
||||
)
|
||||
for after_callback, after_args, after_kwargs in exception_callbacks:
|
||||
after_callback(*after_args, **after_kwargs)
|
||||
raise
|
||||
|
||||
# To handle cancellation, we ensure that `after_callback`s and
|
||||
|
||||
@@ -371,30 +371,52 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
device_list_summary=DeviceListUpdates(),
|
||||
)
|
||||
|
||||
async def get_appservice_last_pos(self) -> int:
|
||||
"""
|
||||
Get the last stream ordering position for the appservice process.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="appservice_stream_position",
|
||||
retcol="stream_ordering",
|
||||
keyvalues={},
|
||||
desc="get_appservice_last_pos",
|
||||
)
|
||||
|
||||
async def set_appservice_last_pos(self, pos: int) -> None:
|
||||
"""
|
||||
Set the last stream ordering position for the appservice process.
|
||||
"""
|
||||
def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None:
|
||||
txn.execute(
|
||||
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
|
||||
)
|
||||
|
||||
await self.db_pool.simple_update_one(
|
||||
table="appservice_stream_position",
|
||||
keyvalues={},
|
||||
updatevalues={"stream_ordering": pos},
|
||||
desc="set_appservice_last_pos",
|
||||
await self.db_pool.runInteraction(
|
||||
"set_appservice_last_pos", set_appservice_last_pos_txn
|
||||
)
|
||||
|
||||
async def get_new_events_for_appservice(
|
||||
self, current_id: int, limit: int
|
||||
) -> Tuple[int, List[EventBase]]:
|
||||
"""Get all new events for an appservice"""
|
||||
|
||||
def get_new_events_for_appservice_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[int, List[str]]:
|
||||
sql = (
|
||||
"SELECT e.stream_ordering, e.event_id"
|
||||
" FROM events AS e"
|
||||
" WHERE"
|
||||
" (SELECT stream_ordering FROM appservice_stream_position)"
|
||||
" < e.stream_ordering"
|
||||
" AND e.stream_ordering <= ?"
|
||||
" ORDER BY e.stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (current_id, limit))
|
||||
rows = txn.fetchall()
|
||||
|
||||
upper_bound = current_id
|
||||
if len(rows) == limit:
|
||||
upper_bound = rows[-1][0]
|
||||
|
||||
return upper_bound, [row[1] for row in rows]
|
||||
|
||||
upper_bound, event_ids = await self.db_pool.runInteraction(
|
||||
"get_new_events_for_appservice", get_new_events_for_appservice_txn
|
||||
)
|
||||
|
||||
events = await self.get_events_as_list(event_ids, get_prev_content=True)
|
||||
|
||||
return upper_bound, events
|
||||
|
||||
async def get_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, type: str
|
||||
) -> int:
|
||||
|
||||
@@ -193,10 +193,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
relates_to: Optional[str],
|
||||
backfilled: bool,
|
||||
) -> None:
|
||||
# This invalidates any local in-memory cached event objects, the original
|
||||
# process triggering the invalidation is responsible for clearing any external
|
||||
# cached objects.
|
||||
self._invalidate_local_get_event_cache(event_id)
|
||||
self._invalidate_get_event_cache(event_id)
|
||||
self.have_seen_event.invalidate((room_id, event_id))
|
||||
|
||||
self.get_latest_event_ids_in_room.invalidate((room_id,))
|
||||
@@ -211,7 +208,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
|
||||
|
||||
if redacts:
|
||||
self._invalidate_local_get_event_cache(redacts)
|
||||
self._invalidate_get_event_cache(redacts)
|
||||
# Caches which might leak edits must be invalidated for the event being
|
||||
# redacted.
|
||||
self.get_relations_for_event.invalidate((redacts,))
|
||||
|
||||
@@ -1669,9 +1669,9 @@ class PersistEventsStore:
|
||||
if not row["rejects"] and not row["redacts"]:
|
||||
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
|
||||
|
||||
async def prefill() -> None:
|
||||
def prefill() -> None:
|
||||
for cache_entry in to_prefill:
|
||||
await self.store._get_event_cache.set(
|
||||
self.store._get_event_cache.set(
|
||||
(cache_entry.event.event_id,), cache_entry
|
||||
)
|
||||
|
||||
|
||||
@@ -67,8 +67,6 @@ class _BackgroundUpdates:
|
||||
EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
|
||||
EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
|
||||
|
||||
EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _CalculateChainCover:
|
||||
@@ -255,11 +253,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
replaces_index="ev_edges_id",
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
|
||||
self._background_events_populate_state_key_rejections,
|
||||
)
|
||||
|
||||
async def _background_reindex_fields_sender(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
@@ -1406,83 +1399,3 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
||||
async def _background_events_populate_state_key_rejections(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""Back-populate `events.state_key` and `events.rejection_reason"""
|
||||
|
||||
min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
|
||||
max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]
|
||||
|
||||
def _populate_txn(txn: LoggingTransaction) -> bool:
|
||||
"""Returns True if we're done."""
|
||||
|
||||
# first we need to find an endpoint.
|
||||
# we need to find the final row in the batch of batch_size, which means
|
||||
# we need to skip over (batch_size-1) rows and get the next row.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT stream_ordering FROM events
|
||||
WHERE stream_ordering > ? AND stream_ordering <= ?
|
||||
ORDER BY stream_ordering
|
||||
LIMIT 1 OFFSET ?
|
||||
""",
|
||||
(
|
||||
min_stream_ordering_exclusive,
|
||||
max_stream_ordering_inclusive,
|
||||
batch_size - 1,
|
||||
),
|
||||
)
|
||||
|
||||
endpoint = None
|
||||
row = txn.fetchone()
|
||||
if row:
|
||||
endpoint = row[0]
|
||||
|
||||
where_clause = "stream_ordering > ?"
|
||||
args = [min_stream_ordering_exclusive]
|
||||
if endpoint:
|
||||
where_clause += " AND stream_ordering <= ?"
|
||||
args.append(endpoint)
|
||||
|
||||
# now do the updates.
|
||||
txn.execute(
|
||||
f"""
|
||||
UPDATE events
|
||||
SET state_key = (SELECT state_key FROM state_events se WHERE se.event_id = events.event_id),
|
||||
rejection_reason = (SELECT reason FROM rejections rej WHERE rej.event_id = events.event_id)
|
||||
WHERE ({where_clause})
|
||||
""",
|
||||
args,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"populated new `events` columns up to %s/%i: updated %i rows",
|
||||
endpoint,
|
||||
max_stream_ordering_inclusive,
|
||||
txn.rowcount,
|
||||
)
|
||||
|
||||
if endpoint is None:
|
||||
# we're done
|
||||
return True
|
||||
|
||||
progress["min_stream_ordering_exclusive"] = endpoint
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
|
||||
progress,
|
||||
)
|
||||
return False
|
||||
|
||||
done = await self.db_pool.runInteraction(
|
||||
desc="events_populate_state_key_rejections", func=_populate_txn
|
||||
)
|
||||
|
||||
if done:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
||||
@@ -79,7 +79,7 @@ from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.lrucache import AsyncLruCache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.iterutils import batch_iter
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -238,9 +238,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
5 * 60 * 1000,
|
||||
)
|
||||
|
||||
self._get_event_cache: AsyncLruCache[
|
||||
Tuple[str], EventCacheEntry
|
||||
] = AsyncLruCache(
|
||||
self._get_event_cache: LruCache[Tuple[str], EventCacheEntry] = LruCache(
|
||||
cache_name="*getEvent*",
|
||||
max_size=hs.config.caches.event_cache_size,
|
||||
)
|
||||
@@ -294,6 +292,25 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
||||
async def get_received_ts(self, event_id: str) -> Optional[int]:
|
||||
"""Get received_ts (when it was persisted) for the event.
|
||||
|
||||
Raises an exception for unknown events.
|
||||
|
||||
Args:
|
||||
event_id: The event ID to query.
|
||||
|
||||
Returns:
|
||||
Timestamp in milliseconds, or None for events that were persisted
|
||||
before received_ts was implemented.
|
||||
"""
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="events",
|
||||
keyvalues={"event_id": event_id},
|
||||
retcol="received_ts",
|
||||
desc="get_received_ts",
|
||||
)
|
||||
|
||||
async def have_censored_event(self, event_id: str) -> bool:
|
||||
"""Check if an event has been censored, i.e. if the content of the event has been erased
|
||||
from the database due to a redaction.
|
||||
@@ -600,7 +617,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
Returns:
|
||||
map from event id to result
|
||||
"""
|
||||
event_entry_map = await self._get_events_from_cache(
|
||||
event_entry_map = self._get_events_from_cache(
|
||||
event_ids,
|
||||
)
|
||||
|
||||
@@ -712,22 +729,12 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
return event_entry_map
|
||||
|
||||
async def _invalidate_get_event_cache(self, event_id: str) -> None:
|
||||
# First we invalidate the asynchronous cache instance. This may include
|
||||
# out-of-process caches such as Redis/memcache. Once complete we can
|
||||
# invalidate any in memory cache. The ordering is important here to
|
||||
# ensure we don't pull in any remote invalid value after we invalidate
|
||||
# the in-memory cache.
|
||||
await self._get_event_cache.invalidate((event_id,))
|
||||
def _invalidate_get_event_cache(self, event_id: str) -> None:
|
||||
self._get_event_cache.invalidate((event_id,))
|
||||
self._event_ref.pop(event_id, None)
|
||||
self._current_event_fetches.pop(event_id, None)
|
||||
|
||||
def _invalidate_local_get_event_cache(self, event_id: str) -> None:
|
||||
self._get_event_cache.invalidate_local((event_id,))
|
||||
self._event_ref.pop(event_id, None)
|
||||
self._current_event_fetches.pop(event_id, None)
|
||||
|
||||
async def _get_events_from_cache(
|
||||
def _get_events_from_cache(
|
||||
self, events: Iterable[str], update_metrics: bool = True
|
||||
) -> Dict[str, EventCacheEntry]:
|
||||
"""Fetch events from the caches.
|
||||
@@ -742,7 +749,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
for event_id in events:
|
||||
# First check if it's in the event cache
|
||||
ret = await self._get_event_cache.get(
|
||||
ret = self._get_event_cache.get(
|
||||
(event_id,), None, update_metrics=update_metrics
|
||||
)
|
||||
if ret:
|
||||
@@ -764,7 +771,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
# We add the entry back into the cache as we want to keep
|
||||
# recently queried events in the cache.
|
||||
await self._get_event_cache.set((event_id,), cache_entry)
|
||||
self._get_event_cache.set((event_id,), cache_entry)
|
||||
|
||||
return event_map
|
||||
|
||||
@@ -1141,7 +1148,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
event=original_ev, redacted_event=redacted_event
|
||||
)
|
||||
|
||||
await self._get_event_cache.set((event_id,), cache_entry)
|
||||
self._get_event_cache.set((event_id,), cache_entry)
|
||||
result_map[event_id] = cache_entry
|
||||
|
||||
if not redacted_event:
|
||||
@@ -1375,9 +1382,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
# if the event cache contains the event, obviously we've seen it.
|
||||
|
||||
cache_results = {
|
||||
(rid, eid)
|
||||
for (rid, eid) in keys
|
||||
if await self._get_event_cache.contains((eid,))
|
||||
(rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
|
||||
}
|
||||
results = dict.fromkeys(cache_results, True)
|
||||
remaining = [k for k in keys if k not in cache_results]
|
||||
|
||||
@@ -302,7 +302,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.have_seen_event, (room_id, event_id)
|
||||
)
|
||||
txn.call_after(self._invalidate_get_event_cache, event_id)
|
||||
self._invalidate_get_event_cache(event_id)
|
||||
|
||||
logger.info("[purge] done")
|
||||
|
||||
|
||||
@@ -175,7 +175,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
rooms.creator, state.encryption, state.is_federatable AS federatable,
|
||||
rooms.is_public AS public, state.join_rules, state.guest_access,
|
||||
state.history_visibility, curr.current_state_events AS state_events,
|
||||
state.avatar, state.topic, state.room_type
|
||||
state.avatar, state.topic
|
||||
FROM rooms
|
||||
LEFT JOIN room_stats_state state USING (room_id)
|
||||
LEFT JOIN room_stats_current curr USING (room_id)
|
||||
@@ -596,8 +596,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members,
|
||||
curr.local_users_in_room, rooms.room_version, rooms.creator,
|
||||
state.encryption, state.is_federatable, rooms.is_public, state.join_rules,
|
||||
state.guest_access, state.history_visibility, curr.current_state_events,
|
||||
state.room_type
|
||||
state.guest_access, state.history_visibility, curr.current_state_events
|
||||
FROM room_stats_state state
|
||||
INNER JOIN room_stats_current curr USING (room_id)
|
||||
INNER JOIN rooms USING (room_id)
|
||||
@@ -647,7 +646,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
"guest_access": room[11],
|
||||
"history_visibility": room[12],
|
||||
"state_events": room[13],
|
||||
"room_type": room[14],
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ import attr
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import (
|
||||
run_as_background_process,
|
||||
@@ -779,8 +780,26 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
return shared_room_ids or frozenset()
|
||||
|
||||
async def get_joined_users_from_context(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
state_group: Union[object, int] = context.state_group
|
||||
if not state_group:
|
||||
# If state_group is None it means it has yet to be assigned a
|
||||
# state group, i.e. we need to make sure that calls with a state_group
|
||||
# of None don't hit previous cached calls with a None state_group.
|
||||
# To do this we set the state_group to a new object as object() != object()
|
||||
state_group = object()
|
||||
|
||||
current_state_ids = await context.get_current_state_ids()
|
||||
assert current_state_ids is not None
|
||||
assert state_group is not None
|
||||
return await self._get_joined_users_from_context(
|
||||
event.room_id, state_group, current_state_ids, event=event, context=context
|
||||
)
|
||||
|
||||
async def get_joined_users_from_state(
|
||||
self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
|
||||
self, room_id: str, state_entry: "_StateCacheEntry"
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
state_group: Union[object, int] = state_entry.state_group
|
||||
if not state_group:
|
||||
@@ -793,17 +812,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
assert state_group is not None
|
||||
with Measure(self._clock, "get_joined_users_from_state"):
|
||||
return await self._get_joined_users_from_context(
|
||||
room_id, state_group, state, context=state_entry
|
||||
room_id, state_group, state_entry.state, context=state_entry
|
||||
)
|
||||
|
||||
@cached(num_args=2, iterable=True, max_entries=100000)
|
||||
@cached(num_args=2, cache_context=True, iterable=True, max_entries=100000)
|
||||
async def _get_joined_users_from_context(
|
||||
self,
|
||||
room_id: str,
|
||||
state_group: Union[object, int],
|
||||
current_state_ids: StateMap[str],
|
||||
cache_context: _CacheContext,
|
||||
event: Optional[EventBase] = None,
|
||||
context: Optional["_StateCacheEntry"] = None,
|
||||
context: Optional[Union[EventContext, "_StateCacheEntry"]] = None,
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
# We don't use `state_group`, it's there so that we can cache based
|
||||
# on it. However, it's important that it's never None, since two current_states
|
||||
@@ -843,9 +863,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
# We don't update the event cache hit ratio as it completely throws off
|
||||
# the hit ratio counts. After all, we don't populate the cache if we
|
||||
# miss it here
|
||||
event_map = await self._get_events_from_cache(
|
||||
member_event_ids, update_metrics=False
|
||||
)
|
||||
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
|
||||
|
||||
missing_member_event_ids = []
|
||||
for event_id in member_event_ids:
|
||||
@@ -999,7 +1017,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
)
|
||||
|
||||
async def get_joined_hosts(
|
||||
self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
|
||||
self, room_id: str, state_entry: "_StateCacheEntry"
|
||||
) -> FrozenSet[str]:
|
||||
state_group: Union[object, int] = state_entry.state_group
|
||||
if not state_group:
|
||||
@@ -1012,7 +1030,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
assert state_group is not None
|
||||
with Measure(self._clock, "get_joined_hosts"):
|
||||
return await self._get_joined_hosts(
|
||||
room_id, state_group, state, state_entry=state_entry
|
||||
room_id, state_group, state_entry=state_entry
|
||||
)
|
||||
|
||||
@cached(num_args=2, max_entries=10000, iterable=True)
|
||||
@@ -1020,7 +1038,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
self,
|
||||
room_id: str,
|
||||
state_group: Union[object, int],
|
||||
state: StateMap[str],
|
||||
state_entry: "_StateCacheEntry",
|
||||
) -> FrozenSet[str]:
|
||||
# We don't use `state_group`, it's there so that we can cache based on
|
||||
@@ -1076,7 +1093,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
# The cache doesn't match the state group or prev state group,
|
||||
# so we calculate the result from first principles.
|
||||
joined_users = await self.get_joined_users_from_state(
|
||||
room_id, state, state_entry
|
||||
room_id, state_entry
|
||||
)
|
||||
|
||||
cache.hosts_to_joined_users = {}
|
||||
|
||||
@@ -1022,8 +1022,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
}
|
||||
|
||||
async def get_all_new_events_stream(
|
||||
self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
|
||||
) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
|
||||
self, from_id: int, current_id: int, limit: int
|
||||
) -> Tuple[int, List[EventBase]]:
|
||||
"""Get all new events
|
||||
|
||||
Returns all events with from_id < stream_ordering <= current_id.
|
||||
@@ -1032,21 +1032,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
from_id: the stream_ordering of the last event we processed
|
||||
current_id: the stream_ordering of the most recently processed event
|
||||
limit: the maximum number of events to return
|
||||
get_prev_content: whether to fetch previous event content
|
||||
|
||||
Returns:
|
||||
A tuple of (next_id, events, event_to_received_ts), where `next_id`
|
||||
is the next value to pass as `from_id` (it will either be the
|
||||
stream_ordering of the last returned event, or, if fewer than `limit`
|
||||
events were found, the `current_id`). The `event_to_received_ts` is
|
||||
a dictionary mapping event ID to the event `received_ts`.
|
||||
A tuple of (next_id, events), where `next_id` is the next value to
|
||||
pass as `from_id` (it will either be the stream_ordering of the
|
||||
last returned event, or, if fewer than `limit` events were found,
|
||||
the `current_id`).
|
||||
"""
|
||||
|
||||
def get_all_new_events_stream_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[int, Dict[str, Optional[int]]]:
|
||||
) -> Tuple[int, List[str]]:
|
||||
sql = (
|
||||
"SELECT e.stream_ordering, e.event_id, e.received_ts"
|
||||
"SELECT e.stream_ordering, e.event_id"
|
||||
" FROM events AS e"
|
||||
" WHERE"
|
||||
" ? < e.stream_ordering AND e.stream_ordering <= ?"
|
||||
@@ -1061,21 +1059,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
if len(rows) == limit:
|
||||
upper_bound = rows[-1][0]
|
||||
|
||||
event_to_received_ts: Dict[str, Optional[int]] = {
|
||||
row[1]: row[2] for row in rows
|
||||
}
|
||||
return upper_bound, event_to_received_ts
|
||||
return upper_bound, [row[1] for row in rows]
|
||||
|
||||
upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
|
||||
upper_bound, event_ids = await self.db_pool.runInteraction(
|
||||
"get_all_new_events_stream", get_all_new_events_stream_txn
|
||||
)
|
||||
|
||||
events = await self.get_events_as_list(
|
||||
event_to_received_ts.keys(),
|
||||
get_prev_content=get_prev_content,
|
||||
)
|
||||
events = await self.get_events_as_list(event_ids)
|
||||
|
||||
return upper_bound, events, event_to_received_ts
|
||||
return upper_bound, events
|
||||
|
||||
async def get_federation_out_pos(self, typ: str) -> int:
|
||||
if self._need_to_reset_federation_stream_positions:
|
||||
|
||||
@@ -202,14 +202,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
requests state from the cache, if False we need to query the DB for the
|
||||
missing state.
|
||||
"""
|
||||
# If we are asked explicitly for a subset of keys, we only ask for those
|
||||
# from the cache. This ensures that the `DictionaryCache` can make
|
||||
# better decisions about what to cache and what to expire.
|
||||
dict_keys = None
|
||||
if not state_filter.has_wildcards():
|
||||
dict_keys = state_filter.concrete_types()
|
||||
|
||||
cache_entry = cache.get(group, dict_keys=dict_keys)
|
||||
cache_entry = cache.get(group)
|
||||
state_dict_ids = cache_entry.value
|
||||
|
||||
if cache_entry.full or state_filter.is_full():
|
||||
@@ -407,17 +400,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
room_id: str,
|
||||
prev_group: Optional[int],
|
||||
delta_ids: Optional[StateMap[str]],
|
||||
current_state_ids: Optional[StateMap[str]],
|
||||
current_state_ids: StateMap[str],
|
||||
) -> int:
|
||||
"""Store a new set of state, returning a newly assigned state group.
|
||||
|
||||
At least one of `current_state_ids` and `prev_group` must be provided. Whenever
|
||||
`prev_group` is not None, `delta_ids` must also not be None.
|
||||
|
||||
Args:
|
||||
event_id: The event ID for which the state was calculated
|
||||
room_id
|
||||
prev_group: A previous state group for the room.
|
||||
prev_group: A previous state group for the room, optional.
|
||||
delta_ids: The delta between state at `prev_group` and
|
||||
`current_state_ids`, if `prev_group` was given. Same format as
|
||||
`current_state_ids`.
|
||||
@@ -428,89 +418,64 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
The state group ID
|
||||
"""
|
||||
|
||||
if prev_group is None and current_state_ids is None:
|
||||
raise Exception("current_state_ids and prev_group can't both be None")
|
||||
def _store_state_group_txn(txn: LoggingTransaction) -> int:
|
||||
if current_state_ids is None:
|
||||
# AFAIK, this can never happen
|
||||
raise Exception("current_state_ids cannot be None")
|
||||
|
||||
if prev_group is not None and delta_ids is None:
|
||||
raise Exception("delta_ids is None when prev_group is not None")
|
||||
state_group = self._state_group_seq_gen.get_next_id_txn(txn)
|
||||
|
||||
def insert_delta_group_txn(
|
||||
txn: LoggingTransaction, prev_group: int, delta_ids: StateMap[str]
|
||||
) -> Optional[int]:
|
||||
"""Try and persist the new group as a delta.
|
||||
|
||||
Requires that we have the state as a delta from a previous state group.
|
||||
|
||||
Returns:
|
||||
The state group if successfully created, or None if the state
|
||||
needs to be persisted as a full state.
|
||||
"""
|
||||
is_in_db = self.db_pool.simple_select_one_onecol_txn(
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
keyvalues={"id": prev_group},
|
||||
retcol="id",
|
||||
allow_none=True,
|
||||
values={"id": state_group, "room_id": room_id, "event_id": event_id},
|
||||
)
|
||||
if not is_in_db:
|
||||
raise Exception(
|
||||
"Trying to persist state with unpersisted prev_group: %r"
|
||||
% (prev_group,)
|
||||
|
||||
# We persist as a delta if we can, while also ensuring the chain
|
||||
# of deltas isn't tooo long, as otherwise read performance degrades.
|
||||
if prev_group:
|
||||
is_in_db = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
keyvalues={"id": prev_group},
|
||||
retcol="id",
|
||||
allow_none=True,
|
||||
)
|
||||
if not is_in_db:
|
||||
raise Exception(
|
||||
"Trying to persist state with unpersisted prev_group: %r"
|
||||
% (prev_group,)
|
||||
)
|
||||
|
||||
potential_hops = self._count_state_group_hops_txn(txn, prev_group)
|
||||
if prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
|
||||
assert delta_ids is not None
|
||||
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
values={"state_group": state_group, "prev_state_group": prev_group},
|
||||
)
|
||||
|
||||
# if the chain of state group deltas is going too long, we fall back to
|
||||
# persisting a complete state group.
|
||||
potential_hops = self._count_state_group_hops_txn(txn, prev_group)
|
||||
if potential_hops >= MAX_STATE_DELTA_HOPS:
|
||||
return None
|
||||
|
||||
state_group = self._state_group_seq_gen.get_next_id_txn(txn)
|
||||
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
values={"id": state_group, "room_id": room_id, "event_id": event_id},
|
||||
)
|
||||
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
values={"state_group": state_group, "prev_state_group": prev_group},
|
||||
)
|
||||
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||
values=[
|
||||
(state_group, room_id, key[0], key[1], state_id)
|
||||
for key, state_id in delta_ids.items()
|
||||
],
|
||||
)
|
||||
|
||||
return state_group
|
||||
|
||||
def insert_full_state_txn(
|
||||
txn: LoggingTransaction, current_state_ids: StateMap[str]
|
||||
) -> int:
|
||||
"""Persist the full state, returning the new state group."""
|
||||
state_group = self._state_group_seq_gen.get_next_id_txn(txn)
|
||||
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
values={"id": state_group, "room_id": room_id, "event_id": event_id},
|
||||
)
|
||||
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||
values=[
|
||||
(state_group, room_id, key[0], key[1], state_id)
|
||||
for key, state_id in current_state_ids.items()
|
||||
],
|
||||
)
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||
values=[
|
||||
(state_group, room_id, key[0], key[1], state_id)
|
||||
for key, state_id in delta_ids.items()
|
||||
],
|
||||
)
|
||||
else:
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
keys=("state_group", "room_id", "type", "state_key", "event_id"),
|
||||
values=[
|
||||
(state_group, room_id, key[0], key[1], state_id)
|
||||
for key, state_id in current_state_ids.items()
|
||||
],
|
||||
)
|
||||
|
||||
# Prefill the state group caches with this group.
|
||||
# It's fine to use the sequence like this as the state group map
|
||||
@@ -526,7 +491,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
self._state_group_members_cache.update,
|
||||
self._state_group_members_cache.sequence,
|
||||
key=state_group,
|
||||
value=current_member_state_ids,
|
||||
value=dict(current_member_state_ids),
|
||||
)
|
||||
|
||||
current_non_member_state_ids = {
|
||||
@@ -538,35 +503,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
self._state_group_cache.update,
|
||||
self._state_group_cache.sequence,
|
||||
key=state_group,
|
||||
value=current_non_member_state_ids,
|
||||
value=dict(current_non_member_state_ids),
|
||||
)
|
||||
|
||||
return state_group
|
||||
|
||||
if prev_group is not None:
|
||||
state_group = await self.db_pool.runInteraction(
|
||||
"store_state_group.insert_delta_group",
|
||||
insert_delta_group_txn,
|
||||
prev_group,
|
||||
delta_ids,
|
||||
)
|
||||
if state_group is not None:
|
||||
return state_group
|
||||
|
||||
# We're going to persist the state as a complete group rather than
|
||||
# a delta, so first we need to ensure we have loaded the state map
|
||||
# from the database.
|
||||
if current_state_ids is None:
|
||||
assert prev_group is not None
|
||||
assert delta_ids is not None
|
||||
groups = await self._get_state_for_groups([prev_group])
|
||||
current_state_ids = dict(groups[prev_group])
|
||||
current_state_ids.update(delta_ids)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"store_state_group.insert_full_state",
|
||||
insert_full_state_txn,
|
||||
current_state_ids,
|
||||
"store_state_group", _store_state_group_txn
|
||||
)
|
||||
|
||||
async def purge_unreferenced_state_groups(
|
||||
|
||||
@@ -74,14 +74,13 @@ Changes in SCHEMA_VERSION = 71:
|
||||
|
||||
Changes in SCHEMA_VERSION = 72:
|
||||
- event_edges.(room_id, is_state) are no longer written to.
|
||||
- Tables related to groups are dropped.
|
||||
"""
|
||||
|
||||
|
||||
SCHEMA_COMPAT_VERSION = (
|
||||
# The groups tables are no longer accessible, so synapses with SCHEMA_VERSION < 72
|
||||
# could break.
|
||||
72
|
||||
# We no longer maintain `event_edges.room_id`, so synapses with SCHEMA_VERSION < 71
|
||||
# will break.
|
||||
71
|
||||
)
|
||||
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
|
||||
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
# Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
|
||||
def run_create(cur: Cursor, database_engine, *args, **kwargs):
|
||||
"""Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""
|
||||
|
||||
# we know that any new events will have the columns populated (and that has been
|
||||
# the case since schema_version 68, so there is no chance of rolling back now).
|
||||
#
|
||||
# So, we only need to make sure that existing rows are updated. We read the
|
||||
# current min and max stream orderings, since that is guaranteed to include all
|
||||
# the events that were stored before the new columns were added.
|
||||
cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
|
||||
(min_stream_ordering, max_stream_ordering) = cur.fetchone()
|
||||
|
||||
if min_stream_ordering is None:
|
||||
# no rows, nothing to do.
|
||||
return
|
||||
|
||||
cur.execute(
|
||||
"INSERT into background_updates (ordering, update_name, progress_json)"
|
||||
" VALUES (7203, 'events_populate_state_key_rejections', ?)",
|
||||
(
|
||||
json.dumps(
|
||||
{
|
||||
"min_stream_ordering_exclusive": min_stream_ordering - 1,
|
||||
"max_stream_ordering_inclusive": max_stream_ordering,
|
||||
}
|
||||
),
|
||||
),
|
||||
)
|
||||
@@ -1,17 +0,0 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- event_reference_hashes is unused, so we can drop it
|
||||
DROP TABLE event_reference_hashes;
|
||||
@@ -1,31 +0,0 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Remove the tables which powered the unspecced groups/communities feature.
|
||||
DROP TABLE IF EXISTS group_attestations_remote;
|
||||
DROP TABLE IF EXISTS group_attestations_renewals;
|
||||
DROP TABLE IF EXISTS group_invites;
|
||||
DROP TABLE IF EXISTS group_roles;
|
||||
DROP TABLE IF EXISTS group_room_categories;
|
||||
DROP TABLE IF EXISTS group_rooms;
|
||||
DROP TABLE IF EXISTS group_summary_roles;
|
||||
DROP TABLE IF EXISTS group_summary_room_categories;
|
||||
DROP TABLE IF EXISTS group_summary_rooms;
|
||||
DROP TABLE IF EXISTS group_summary_users;
|
||||
DROP TABLE IF EXISTS group_users;
|
||||
DROP TABLE IF EXISTS groups;
|
||||
DROP TABLE IF EXISTS local_group_membership;
|
||||
DROP TABLE IF EXISTS local_group_updates;
|
||||
DROP TABLE IF EXISTS remote_profile_cache;
|
||||
@@ -166,7 +166,6 @@ class PartialCurrentStateTracker:
|
||||
logger.info(
|
||||
"Awaiting un-partial-stating of room %s",
|
||||
room_id,
|
||||
stack_info=True,
|
||||
)
|
||||
|
||||
await make_deferred_yieldable(d)
|
||||
|
||||
@@ -14,13 +14,11 @@
|
||||
import enum
|
||||
import logging
|
||||
import threading
|
||||
from typing import Any, Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union
|
||||
from typing import Any, Dict, Generic, Iterable, Optional, Set, TypeVar
|
||||
|
||||
import attr
|
||||
from typing_extensions import Literal
|
||||
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_items
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -55,67 +53,20 @@ class DictionaryEntry: # should be: Generic[DKT, DV].
|
||||
return len(self.value)
|
||||
|
||||
|
||||
class _FullCacheKey(enum.Enum):
|
||||
"""The key we use to cache the full dict."""
|
||||
|
||||
KEY = object()
|
||||
|
||||
|
||||
class _Sentinel(enum.Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
# type of a dictionary lookup.
|
||||
sentinel = object()
|
||||
|
||||
|
||||
class _PerKeyValue(Generic[DV]):
|
||||
"""The cached value of a dictionary key. If `value` is the sentinel,
|
||||
indicates that the requested key is known to *not* be in the full dict.
|
||||
"""
|
||||
|
||||
__slots__ = ["value"]
|
||||
|
||||
def __init__(self, value: Union[DV, Literal[_Sentinel.sentinel]]) -> None:
|
||||
self.value = value
|
||||
|
||||
def __len__(self) -> int:
|
||||
# We add a `__len__` implementation as we use this class in a cache
|
||||
# where the values are variable length.
|
||||
return 1
|
||||
|
||||
|
||||
class DictionaryCache(Generic[KT, DKT, DV]):
|
||||
"""Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
|
||||
fetching a subset of dictionary keys for a particular key.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, max_entries: int = 1000):
|
||||
# We use a single cache to cache two different types of entries:
|
||||
# 1. Map from (key, dict_key) -> dict value (or sentinel, indicating
|
||||
# the key doesn't exist in the dict); and
|
||||
# 2. Map from (key, _FullCacheKey.KEY) -> full dict.
|
||||
#
|
||||
# The former is used when explicit keys of the dictionary are looked up,
|
||||
# and the latter when the full dictionary is requested.
|
||||
#
|
||||
# If when explicit keys are requested and not in the cache, we then look
|
||||
# to see if we have the full dict and use that if we do. If found in the
|
||||
# full dict each key is added into the cache.
|
||||
#
|
||||
# This set up allows the `LruCache` to prune the full dict entries if
|
||||
# they haven't been used in a while, even when there have been recent
|
||||
# queries for subsets of the dict.
|
||||
#
|
||||
# Typing:
|
||||
# * A key of `(KT, DKT)` has a value of `_PerKeyValue`
|
||||
# * A key of `(KT, _FullCacheKey.KEY)` has a value of `Dict[DKT, DV]`
|
||||
self.cache: LruCache[
|
||||
Tuple[KT, Union[DKT, Literal[_FullCacheKey.KEY]]],
|
||||
Union[_PerKeyValue, Dict[DKT, DV]],
|
||||
] = LruCache(
|
||||
max_size=max_entries,
|
||||
cache_name=name,
|
||||
cache_type=TreeCache,
|
||||
size_callback=len,
|
||||
self.cache: LruCache[KT, DictionaryEntry] = LruCache(
|
||||
max_size=max_entries, cache_name=name, size_callback=len
|
||||
)
|
||||
|
||||
self.name = name
|
||||
@@ -145,97 +96,20 @@ class DictionaryCache(Generic[KT, DKT, DV]):
|
||||
Returns:
|
||||
DictionaryEntry
|
||||
"""
|
||||
|
||||
if dict_keys is None:
|
||||
# First we check if we have cached the full dict.
|
||||
entry = self.cache.get((key, _FullCacheKey.KEY), _Sentinel.sentinel)
|
||||
if entry is not _Sentinel.sentinel:
|
||||
assert isinstance(entry, dict)
|
||||
return DictionaryEntry(True, set(), entry)
|
||||
|
||||
# If not, check if we have cached any of dict keys.
|
||||
all_entries = self.cache.get_multi(
|
||||
(key,),
|
||||
_Sentinel.sentinel,
|
||||
)
|
||||
if all_entries is _Sentinel.sentinel:
|
||||
return DictionaryEntry(False, set(), {})
|
||||
|
||||
# If there are entries we need to unwrap the returned cache nodes
|
||||
# and `_PerKeyValue` into the `DictionaryEntry`.
|
||||
values = {}
|
||||
known_absent = set()
|
||||
for dict_key, dict_value in iterate_tree_cache_items((), all_entries):
|
||||
dict_key = dict_key[0]
|
||||
dict_value = dict_value.value
|
||||
|
||||
# We have explicitly looked for a full cache key, so we
|
||||
# shouldn't see one.
|
||||
assert dict_key != _FullCacheKey.KEY
|
||||
|
||||
# ... therefore the values must be `_PerKeyValue`
|
||||
assert isinstance(dict_value, _PerKeyValue)
|
||||
|
||||
if dict_value.value is _Sentinel.sentinel:
|
||||
known_absent.add(dict_key)
|
||||
else:
|
||||
values[dict_key] = dict_value.value
|
||||
|
||||
return DictionaryEntry(False, known_absent, values)
|
||||
|
||||
# We are being asked for a subset of keys.
|
||||
|
||||
# First got and check for each requested dict key in the cache, tracking
|
||||
# which we couldn't find.
|
||||
values = {}
|
||||
known_absent = set()
|
||||
missing = set()
|
||||
for dict_key in dict_keys:
|
||||
entry = self.cache.get((key, dict_key), _Sentinel.sentinel)
|
||||
if entry is _Sentinel.sentinel:
|
||||
missing.add(dict_key)
|
||||
continue
|
||||
|
||||
assert isinstance(entry, _PerKeyValue)
|
||||
|
||||
if entry.value is _Sentinel.sentinel:
|
||||
known_absent.add(dict_key)
|
||||
entry = self.cache.get(key, _Sentinel.sentinel)
|
||||
if entry is not _Sentinel.sentinel:
|
||||
if dict_keys is None:
|
||||
return DictionaryEntry(
|
||||
entry.full, entry.known_absent, dict(entry.value)
|
||||
)
|
||||
else:
|
||||
values[dict_key] = entry.value
|
||||
return DictionaryEntry(
|
||||
entry.full,
|
||||
entry.known_absent,
|
||||
{k: entry.value[k] for k in dict_keys if k in entry.value},
|
||||
)
|
||||
|
||||
# If we found everything we can return immediately.
|
||||
if not missing:
|
||||
return DictionaryEntry(False, known_absent, values)
|
||||
|
||||
# If we are missing any keys check if we happen to have the full dict in
|
||||
# the cache.
|
||||
#
|
||||
# We don't update the last access time for this cache fetch, as we
|
||||
# aren't explicitly interested in the full dict and so we don't want
|
||||
# requests for explicit dict keys to keep the full dict in the cache.
|
||||
entry = self.cache.get(
|
||||
(key, _FullCacheKey.KEY),
|
||||
_Sentinel.sentinel,
|
||||
update_last_access=False,
|
||||
)
|
||||
if entry is _Sentinel.sentinel:
|
||||
# Not in the cache, return the subset of keys we found.
|
||||
return DictionaryEntry(False, known_absent, values)
|
||||
|
||||
# We have the full dict!
|
||||
assert isinstance(entry, dict)
|
||||
|
||||
values = {}
|
||||
for dict_key in dict_keys:
|
||||
# We explicitly add each dict key to the cache, so that cache hit
|
||||
# rates for each key can be tracked separately.
|
||||
value = entry.get(dict_key, _Sentinel.sentinel) # type: ignore[arg-type]
|
||||
self.cache[(key, dict_key)] = _PerKeyValue(value)
|
||||
|
||||
if value is not _Sentinel.sentinel:
|
||||
values[dict_key] = value
|
||||
|
||||
return DictionaryEntry(True, set(), values)
|
||||
return DictionaryEntry(False, set(), {})
|
||||
|
||||
def invalidate(self, key: KT) -> None:
|
||||
self.check_thread()
|
||||
@@ -243,9 +117,7 @@ class DictionaryCache(Generic[KT, DKT, DV]):
|
||||
# Increment the sequence number so that any SELECT statements that
|
||||
# raced with the INSERT don't update the cache (SYN-369)
|
||||
self.sequence += 1
|
||||
|
||||
# Del-multi accepts truncated tuples.
|
||||
self.cache.del_multi((key,)) # type: ignore[arg-type]
|
||||
self.cache.pop(key, None)
|
||||
|
||||
def invalidate_all(self) -> None:
|
||||
self.check_thread()
|
||||
@@ -277,27 +149,20 @@ class DictionaryCache(Generic[KT, DKT, DV]):
|
||||
# Only update the cache if the caches sequence number matches the
|
||||
# number that the cache had before the SELECT was started (SYN-369)
|
||||
if fetched_keys is None:
|
||||
self.cache[(key, _FullCacheKey.KEY)] = value
|
||||
self._insert(key, value, set())
|
||||
else:
|
||||
self._update_subset(key, value, fetched_keys)
|
||||
self._update_or_insert(key, value, fetched_keys)
|
||||
|
||||
def _update_subset(
|
||||
self, key: KT, value: Dict[DKT, DV], fetched_keys: Iterable[DKT]
|
||||
def _update_or_insert(
|
||||
self, key: KT, value: Dict[DKT, DV], known_absent: Iterable[DKT]
|
||||
) -> None:
|
||||
"""Add the given dictionary values as explicit keys in the cache.
|
||||
# We pop and reinsert as we need to tell the cache the size may have
|
||||
# changed
|
||||
|
||||
Args:
|
||||
key
|
||||
value: The dictionary with all the values that we should cache
|
||||
fetched_keys: The full set of keys that were looked up, any keys
|
||||
here not in `value` should be marked as "known absent".
|
||||
"""
|
||||
entry: DictionaryEntry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
|
||||
entry.value.update(value)
|
||||
entry.known_absent.update(known_absent)
|
||||
self.cache[key] = entry
|
||||
|
||||
for dict_key, dict_value in value.items():
|
||||
self.cache[(key, dict_key)] = _PerKeyValue(dict_value)
|
||||
|
||||
for dict_key in fetched_keys:
|
||||
if (key, dict_key) in self.cache:
|
||||
continue
|
||||
|
||||
self.cache[(key, dict_key)] = _PerKeyValue(_Sentinel.sentinel)
|
||||
def _insert(self, key: KT, value: Dict[DKT, DV], known_absent: Set[DKT]) -> None:
|
||||
self.cache[key] = DictionaryEntry(True, known_absent, value)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user