Merge remote-tracking branch 'origin/develop' into rei/gsgfg

Olivier Wilkinson (reivilibre)
2021-08-05 15:51:54 +01:00
97 changed files with 1611 additions and 403 deletions
+18 -10
@@ -8,7 +8,7 @@ on:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
lint:
runs-on: ubuntu-latest
@@ -367,13 +367,21 @@ jobs:
- name: Set build result
env:
NEEDS_CONTEXT: ${{ toJSON(needs) }}
# the `jq` incantation dumps out a series of "<job> <result>" lines
# the `jq` incantation dumps out a series of "<job> <result>" lines.
# we store the output in an intermediate variable to avoid a pipe, since the
# pipe's subshell would make it hard to set $rc.
run: |
set -o pipefail
jq -r 'to_entries[] | [.key,.value.result] | join(" ")' \
<<< $NEEDS_CONTEXT |
while read job result; do
if [ "$result" != "success" ]; then
echo "::set-failed ::Job $job returned $result"
fi
done
rc=0
results=$(jq -r 'to_entries[] | [.key,.value.result] | join(" ")' <<< $NEEDS_CONTEXT)
while read job result ; do
# The newsfile lint may be skipped on non PR builds
if [ $result == "skipped" ] && [ $job == "lint-newsfile" ]; then
continue
fi
if [ "$result" != "success" ]; then
echo "::set-failed ::Job $job returned $result"
rc=1
fi
done <<< $results
exit $rc
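For illustration, a minimal Python sketch of the same check (the `needs` payload below is invented; the real one comes from the Actions `needs` context):

```python
# Sketch of the "Set build result" step above, against an invented payload.
import json

needs_context = json.loads(
    '{"lint": {"result": "success"},'
    ' "lint-newsfile": {"result": "skipped"},'
    ' "trial": {"result": "failure"}}'
)

rc = 0
for job, info in needs_context.items():
    result = info["result"]
    # The newsfile lint may be skipped on non-PR builds.
    if result == "skipped" and job == "lint-newsfile":
        continue
    if result != "success":
        print(f"::set-failed ::Job {job} returned {result}")
        rc = 1
raise SystemExit(rc)
```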
+74
@@ -1,3 +1,77 @@
Synapse 1.40.0rc2 (2021-08-04)
==============================
Bugfixes
--------
- Fix the `PeriodicallyFlushingMemoryHandler` inhibiting application shutdown because of its background thread. ([\#10517](https://github.com/matrix-org/synapse/issues/10517))
- Fix a bug introduced in Synapse v1.40.0rc1 that could cause Synapse to respond with an error when clients would update read receipts. ([\#10531](https://github.com/matrix-org/synapse/issues/10531))
Internal Changes
----------------
- Fix release script to open the correct URL for the release. ([\#10516](https://github.com/matrix-org/synapse/issues/10516))
Synapse 1.40.0rc1 (2021-08-03)
==============================
Features
--------
- Add support for [MSC2033](https://github.com/matrix-org/matrix-doc/pull/2033): `device_id` on `/account/whoami`. ([\#9918](https://github.com/matrix-org/synapse/issues/9918))
- Update support for [MSC2716 - Incrementally importing history into existing rooms](https://github.com/matrix-org/matrix-doc/pull/2716). ([\#10245](https://github.com/matrix-org/synapse/issues/10245), [\#10432](https://github.com/matrix-org/synapse/issues/10432), [\#10463](https://github.com/matrix-org/synapse/issues/10463))
- Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events. ([\#10254](https://github.com/matrix-org/synapse/issues/10254), [\#10447](https://github.com/matrix-org/synapse/issues/10447), [\#10489](https://github.com/matrix-org/synapse/issues/10489))
- Initial support for [MSC3244](https://github.com/matrix-org/matrix-doc/pull/3244), Room version capabilities over the /capabilities API. ([\#10283](https://github.com/matrix-org/synapse/issues/10283))
- Add a buffered logging handler which periodically flushes itself. ([\#10407](https://github.com/matrix-org/synapse/issues/10407), [\#10515](https://github.com/matrix-org/synapse/issues/10515))
- Add support for https connections to a proxy server. Contributed by @Bubu and @dklimpel. ([\#10411](https://github.com/matrix-org/synapse/issues/10411))
- Support for [MSC2285 (hidden read receipts)](https://github.com/matrix-org/matrix-doc/pull/2285). Contributed by @SimonBrandner. ([\#10413](https://github.com/matrix-org/synapse/issues/10413))
- Email notifications now state whether an invitation is to a room or a space. ([\#10426](https://github.com/matrix-org/synapse/issues/10426))
- Allow setting transaction limit for database connections. ([\#10440](https://github.com/matrix-org/synapse/issues/10440), [\#10511](https://github.com/matrix-org/synapse/issues/10511))
- Add `creation_ts` to "list users" admin API. ([\#10448](https://github.com/matrix-org/synapse/issues/10448))
Bugfixes
--------
- Improve character set detection in URL previews by supporting underscores (in addition to hyphens). Contributed by @srividyut. ([\#10410](https://github.com/matrix-org/synapse/issues/10410))
- Fix events being incorrectly rejected over federation if they reference auth events that the server needed to fetch. ([\#10439](https://github.com/matrix-org/synapse/issues/10439))
- Fix `synapse_federation_server_oldest_inbound_pdu_in_staging` Prometheus metric to not report a max age of 51 years when the queue is empty. ([\#10455](https://github.com/matrix-org/synapse/issues/10455))
- Fix a bug which caused an explicit assignment of power-level 0 to a user to be misinterpreted in rare circumstances. ([\#10499](https://github.com/matrix-org/synapse/issues/10499))
Improved Documentation
----------------------
- Fix hierarchy of providers on the OpenID page. ([\#10445](https://github.com/matrix-org/synapse/issues/10445))
- Consolidate development documentation to `docs/development/`. ([\#10453](https://github.com/matrix-org/synapse/issues/10453))
- Add some developer docs to explain room DAG concepts like `outliers`, `state_groups`, `depth`, etc. ([\#10464](https://github.com/matrix-org/synapse/issues/10464))
- Document how to use Complement while developing a new Synapse feature. ([\#10483](https://github.com/matrix-org/synapse/issues/10483))
Internal Changes
----------------
- Prune inbound federation queues for a room if they get too large. ([\#10390](https://github.com/matrix-org/synapse/issues/10390))
- Add type hints to `synapse.federation.transport.client` module. ([\#10408](https://github.com/matrix-org/synapse/issues/10408))
- Remove shebang line from module files. ([\#10415](https://github.com/matrix-org/synapse/issues/10415))
- Drop backwards-compatibility code that was required to support Ubuntu Xenial. ([\#10429](https://github.com/matrix-org/synapse/issues/10429))
- Use a docker image cache for the prerequisites for the debian package build. ([\#10431](https://github.com/matrix-org/synapse/issues/10431))
- Improve servlet type hints. ([\#10437](https://github.com/matrix-org/synapse/issues/10437), [\#10438](https://github.com/matrix-org/synapse/issues/10438))
- Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages. ([\#10442](https://github.com/matrix-org/synapse/issues/10442))
- Update the `tests-done` Github Actions status. ([\#10444](https://github.com/matrix-org/synapse/issues/10444), [\#10512](https://github.com/matrix-org/synapse/issues/10512))
- Update type annotations to work with forthcoming Twisted 21.7.0 release. ([\#10446](https://github.com/matrix-org/synapse/issues/10446), [\#10450](https://github.com/matrix-org/synapse/issues/10450))
- Cancel redundant GHA workflows when a new commit is pushed. ([\#10451](https://github.com/matrix-org/synapse/issues/10451))
- Mitigate media repo XSS attacks on IE11 via the non-standard X-Content-Security-Policy header. ([\#10468](https://github.com/matrix-org/synapse/issues/10468))
- Additional type hints in the state handler. ([\#10482](https://github.com/matrix-org/synapse/issues/10482))
- Update syntax used to run complement tests. ([\#10488](https://github.com/matrix-org/synapse/issues/10488))
- Fix up type annotations to work with Twisted 21.7. ([\#10490](https://github.com/matrix-org/synapse/issues/10490))
- Improve type annotations for `ObservableDeferred`. ([\#10491](https://github.com/matrix-org/synapse/issues/10491))
- Extend release script to also tag and create GitHub releases. ([\#10496](https://github.com/matrix-org/synapse/issues/10496))
- Fix a bug which caused production debian packages to be incorrectly marked as 'prerelease'. ([\#10500](https://github.com/matrix-org/synapse/issues/10500))
Synapse 1.39.0 (2021-07-29)
===========================
+1
@@ -0,0 +1 @@
Improve event caching mechanism to avoid having multiple copies of an event in memory at a time.
+1
@@ -0,0 +1 @@
Add some clarification to the sample config file. Contributed by @Kentokamoto.
-1
@@ -1 +0,0 @@
Make historical events discoverable from backfill for servers without any scrollback history (part of MSC2716).
-1
@@ -1 +0,0 @@
Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events.
-1
@@ -1 +0,0 @@
Initial support for MSC3244, Room version capabilities over the /capabilities API.
-1
@@ -1 +0,0 @@
Prune inbound federation queues for a room if they get too large.
-1
@@ -1 +0,0 @@
Add a buffered logging handler which periodically flushes itself.
-1
@@ -1 +0,0 @@
Add type hints to `synapse.federation.transport.client` module.
-1
@@ -1 +0,0 @@
Improve character set detection in URL previews by supporting underscores (in addition to hyphens). Contributed by @srividyut.
-1
@@ -1 +0,0 @@
Add support for https connections to a proxy server. Contributed by @Bubu and @dklimpel.
-1
@@ -1 +0,0 @@
Support for [MSC2285 (hidden read receipts)](https://github.com/matrix-org/matrix-doc/pull/2285). Contributed by @SimonBrandner.
-1
@@ -1 +0,0 @@
Remove shebang line from module files.
-1
@@ -1 +0,0 @@
Email notifications now state whether an invitation is to a room or a space.
-1
@@ -1 +0,0 @@
Drop backwards-compatibility code that was required to support Ubuntu Xenial.
-1
@@ -1 +0,0 @@
Use a docker image cache for the prerequisites for the debian package build.
-1
@@ -1 +0,0 @@
Connect historical chunks together with chunk events instead of a content field (MSC2716).
+1
@@ -0,0 +1 @@
Experimental support for [MSC3288](https://github.com/matrix-org/matrix-doc/pull/3288), sending `room_type` to the identity server for 3pid invites over the `/store-invite` API.
-1
@@ -1 +0,0 @@
Improve servlet type hints.
-1
@@ -1 +0,0 @@
Improve servlet type hints.
-1
@@ -1 +0,0 @@
Fix events with floating outlier state being rejected over federation.
-1
@@ -1 +0,0 @@
Allow setting transaction limit for database connections.
-1
@@ -1 +0,0 @@
Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.
+1
@@ -0,0 +1 @@
Add documentation for configuring a forward proxy.
-1
@@ -1 +0,0 @@
Update the `tests-done` Github Actions status.
-1
@@ -1 +0,0 @@
Fix hierarchy of providers on the OpenID page.
-1
@@ -1 +0,0 @@
Update type annotations to work with forthcoming Twisted 21.7.0 release.
-1
@@ -1 +0,0 @@
Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events.
-1
@@ -1 +0,0 @@
Add `creation_ts` to list users admin API.
-1
@@ -1 +0,0 @@
Update type annotations to work with forthcoming Twisted 21.7.0 release.
-1
@@ -1 +0,0 @@
Cancel redundant GHA workflows when a new commit is pushed.
-1
@@ -1 +0,0 @@
Consolidate development documentation to `docs/development/`.
-1
@@ -1 +0,0 @@
Fix `synapse_federation_server_oldest_inbound_pdu_in_staging` Prometheus metric to not report a max age of 51 years when the queue is empty.
-1
@@ -1 +0,0 @@
Disable `msc2716` Complement tests until Complement updates are merged.
-1
@@ -1 +0,0 @@
Mitigate media repo XSS attacks on IE11 via the non-standard X-Content-Security-Policy header.
-1
@@ -1 +0,0 @@
Additional type hints in the state handler.
-1
@@ -1 +0,0 @@
Document how to use Complement while developing a new Synapse feature.
-1
@@ -1 +0,0 @@
Update syntax used to run complement tests.
-1
@@ -1 +0,0 @@
Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events.
-1
@@ -1 +0,0 @@
Fix up type annotations to work with Twisted 21.7.
-1
@@ -1 +0,0 @@
Improve type annotations for `ObservableDeferred`.
+1
@@ -0,0 +1 @@
Add support for "marker" events which makes historical events discoverable for servers that already have all of the scrollback history (part of MSC2716).
-1
@@ -1 +0,0 @@
Fix a bug which caused an explicit assignment of power-level 0 to a user to be misinterpreted in rare circumstances.
-1
@@ -1 +0,0 @@
Fix a bug which caused production debian packages to be incorrectly marked as 'prerelease'.
+1
@@ -0,0 +1 @@
Reduce errors in PostgreSQL logs due to concurrent serialization errors.
+1
@@ -0,0 +1 @@
Include room ID in ignored EDU log messages. Contributed by @ilmari.
-1
@@ -1 +0,0 @@
Allow setting transaction limit for database connections.
+1
@@ -0,0 +1 @@
Add a configuration setting for the time a `/sync` response is cached for.
+1
@@ -0,0 +1 @@
Prepare for the new spaces summary endpoint (updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946)).
+1
@@ -0,0 +1 @@
Fix CI to not break when run against branches rather than pull requests.
+1
@@ -0,0 +1 @@
Fix a long-standing bug where protocols which are not implemented by any appservices were incorrectly returned via `GET /_matrix/client/r0/thirdparty/protocols`.
+1
@@ -0,0 +1 @@
Add `get_userinfo_by_id` method to ModuleApi.
-1
@@ -1 +0,0 @@
Add support for [MSC2033](https://github.com/matrix-org/matrix-doc/pull/2033): `device_id` on `/account/whoami`.
+15
@@ -100,3 +100,18 @@ esac
# add a dependency on the right version of python to substvars.
PYPKG=`basename $SNAKE`
echo "synapse:pydepends=$PYPKG" >> debian/matrix-synapse-py3.substvars
# add a couple of triggers. This is needed so that dh-virtualenv can rebuild
# the venv when the system python changes (see
# https://dh-virtualenv.readthedocs.io/en/latest/tutorial.html#step-2-set-up-packaging-for-your-project)
#
# we do it here rather than the more conventional way of just adding it to
# debian/matrix-synapse-py3.triggers, because we need to add a trigger on the
# right version of python.
cat >>"debian/.debhelper/generated/matrix-synapse-py3/triggers" <<EOF
# triggers for dh-virtualenv
interest-noawait $SNAKE
interest dh-virtualenv-interpreter-update
EOF
+14 -2
@@ -1,8 +1,20 @@
matrix-synapse-py3 (1.39.0ubuntu1) UNRELEASED; urgency=medium
matrix-synapse-py3 (1.40.0~rc2) stable; urgency=medium
* New synapse release 1.40.0~rc2.
-- Synapse Packaging team <packages@matrix.org> Wed, 04 Aug 2021 17:08:55 +0100
matrix-synapse-py3 (1.40.0~rc1) stable; urgency=medium
[ Richard van der Hoff ]
* Drop backwards-compatibility code that was required to support Ubuntu Xenial.
* Update package triggers so that the virtualenv is correctly rebuilt
when the system python is rebuilt, on recent Python versions.
-- Richard van der Hoff <richard@matrix.org> Tue, 20 Jul 2021 00:10:03 +0100
[ Synapse Packaging team ]
* New synapse release 1.40.0~rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 03 Aug 2021 11:31:49 +0100
matrix-synapse-py3 (1.39.0) stable; urgency=medium
-9
@@ -1,9 +0,0 @@
# Register interest in Python interpreter changes and
# don't make the Python package dependent on the virtualenv package
# processing (noawait)
interest-noawait /usr/bin/python3.5
interest-noawait /usr/bin/python3.6
interest-noawait /usr/bin/python3.7
# Also provide a symbolic trigger for all dh-virtualenv packages
interest dh-virtualenv-interpreter-update
+2
@@ -7,6 +7,7 @@
- [Installation](setup/installation.md)
- [Using Postgres](postgres.md)
- [Configuring a Reverse Proxy](reverse_proxy.md)
- [Configuring a Forward/Outbound Proxy](setup/forward_proxy.md)
- [Configuring a Turn Server](turn-howto.md)
- [Delegation](delegate.md)
@@ -79,6 +80,7 @@
- [Single Sign-On]()
- [SAML](development/saml.md)
- [CAS](development/cas.md)
- [Room DAG concepts](development/room-dag-concepts.md)
- [State Resolution]()
- [The Auth Chain Difference Algorithm](auth_chain_difference_algorithm.md)
- [Media Repository](media_repository.md)
+79
@@ -0,0 +1,79 @@
# Room DAG concepts
## Edges
The word "edge" comes from graph theory lingo. An edge is just a connection
between two events. In Synapse, we connect events by specifying their
`prev_events`. A subsequent event points back at a previous event.
```
A (oldest) <---- B <---- C (most recent)
```
## Depth and stream ordering
Events are normally sorted by `(topological_ordering, stream_ordering)` where
`topological_ordering` is just `depth`. In other words, we first sort by `depth`
and then tie-break based on `stream_ordering`. `depth` is incremented as new
messages are added to the DAG. Normally, `stream_ordering` is an auto
incrementing integer, but backfilled events start with `stream_ordering=-1` and decrement.
---
- `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
The general idea is that, if you're following a room in real-time (i.e.
`/sync`), you probably want to see the messages as they arrive at your server,
rather than skipping any that arrived late; whereas if you're looking at a
historical section of timeline (i.e. `/messages`), you want to see the best
representation of the state of the room as others were seeing it at the time.
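As a minimal sketch (event IDs and numbers invented), the two orderings can disagree when an event is backfilled:

```python
# Invented events: $late was backfilled, so its stream_ordering is negative.
events = [
    {"event_id": "$late", "depth": 1, "stream_ordering": -1},
    {"event_id": "$b", "depth": 2, "stream_ordering": 1},
    {"event_id": "$c", "depth": 3, "stream_ordering": 2},
]

# Graph order, as used by /messages: depth first, stream_ordering tie-break.
by_graph = sorted(events, key=lambda e: (e["depth"], e["stream_ordering"]))

# Stream order, as used by /sync, only ever moves forward: $b then $c were
# live-delivered; the backfilled $late never appears in a /sync response.
live = [e for e in events if e["stream_ordering"] > 0]
by_stream = sorted(live, key=lambda e: e["stream_ordering"])
```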
## Forward extremity
Most-recent-in-time events in the DAG which are not referenced by any other events' `prev_events` yet.
The forward extremities of a room are used as the `prev_events` when the next event is sent.
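A minimal sketch (invented event IDs) of how forward extremities fall out of `prev_events`:

```python
# An event is a forward extremity if no other event lists it in prev_events.
prev_events = {
    "$A": [],      # invented IDs; $A is the oldest event
    "$B": ["$A"],
    "$C": ["$A"],  # $B and $C both extend $A, e.g. a race between servers
}
referenced = {p for prevs in prev_events.values() for p in prevs}
forward_extremities = sorted(set(prev_events) - referenced)  # ['$B', '$C']

# The next event sent in the room points at all current forward extremities,
# healing the fork in the DAG.
next_event = {"type": "m.room.message", "prev_events": forward_extremities}
```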
## Backwards extremity
The current marker of where we have backfilled up to; these will generally be the
oldest-in-time events we know of in the DAG.
This is an event for which we haven't fetched all of the `prev_events`.
Once we have fetched all of its `prev_events`, it's unmarked as a backwards
extremity (although we may have formed new backwards extremities from the prev
events during the backfilling process).
## Outliers
We mark an event as an `outlier` when we haven't figured out the state for the
room at that point in the DAG yet.
We won't *necessarily* have the `prev_events` of an `outlier` in the database,
but it's entirely possible that we *might*. An event whose `prev_events` have not
all been fetched is marked as a [backwards extremity](#backwards-extremity).
For example, when we fetch the event auth chain or state for a given event, we
mark all of those claimed auth events as outliers because we haven't done the
state calculation ourselves.
## State groups
For every non-outlier event we need to know the state at that event. Instead of
storing the full state for each event in the DB (i.e. an `event_id -> state`
mapping), which is *very* space inefficient when state doesn't change, we
instead assign each different set of state a "state group" and then have
mappings of `event_id -> state_group` and `state_group -> state`.
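A minimal sketch (IDs invented) of the indirection: four events share one state group, so the full state is stored once rather than four times:

```python
# state_group -> state: the full state, stored once per distinct state.
state_by_group = {
    1: {("m.room.create", ""): "$create", ("m.room.member", "@a:hs"): "$join"},
}
# event_id -> state_group: cheap per-event pointers into the table above.
group_by_event = {"$create": 1, "$join": 1, "$msg1": 1, "$msg2": 1}

def state_at(event_id: str) -> dict:
    """Resolve the state at an event via its state group."""
    return state_by_group[group_by_event[event_id]]
```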
### State group edges
TODO: `state_group_edges` is a further optimization...
notes from @Azrenbeth, https://pastebin.com/seUGVGeT
+13
@@ -210,6 +210,8 @@ presence:
#
# This option replaces federation_ip_range_blacklist in Synapse v1.25.0.
#
# Note: The value is ignored when an HTTP proxy is in use
#
#ip_range_blacklist:
# - '127.0.0.0/8'
# - '10.0.0.0/8'
@@ -711,6 +713,15 @@ caches:
#
#expiry_time: 30m
# Controls how long the results of a /sync request are cached for after
# a successful response is returned. A higher duration can help clients with
# intermittent connections, at the cost of higher memory usage.
#
# By default, this is zero, which means that sync responses are not cached
# at all.
#
#sync_response_cache_duration: 2m
## Database ##
@@ -963,6 +974,8 @@ media_store_path: "DATADIR/media_store"
# This must be specified if url_preview_enabled is set. It is recommended that
# you uncomment the following list as a starting point.
#
# Note: The value is ignored when an HTTP proxy is in use
#
#url_preview_ip_range_blacklist:
# - '127.0.0.0/8'
# - '10.0.0.0/8'
+1 -4
@@ -28,7 +28,7 @@ handlers:
# will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
# logs will still be flushed immediately.
buffer:
class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
class: logging.handlers.MemoryHandler
target: file
# The capacity is the number of log lines that are buffered before
# being written to disk. Increasing this will lead to better
@@ -36,9 +36,6 @@ handlers:
# be written to disk.
capacity: 10
flushLevel: 30 # Flush for WARNING logs as well
# The period of time, in seconds, between forced flushes.
# Messages will not be delayed for longer than this time.
period: 5
# A handler that writes logs to stderr. Unused by default, but can be used
# instead of "buffer" and "file" in the logger handlers.
+74
@@ -0,0 +1,74 @@
# Using a forward proxy with Synapse
You can use Synapse with a forward or outbound proxy. An example of when
this is necessary is in corporate environments behind a DMZ (demilitarized zone).
Synapse supports routing outbound HTTP(S) requests via a proxy. Only HTTP(S)
proxies are supported; SOCKS proxies and other protocols are not.
## Configure
The `http_proxy`, `https_proxy`, and `no_proxy` environment variables are used to
specify proxy settings. These variable names are not case sensitive.
- `http_proxy`: Proxy server to use for HTTP requests.
- `https_proxy`: Proxy server to use for HTTPS requests.
- `no_proxy`: Comma-separated list of hosts, IP addresses, or IP ranges in CIDR
format which should not use the proxy. Synapse will directly connect to these hosts.
The `http_proxy` and `https_proxy` environment variables have the form: `[scheme://][<username>:<password>@]<host>[:<port>]`
- Supported schemes are `http://` and `https://`. The default scheme is `http://`
for compatibility reasons; it is recommended to set a scheme explicitly. If the
scheme is set to `https://`, the connection between Synapse and the proxy uses TLS.
**NOTE**: Synapse validates the certificates. If the certificate is not
valid, then the connection is dropped.
- The default port, if none is given, is `1080`.
- Username and password are optional and will be used to authenticate against
the proxy.
**Examples**
- HTTP_PROXY=http://USERNAME:PASSWORD@10.0.1.1:8080/
- HTTPS_PROXY=http://USERNAME:PASSWORD@proxy.example.com:8080/
- NO_PROXY=master.hostname.example.com,10.1.0.0/16,172.30.0.0/16
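As elsewhere in this commit, Synapse reads these variables through urllib; a minimal sketch of inspecting the same view of the environment:

```python
# getproxies_environment merges the (case-insensitive) *_proxy variables
# into a dict such as {"http": "http://10.0.1.1:8080/", ...}.
from urllib.request import getproxies_environment  # type: ignore

proxies = getproxies_environment()
if "http" in proxies or "https" in proxies:
    print("outbound HTTP(S) requests will be proxied:", proxies)
else:
    print("no proxy configured")
```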
**NOTE**:
Synapse does not apply the IP blacklist to connections through the proxy (since
the DNS resolution is done by the proxy). It is expected that the proxy or firewall
will apply blacklisting of IP addresses.
## Connection types
The proxy will be **used** for:
- push
- url previews
- phone-home stats
- recaptcha validation
- CAS auth validation
- OpenID Connect
- Federation (checking public key revocation)
It will **not be used** for:
- Application Services
- Identity servers
- Outbound federation
- In worker configurations
- connections between workers
- connections from workers to Redis
- Fetching public keys of other servers
- Downloading remote media
## Troubleshooting
If a proxy server is used with TLS (HTTPS) and no connections are established,
the cause is most likely the proxy's certificates. To test this, certificate
validation in Synapse can be deactivated.
**NOTE**: This has an impact on security and is for testing purposes only!
To deactivate the certificate validation, add the following setting to
[homeserver.yaml](../usage/configuration/homeserver_sample_config.md).
```yaml
use_insecure_ssl_client_just_for_testing_do_not_use: true
```
+1 -1
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi
# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
+275 -36
@@ -14,29 +14,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""An interactive script for doing a release. See `run()` below.
"""An interactive script for doing a release. See `cli()` below.
"""
import re
import subprocess
import sys
from typing import Optional
import urllib.request
from os import path
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple
import attr
import click
import commonmark
import git
import redbaron
from click.exceptions import ClickException
from github import Github
from packaging import version
from redbaron import RedBaron
@click.command()
def run():
"""An interactive script to walk through the initial stages of creating a
release, including creating release branch, updating changelog and pushing to
GitHub.
@click.group()
def cli():
"""An interactive script to walk through the parts of creating a release.
Requires the dev dependencies be installed, which can be done via:
pip install -e .[dev]
Then to use:
./scripts-dev/release.py prepare
# ... ask others to look at the changelog ...
./scripts-dev/release.py tag
# ... wait for assets to build ...
./scripts-dev/release.py publish
./scripts-dev/release.py upload
If the env var GH_TOKEN (or GITHUB_TOKEN) is set, or passed into the
`tag`/`publish` command, then a new draft release will be created/published.
"""
@cli.command()
def prepare():
"""Do the initial stages of creating a release, including creating release
branch, updating changelog and pushing to GitHub.
"""
# Make sure we're in a git repo.
@@ -51,32 +79,8 @@ def run():
click.secho("Updating git repo...")
repo.remote().fetch()
# Parse the AST and load the `__version__` node so that we can edit it
# later.
with open("synapse/__init__.py") as f:
red = RedBaron(f.read())
version_node = None
for node in red:
if node.type != "assignment":
continue
if node.target.type != "name":
continue
if node.target.value != "__version__":
continue
version_node = node
break
if not version_node:
print("Failed to find '__version__' definition in synapse/__init__.py")
sys.exit(1)
# Parse the current version.
current_version = version.parse(version_node.value.value.strip('"'))
assert isinstance(current_version, version.Version)
# Get the current version and AST from root Synapse module.
current_version, parsed_synapse_ast, version_node = parse_version_from_module()
# Figure out what sort of release we're doing and calculate the new version.
rc = click.confirm("RC", default=True)
@@ -190,7 +194,7 @@ def run():
# Update the `__version__` variable and write it back to the file.
version_node.value = '"' + new_version + '"'
with open("synapse/__init__.py", "w") as f:
f.write(red.dumps())
f.write(parsed_synapse_ast.dumps())
# Generate changelogs
subprocess.run("python3 -m towncrier", shell=True)
@@ -240,6 +244,180 @@ def run():
)
@cli.command()
@click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"])
def tag(gh_token: Optional[str]):
"""Tags the release and generates a draft GitHub release"""
# Make sure we're in a git repo.
try:
repo = git.Repo()
except git.InvalidGitRepositoryError:
raise click.ClickException("Not in Synapse repo.")
if repo.is_dirty():
raise click.ClickException("Uncommitted changes exist.")
click.secho("Updating git repo...")
repo.remote().fetch()
# Find out the version and tag name.
current_version, _, _ = parse_version_from_module()
tag_name = f"v{current_version}"
# Check we haven't released this version.
if tag_name in repo.tags:
raise click.ClickException(f"Tag {tag_name} already exists!\n")
# Get the appropriate changelogs and tag.
changes = get_changes_for_version(current_version)
click.echo_via_pager(changes)
if click.confirm("Edit text?", default=False):
changes = click.edit(changes, require_save=False)
repo.create_tag(tag_name, message=changes)
if not click.confirm("Push tag to GitHub?", default=True):
print("")
print("Run when ready to push:")
print("")
print(f"\tgit push {repo.remote().name} tag {current_version}")
print("")
return
repo.git.push(repo.remote().name, "tag", tag_name)
# If no token was given, we bail here
if not gh_token:
click.launch(f"https://github.com/matrix-org/synapse/releases/edit/{tag_name}")
return
# Create a new draft release
gh = Github(gh_token)
gh_repo = gh.get_repo("matrix-org/synapse")
release = gh_repo.create_git_release(
tag=tag_name,
name=tag_name,
message=changes,
draft=True,
prerelease=current_version.is_prerelease,
)
# Open the release and the actions where we are building the assets.
click.launch(release.html_url)
click.launch(
f"https://github.com/matrix-org/synapse/actions?query=branch%3A{tag_name}"
)
click.echo("Wait for release assets to be built")
@cli.command()
@click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"], required=True)
def publish(gh_token: str):
"""Publish release."""
# Make sure we're in a git repo.
try:
repo = git.Repo()
except git.InvalidGitRepositoryError:
raise click.ClickException("Not in Synapse repo.")
if repo.is_dirty():
raise click.ClickException("Uncommitted changes exist.")
current_version, _, _ = parse_version_from_module()
tag_name = f"v{current_version}"
if not click.confirm(f"Publish {tag_name}?", default=True):
return
# Publish the draft release
gh = Github(gh_token)
gh_repo = gh.get_repo("matrix-org/synapse")
for release in gh_repo.get_releases():
if release.title == tag_name:
break
else:
raise ClickException(f"Failed to find GitHub release for {tag_name}")
assert release.title == tag_name
if not release.draft:
click.echo("Release already published.")
return
release = release.update_release(
name=release.title,
message=release.body,
tag_name=release.tag_name,
prerelease=release.prerelease,
draft=False,
)
@cli.command()
def upload():
"""Upload release to pypi."""
current_version, _, _ = parse_version_from_module()
tag_name = f"v{current_version}"
pypi_asset_names = [
f"matrix_synapse-{current_version}-py3-none-any.whl",
f"matrix-synapse-{current_version}.tar.gz",
]
with TemporaryDirectory(prefix=f"synapse_upload_{tag_name}_") as tmpdir:
for name in pypi_asset_names:
filename = path.join(tmpdir, name)
url = f"https://github.com/matrix-org/synapse/releases/download/{tag_name}/{name}"
click.echo(f"Downloading {name} into {filename}")
urllib.request.urlretrieve(url, filename=filename)
if click.confirm("Upload to PyPI?", default=True):
subprocess.run("twine upload *", shell=True, cwd=tmpdir)
click.echo(
f"Done! Remember to merge the tag {tag_name} into the appropriate branches"
)
def parse_version_from_module() -> Tuple[
version.Version, redbaron.RedBaron, redbaron.Node
]:
# Parse the AST and load the `__version__` node so that we can edit it
# later.
with open("synapse/__init__.py") as f:
red = redbaron.RedBaron(f.read())
version_node = None
for node in red:
if node.type != "assignment":
continue
if node.target.type != "name":
continue
if node.target.value != "__version__":
continue
version_node = node
break
if not version_node:
print("Failed to find '__version__' definition in synapse/__init__.py")
sys.exit(1)
# Parse the current version.
current_version = version.parse(version_node.value.value.strip('"'))
assert isinstance(current_version, version.Version)
return current_version, red, version_node
def find_ref(repo: git.Repo, ref_name: str) -> Optional[git.HEAD]:
"""Find the branch/ref, looking first locally then in the remote."""
if ref_name in repo.refs:
@@ -256,5 +434,66 @@ def update_branch(repo: git.Repo):
repo.git.merge(repo.active_branch.tracking_branch().name)
def get_changes_for_version(wanted_version: version.Version) -> str:
"""Get the changelogs for the given version.
If the version is an RC, this will get only the changelog for that RC; otherwise,
for a full release, it will get the changelog for the release and all of its RCs.
"""
with open("CHANGES.md") as f:
changes = f.read()
# First we parse the changelog so that we can split it into sections based
# on the release headings.
ast = commonmark.Parser().parse(changes)
@attr.s(auto_attribs=True)
class VersionSection:
title: str
# These are 0-based.
start_line: int
end_line: Optional[int] = None  # Is None if it's the last entry
headings: List[VersionSection] = []
for node, _ in ast.walker():
# We look for all text nodes that are in a level 1 heading.
if node.t != "text":
continue
if node.parent.t != "heading" or node.parent.level != 1:
continue
# If we have a previous heading then we update its `end_line`.
if headings:
headings[-1].end_line = node.parent.sourcepos[0][0] - 1
headings.append(VersionSection(node.literal, node.parent.sourcepos[0][0] - 1))
changes_by_line = changes.split("\n")
version_changelog = [] # The lines we want to include in the changelog
# Go through each section and find any that match the requested version.
regex = re.compile(r"^Synapse v?(\S+)")
for section in headings:
groups = regex.match(section.title)
if not groups:
continue
heading_version = version.parse(groups.group(1))
heading_base_version = version.parse(heading_version.base_version)
# Check if the heading version matches the requested version, or if it's an
# RC of the requested version.
if wanted_version not in (heading_version, heading_base_version):
continue
version_changelog.extend(changes_by_line[section.start_line : section.end_line])
return "\n".join(version_changelog)
if __name__ == "__main__":
run()
cli()
+2
@@ -108,6 +108,8 @@ CONDITIONAL_REQUIREMENTS["dev"] = CONDITIONAL_REQUIREMENTS["lint"] + [
"click==7.1.2",
"redbaron==0.9.2",
"GitPython==3.1.14",
"commonmark==0.9.1",
"pygithub==1.55",
]
CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.13"]
+1 -1
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.39.0"
__version__ = "1.40.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
+13
@@ -151,6 +151,15 @@ class CacheConfig(Config):
# entries are never evicted based on time.
#
#expiry_time: 30m
# Controls how long the results of a /sync request are cached for after
# a successful response is returned. A higher duration can help clients with
# intermittent connections, at the cost of higher memory usage.
#
# By default, this is zero, which means that sync responses are not cached
# at all.
#
#sync_response_cache_duration: 2m
"""
def read_config(self, config, **kwargs):
@@ -212,6 +221,10 @@ class CacheConfig(Config):
else:
self.expiry_time_msec = None
self.sync_response_cache_duration = self.parse_duration(
cache_config.get("sync_response_cache_duration", 0)
)
# Resize all caches (if necessary) with the new factors we've loaded
self.resize_all_caches()
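A rough sketch, as an approximation only (not Synapse's actual `parse_duration` helper), of how a value such as `2m` becomes the millisecond timeout used here:

```python
# Approximate duration parsing for illustration: "2m" -> 120000 ms.
# Synapse's real helper accepts more forms than this sketch does.
SUFFIX_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}

def parse_duration(value) -> int:
    if isinstance(value, int):
        return value  # assumed to already be milliseconds
    return int(value[:-1]) * SUFFIX_MS[value[-1]]

assert parse_duration("2m") == 120_000
assert parse_duration(0) == 0  # the default: sync responses are not cached
```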
+1 -4
@@ -71,7 +71,7 @@ handlers:
# will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
# logs will still be flushed immediately.
buffer:
class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
class: logging.handlers.MemoryHandler
target: file
# The capacity is the number of log lines that are buffered before
# being written to disk. Increasing this will lead to better
@@ -79,9 +79,6 @@ handlers:
# be written to disk.
capacity: 10
flushLevel: 30 # Flush for WARNING logs as well
# The period of time, in seconds, between forced flushes.
# Messages will not be delayed for longer than this time.
period: 5
# A handler that writes logs to stderr. Unused by default, but can be used
# instead of "buffer" and "file" in the logger handlers.
+19 -5
@@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from collections import namedtuple
from typing import Dict, List
from urllib.request import getproxies_environment # type: ignore
from synapse.config.server import DEFAULT_IP_RANGE_BLACKLIST, generate_ip_set
from synapse.python_dependencies import DependencyException, check_requirements
@@ -22,6 +24,8 @@ from synapse.util.module_loader import load_module
from ._base import Config, ConfigError
logger = logging.getLogger(__name__)
DEFAULT_THUMBNAIL_SIZES = [
{"width": 32, "height": 32, "method": "crop"},
{"width": 96, "height": 96, "method": "crop"},
@@ -36,6 +40,9 @@ THUMBNAIL_SIZE_YAML = """\
# method: %(method)s
"""
HTTP_PROXY_SET_WARNING = """\
The Synapse config url_preview_ip_range_blacklist will be ignored as an HTTP(s) proxy is configured."""
ThumbnailRequirement = namedtuple(
"ThumbnailRequirement", ["width", "height", "method", "media_type"]
)
@@ -180,12 +187,17 @@ class ContentRepositoryConfig(Config):
e.message # noqa: B306, DependencyException.message is a property
)
proxy_env = getproxies_environment()
if "url_preview_ip_range_blacklist" not in config:
raise ConfigError(
"For security, you must specify an explicit target IP address "
"blacklist in url_preview_ip_range_blacklist for url previewing "
"to work"
)
if "http" not in proxy_env or "https" not in proxy_env:
raise ConfigError(
"For security, you must specify an explicit target IP address "
"blacklist in url_preview_ip_range_blacklist for url previewing "
"to work"
)
else:
if "http" in proxy_env or "https" in proxy_env:
logger.warning("".join(HTTP_PROXY_SET_WARNING))
# we always blacklist '0.0.0.0' and '::', which are supposed to be
# unroutable addresses.
@@ -292,6 +304,8 @@ class ContentRepositoryConfig(Config):
# This must be specified if url_preview_enabled is set. It is recommended that
# you uncomment the following list as a starting point.
#
# Note: The value is ignored when an HTTP proxy is in use
#
#url_preview_ip_range_blacklist:
%(ip_range_blacklist)s
+2
@@ -960,6 +960,8 @@ class ServerConfig(Config):
#
# This option replaces federation_ip_range_blacklist in Synapse v1.25.0.
#
# Note: The value is ignored when an HTTP proxy is in use
#
#ip_range_blacklist:
%(ip_range_blacklist)s
+14 -9
@@ -1290,7 +1290,7 @@ class FederationClient(FederationBase):
)
@attr.s(frozen=True, slots=True)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
"""Represents a single event in the result of a successful get_space_summary call.
@@ -1299,12 +1299,13 @@ class FederationSpaceSummaryEventResult:
object attributes.
"""
event_type = attr.ib(type=str)
state_key = attr.ib(type=str)
via = attr.ib(type=Sequence[str])
event_type: str
room_id: str
state_key: str
via: Sequence[str]
# the raw data, including the above keys
data = attr.ib(type=JsonDict)
data: JsonDict
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
@@ -1321,6 +1322,10 @@ class FederationSpaceSummaryEventResult:
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
room_id = d.get("room_id")
if not isinstance(room_id, str):
raise ValueError("Invalid event: 'room_id' must be a str")
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
@@ -1335,15 +1340,15 @@ class FederationSpaceSummaryEventResult:
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")
return cls(event_type, state_key, via, d)
return cls(event_type, room_id, state_key, via, d)
@attr.s(frozen=True, slots=True)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryResult:
"""Represents the data returned by a successful get_space_summary call."""
rooms = attr.ib(type=Sequence[JsonDict])
events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])
rooms: Sequence[JsonDict]
events: Sequence[FederationSpaceSummaryEventResult]
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
+3 -4
@@ -392,9 +392,6 @@ class ApplicationServicesHandler:
protocols[p].append(info)
def _merge_instances(infos: List[JsonDict]) -> JsonDict:
if not infos:
return {}
# Merge the 'instances' lists of multiple results, but just take
# the other fields from the first as they ought to be identical
# copy the result so as not to corrupt the cached one
@@ -406,7 +403,9 @@ class ApplicationServicesHandler:
return combined
return {p: _merge_instances(protocols[p]) for p in protocols.keys()}
return {
p: _merge_instances(protocols[p]) for p in protocols.keys() if protocols[p]
}
async def _get_services_for_event(
self, event: EventBase
+113 -6
@@ -42,6 +42,7 @@ from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RejectedReason,
@@ -262,7 +263,12 @@ class FederationHandler(BaseHandler):
state = None
# Get missing pdus if necessary.
# Check that the event passes auth based on the state at the event. This is
# done for events that are to be added to the timeline (non-outliers).
#
# Get missing pdus if necessary:
# - Fetching any missing prev events to fill in gaps in the graph
# - Fetching state if we have a hole in the graph
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +438,13 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
# A second round of checks for all events. Check that the event passes auth
# based on `auth_events`, this allows us to assert that the event would
# have been allowed at some point. If an event passes this check its OK
# for it to be used as part of a returned `/state` request, as either
# a) we received the event as part of the original join and so trust it, or
# b) we'll do a state resolution with existing state before it becomes
# part of the "current state", which adds more protection.
await self._process_received_pdu(origin, pdu, state=state)
async def _get_missing_events_for_pdu(
@@ -889,6 +902,79 @@ class FederationHandler(BaseHandler):
"resync_device_due_to_pdu", self._resync_device, event.sender
)
await self._handle_marker_event(origin, event)
async def _handle_marker_event(self, origin: str, marker_event: EventBase):
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
Args:
origin: Origin of the event. Will be called to get the insertion event
marker_event: The event to process
"""
if marker_event.type != EventTypes.MSC2716_MARKER:
# Not a marker event
return
if marker_event.rejected_reason is not None:
# Rejected event
return
# Skip processing a marker event if the room version doesn't
# support it.
room_version = await self.store.get_room_version(marker_event.room_id)
if not room_version.msc2716_historical:
return
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
)
if insertion_event_id is None:
# Nothing to retrieve then (invalid marker)
return
logger.debug(
"_handle_marker_event: backfilling insertion event %s", insertion_event_id
)
await self._get_events_and_persist(
origin,
marker_event.room_id,
[insertion_event_id],
)
insertion_event = await self.store.get_event(
insertion_event_id, allow_none=True
)
if insertion_event is None:
logger.warning(
"_handle_marker_event: server %s didn't return insertion event %s for marker %s",
origin,
insertion_event_id,
marker_event.event_id,
)
return
logger.debug(
"_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
insertion_event,
marker_event,
)
await self.store.insert_insertion_extremity(
insertion_event_id, marker_event.room_id
)
logger.debug(
"_handle_marker_event: insertion extremity added for %s from marker event %s",
insertion_event,
marker_event,
)
async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1057,9 +1143,19 @@ class FederationHandler(BaseHandler):
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
insertion_events_to_be_backfilled,
)
if not extremities:
if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
return False
@@ -1089,10 +1185,12 @@ class FederationHandler(BaseHandler):
# state *before* the event, ignoring the special casing certain event
# types have.
forward_events = await self.store.get_successor_events(list(extremities))
forward_event_ids = await self.store.get_successor_events(
list(oldest_events_with_depth)
)
extremities_events = await self.store.get_events(
forward_events,
forward_event_ids,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
@@ -1106,10 +1204,19 @@ class FederationHandler(BaseHandler):
redact=False,
check_history_visibility_only=True,
)
logger.debug(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
)
if not filtered_extremities:
if not filtered_extremities and not insertion_events_to_be_backfilled:
return False
extremities = {
**oldest_events_with_depth,
# TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
**insertion_events_to_be_backfilled,
}
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]
+6
@@ -824,6 +824,7 @@ class IdentityHandler(BaseHandler):
room_avatar_url: str,
room_join_rules: str,
room_name: str,
room_type: Optional[str],
inviter_display_name: str,
inviter_avatar_url: str,
id_access_token: Optional[str] = None,
@@ -843,6 +844,7 @@ class IdentityHandler(BaseHandler):
notifications.
room_join_rules: The join rules of the room (e.g. "public").
room_name: The m.room.name of the room.
room_type: The type of the room from its m.room.create event (e.g. "m.space").
inviter_display_name: The current display name of the
inviter.
inviter_avatar_url: The URL of the inviter's avatar.
@@ -869,6 +871,10 @@ class IdentityHandler(BaseHandler):
"sender_display_name": inviter_display_name,
"sender_avatar_url": inviter_avatar_url,
}
if room_type is not None:
invite_config["org.matrix.msc3288.room_type"] = room_type
# If a custom web client location is available, include it in the request.
if self._web_client_location:
invite_config["org.matrix.web_client_location"] = self._web_client_location
+2 -1
@@ -70,7 +70,8 @@ class ReceiptsHandler(BaseHandler):
)
if not is_in_room:
logger.info(
"Ignoring receipt from %s as we're not in the room",
"Ignoring receipt for room %r from server %s as we're not in the room",
room_id,
origin,
)
continue
+12 -1
@@ -19,7 +19,12 @@ from http import HTTPStatus
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
from synapse import types
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.constants import (
AccountDataTypes,
EventContentFields,
EventTypes,
Membership,
)
from synapse.api.errors import (
AuthError,
Codes,
@@ -1237,6 +1242,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if room_name_event:
room_name = room_name_event.content.get("name", "")
room_type = None
room_create_event = room_state.get((EventTypes.Create, ""))
if room_create_event:
room_type = room_create_event.content.get(EventContentFields.ROOM_TYPE)
room_join_rules = ""
join_rules_event = room_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
@@ -1263,6 +1273,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_avatar_url=room_avatar_url,
room_join_rules=room_join_rules,
room_name=room_name,
room_type=room_type,
inviter_display_name=inviter_display_name,
inviter_avatar_url=inviter_avatar_url,
id_access_token=id_access_token,
+76 -49
@@ -16,7 +16,17 @@ import itertools
import logging
import re
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
)
import attr
@@ -116,20 +126,22 @@ class SpaceSummaryHandler:
max_children = max_rooms_per_space if processed_rooms else None
if is_in_room:
room, events = await self._summarize_local_room(
room_entry = await self._summarize_local_room(
requester, None, room_id, suggested_only, max_children
)
events: Collection[JsonDict] = []
if room_entry:
rooms_result.append(room_entry.room)
events = room_entry.children
logger.debug(
"Query of local room %s returned events %s",
room_id,
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
if room:
rooms_result.append(room)
else:
fed_rooms, fed_events = await self._summarize_remote_room(
fed_rooms = await self._summarize_remote_room(
queue_entry,
suggested_only,
max_children,
@@ -141,12 +153,10 @@ class SpaceSummaryHandler:
# user is not permitted see.
#
# Filter the returned results to only what is accessible to the user.
room_ids = set()
events = []
for room in fed_rooms:
fed_room_id = room.get("room_id")
if not fed_room_id or not isinstance(fed_room_id, str):
continue
for room_entry in fed_rooms:
room = room_entry.room
fed_room_id = room_entry.room_id
# The room should only be included in the summary if:
# a. the user is in the room;
@@ -189,21 +199,17 @@ class SpaceSummaryHandler:
# The user can see the room, include it!
if include_room:
rooms_result.append(room)
room_ids.add(fed_room_id)
events.extend(room_entry.children)
# All rooms returned don't need visiting again (even if the user
# didn't have access to them).
processed_rooms.add(fed_room_id)
for event in fed_events:
if event.get("room_id") in room_ids:
events.append(event)
logger.debug(
"Query of %s returned rooms %s, events %s",
room_id,
[room.get("room_id") for room in fed_rooms],
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in fed_events],
[room_entry.room.get("room_id") for room_entry in fed_rooms],
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
# the room we queried may or may not have been returned, but don't process
@@ -283,20 +289,20 @@ class SpaceSummaryHandler:
# already done this room
continue
logger.debug("Processing room %s", room_id)
room, events = await self._summarize_local_room(
room_entry = await self._summarize_local_room(
None, origin, room_id, suggested_only, max_rooms_per_space
)
processed_rooms.add(room_id)
if room:
rooms_result.append(room)
events_result.extend(events)
if room_entry:
rooms_result.append(room_entry.room)
events_result.extend(room_entry.children)
# add any children to the queue
room_queue.extend(edge_event["state_key"] for edge_event in events)
# add any children to the queue
room_queue.extend(
edge_event["state_key"] for edge_event in room_entry.children
)
return {"rooms": rooms_result, "events": events_result}
@@ -307,7 +313,7 @@ class SpaceSummaryHandler:
room_id: str,
suggested_only: bool,
max_children: Optional[int],
) -> Tuple[Optional[JsonDict], Sequence[JsonDict]]:
) -> Optional["_RoomEntry"]:
"""
Generate a room entry and a list of event entries for a given room.
@@ -326,21 +332,16 @@ class SpaceSummaryHandler:
to a server-set limit.
Returns:
A tuple of:
The room information, if the room should be returned to the
user. None, otherwise.
An iterable of the sorted children events. This may be limited
to a maximum size or may include all children.
A room entry if the room should be returned. None, otherwise.
"""
if not await self._is_room_accessible(room_id, requester, origin):
return None, ()
return None
room_entry = await self._build_room_entry(room_id)
# If the room is not a space, return just the room information.
if room_entry.get("room_type") != RoomTypes.SPACE:
return room_entry, ()
return _RoomEntry(room_id, room_entry)
# Otherwise, look for child rooms/spaces.
child_events = await self._get_child_events(room_id)
@@ -363,7 +364,7 @@ class SpaceSummaryHandler:
)
)
return room_entry, events_result
return _RoomEntry(room_id, room_entry, events_result)
async def _summarize_remote_room(
self,
@@ -371,7 +372,7 @@ class SpaceSummaryHandler:
suggested_only: bool,
max_children: Optional[int],
exclude_rooms: Iterable[str],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
) -> Iterable["_RoomEntry"]:
"""
Request room entries and a list of event entries for a given room by querying a remote server.
@@ -386,11 +387,7 @@ class SpaceSummaryHandler:
Rooms IDs which do not need to be summarized.
Returns:
A tuple of:
An iterable of rooms.
An iterable of the sorted children events. This may be limited
to a maximum size or may include all children.
An iterable of room entries.
"""
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)
@@ -414,11 +411,30 @@ class SpaceSummaryHandler:
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
return (), ()
return ()
return res.rooms, tuple(
ev.data for ev in res.events if ev.event_type == EventTypes.SpaceChild
)
# Group the events by their room.
children_by_room: Dict[str, List[JsonDict]] = {}
for ev in res.events:
if ev.event_type == EventTypes.SpaceChild:
children_by_room.setdefault(ev.room_id, []).append(ev.data)
# Generate the final results.
results = []
for fed_room in res.rooms:
fed_room_id = fed_room.get("room_id")
if not fed_room_id or not isinstance(fed_room_id, str):
continue
results.append(
_RoomEntry(
fed_room_id,
fed_room,
children_by_room.get(fed_room_id, []),
)
)
return results
async def _is_room_accessible(
self, room_id: str, requester: Optional[str], origin: Optional[str]
@@ -606,10 +622,21 @@ class SpaceSummaryHandler:
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
@attr.s(frozen=True, slots=True)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _RoomQueueEntry:
room_id = attr.ib(type=str)
via = attr.ib(type=Sequence[str])
room_id: str
via: Sequence[str]
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _RoomEntry:
room_id: str
# The room summary for this room.
room: JsonDict
# An iterable of the sorted, stripped children events for children of this room.
#
# This may not include all children.
children: Collection[JsonDict] = ()
def _has_valid_via(e: EventBase) -> bool:
+11 -3
@@ -269,14 +269,22 @@ class SyncHandler:
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
hs.get_clock(), "sync"
)
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
self.state_store = self.storage.state
# TODO: flush cache entries on subsequent sync request.
# Once we get the next /sync request (ie, one with the same access token
# that sets 'since' to 'next_batch'), we know that device won't need a
# cached result any more, and we could flush the entry from the cache to save
# memory.
self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
hs.get_clock(),
"sync",
timeout_ms=hs.config.caches.sync_response_cache_duration,
)
# ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
self.lazy_loaded_members_cache: ExpiringCache[
Tuple[str, Optional[str]], LruCache[str, str]
+2 -1
@@ -335,7 +335,8 @@ class TypingWriterHandler(FollowerTypingHandler):
)
if not is_in_room:
logger.info(
"Ignoring typing update from %s as we're not in the room",
"Ignoring typing update for room %r from server %s as we're not in the room",
room_id,
origin,
)
return
+1
@@ -45,6 +45,7 @@ class PeriodicallyFlushingMemoryHandler(MemoryHandler):
self._flushing_thread: Thread = Thread(
name="PeriodicallyFlushingMemoryHandler flushing thread",
target=self._flush_periodically,
daemon=True,
)
self._flushing_thread.start()
+11 -1
@@ -45,7 +45,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.types import JsonDict, Requester, UserID, UserInfo, create_requester
from synapse.util import Clock
from synapse.util.caches.descriptors import cached
@@ -174,6 +174,16 @@ class ModuleApi:
"""The application name configured in the homeserver's configuration."""
return self._hs.config.email.email_app_name
async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Get user info by user_id
Args:
user_id: Fully qualified user id.
Returns:
UserInfo object if a user was found, otherwise None
"""
return await self._store.get_userinfo_by_id(user_id)
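A hedged sketch of how a third-party module might call the new helper; the module class and its method are hypothetical, and only `get_userinfo_by_id` (plus the `UserInfo` fields added later in this diff) are taken from the API itself:

class ExampleModule:
    def __init__(self, config, api):
        self._api = api  # a synapse.module_api.ModuleApi instance

    async def is_active_admin(self, user_id: str) -> bool:
        user = await self._api.get_userinfo_by_id(user_id)
        # None means no user with that ID was found.
        return user is not None and user.is_admin and not user.is_deactivated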
async def get_user_by_req(
self,
req: SynapseRequest,
+1 -1
View File
@@ -43,7 +43,7 @@ class ReceiptRestServlet(RestServlet):
if receipt_type != "m.read":
raise SynapseError(400, "Receipt type must be 'm.read'")
body = parse_json_object_from_request(request)
body = parse_json_object_from_request(request, allow_empty_body=True)
hidden = body.get(ReadReceiptEventFields.MSC2285_HIDDEN, False)
if not isinstance(hidden, bool):
+7 -7
View File
@@ -941,13 +941,13 @@ class DatabasePool:
`lock` should generally be set to True (the default), but can be set
to False if either of the following is true:
* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
* we somehow know that we are the only thread which will be updating
this table.
1. there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
2. we somehow know that we are the only thread which will be updating
this table.
As an additional note, this parameter only matters for old SQLite versions
because we will use native upserts otherwise.
Args:
table: The table to upsert into
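A hedged sketch of a caller relying on case 1: the `insertion_event_extremities` table created later in this diff has a UNIQUE index on `event_id`, so its upsert can pass `lock=False` (this mirrors the `insert_insertion_extremity` hunk below; `self` stands in for a store exposing `db_pool`):

async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
    await self.db_pool.simple_upsert(
        table="insertion_event_extremities",
        keyvalues={"event_id": event_id},
        values={"event_id": event_id, "room_id": room_id},
        desc="insert_insertion_extremity",
        lock=False,  # safe: a racing insert hits the UNIQUE index and is retried
    )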
+127 -63
View File
@@ -755,81 +755,145 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
"""
@trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" LIMIT 1"
)
fallback_sql = (
"SELECT key_id, key_json, used FROM e2e_fallback_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" LIMIT 1"
)
result = {}
delete = []
used_fallbacks = []
for user_id, device_id, algorithm in query_list:
user_result = result.setdefault(user_id, {})
device_result = user_result.setdefault(device_id, {})
txn.execute(sql, (user_id, device_id, algorithm))
otk_row = txn.fetchone()
if otk_row is not None:
key_id, key_json = otk_row
device_result[algorithm + ":" + key_id] = key_json
delete.append((user_id, device_id, algorithm, key_id))
else:
# no one-time key available, so see if there's a fallback
# key
txn.execute(fallback_sql, (user_id, device_id, algorithm))
fallback_row = txn.fetchone()
if fallback_row is not None:
key_id, key_json, used = fallback_row
device_result[algorithm + ":" + key_id] = key_json
if not used:
used_fallbacks.append(
(user_id, device_id, algorithm, key_id)
)
def _claim_e2e_one_time_key_simple(
txn, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
"""Claim OTK for device for DBs that don't support RETURNING.
# drop any one-time keys that were claimed
sql = (
"DELETE FROM e2e_one_time_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" AND key_id = ?"
Returns:
A tuple of key name (algorithm + key ID) and key JSON, if an
OTK was found.
"""
sql = """
SELECT key_id, key_json FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
LIMIT 1
"""
txn.execute(sql, (user_id, device_id, algorithm))
otk_row = txn.fetchone()
if otk_row is None:
return None
key_id, key_json = otk_row
self.db_pool.simple_delete_one_txn(
txn,
table="e2e_one_time_keys_json",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
},
)
for user_id, device_id, algorithm, key_id in delete:
log_kv(
{
"message": "Executing claim e2e_one_time_keys transaction on database."
}
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
return f"{algorithm}:{key_id}", key_json
@trace
def _claim_e2e_one_time_key_returning(
txn, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
"""Claim OTK for device for DBs that support RETURNING.
Returns:
A tuple of key name (algorithm + key ID) and key JSON, if an
OTK was found.
"""
# We can use RETURNING to do the fetch and DELETE in one step.
sql = """
DELETE FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
AND key_id IN (
SELECT key_id FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
LIMIT 1
)
RETURNING key_id, key_json
"""
txn.execute(
sql, (user_id, device_id, algorithm, user_id, device_id, algorithm)
)
otk_row = txn.fetchone()
if otk_row is None:
return None
key_id, key_json = otk_row
return f"{algorithm}:{key_id}", key_json
results = {}
for user_id, device_id, algorithm in query_list:
if self.database_engine.supports_returning:
# If we support RETURNING clause we can use a single query that
# allows us to use autocommit mode.
_claim_e2e_one_time_key = _claim_e2e_one_time_key_returning
db_autocommit = True
else:
_claim_e2e_one_time_key = _claim_e2e_one_time_key_simple
db_autocommit = False
row = await self.db_pool.runInteraction(
"claim_e2e_one_time_keys",
_claim_e2e_one_time_key,
user_id,
device_id,
algorithm,
db_autocommit=db_autocommit,
)
if row:
device_results = results.setdefault(user_id, {}).setdefault(
device_id, {}
)
txn.execute(sql, (user_id, device_id, algorithm, key_id))
log_kv({"message": "finished executing and invalidating cache"})
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
# mark fallback keys as used
for user_id, device_id, algorithm, key_id in used_fallbacks:
self.db_pool.simple_update_txn(
txn,
"e2e_fallback_keys_json",
{
device_results[row[0]] = row[1]
continue
# No one-time key available, so see if there's a fallback
# key
row = await self.db_pool.simple_select_one(
table="e2e_fallback_keys_json",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
},
retcols=("key_id", "key_json", "used"),
desc="_get_fallback_key",
allow_none=True,
)
if row is None:
continue
key_id = row["key_id"]
key_json = row["key_json"]
used = row["used"]
# Mark fallback key as used if not already.
if not used:
await self.db_pool.simple_update_one(
table="e2e_fallback_keys_json",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
},
{"used": True},
updatevalues={"used": True},
desc="_get_fallback_key_set_used",
)
self._invalidate_cache_and_stream(
txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
await self.invalidate_cache_and_stream(
"get_e2e_unused_fallback_key_types", (user_id, device_id)
)
return result
device_results = results.setdefault(user_id, {}).setdefault(device_id, {})
device_results[f"{algorithm}:{key_id}"] = key_json
return await self.db_pool.runInteraction(
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)
return results
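For the RETURNING branch, a standalone sketch of the same claim against a raw Postgres connection (psycopg2 and the connection string are assumptions; neither appears in this diff). One statement atomically deletes the claimed key and returns its JSON, which is what lets the store run it in autocommit mode:

import psycopg2

conn = psycopg2.connect("dbname=synapse")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        DELETE FROM e2e_one_time_keys_json
        WHERE user_id = %s AND device_id = %s AND algorithm = %s
          AND key_id IN (
            SELECT key_id FROM e2e_one_time_keys_json
            WHERE user_id = %s AND device_id = %s AND algorithm = %s
            LIMIT 1
          )
        RETURNING key_id, key_json
        """,
        ("@alice:example.org", "DEVICE1", "signed_curve25519") * 2,
    )
    row = cur.fetchone()  # None if no one-time key was available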
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
@@ -671,28 +671,98 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}
async def get_oldest_events_with_depth_in_room(self, room_id):
async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
"""Gets the oldest events(backwards extremities) in the room along with the
aproximate depth.
We use this function so that we can compare and see if someones current
depth at their current scrollback is within pagination range of the
event extremeties. If the current depth is close to the depth of given
oldest event, we can trigger a backfill.
Args:
room_id: Room where we want to find the oldest events
Returns:
Map from event_id to depth
"""
def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
# Assemble a dictionary with event_id -> depth for the oldest events
# we know of in the room. Backwards extremities are the oldest
# events we know of in the room, but we only know of them because
# some other event referenced them by prev_event; they aren't persisted
# in our database yet (meaning we don't know their depth
# specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremities.
sql = """
SELECT b.event_id, MAX(e.depth) FROM events as e
/**
* Get the edge connections from the event_edges table
* so we can see whether this event's prev_events point
* to a backward extremity in the next join.
*/
INNER JOIN event_edges as g
ON g.event_id = e.event_id
/**
* We find the "oldest" events in the room by looking for
* events connected to backwards extremities (oldest events
* in the room that we know of so far).
*/
INNER JOIN event_backward_extremities as b
ON g.prev_event_id = b.event_id
WHERE b.room_id = ? AND g.is_state is ?
GROUP BY b.event_id
"""
txn.execute(sql, (room_id, False))
return dict(txn)
return await self.db_pool.runInteraction(
"get_oldest_events_with_depth_in_room",
self.get_oldest_events_with_depth_in_room_txn,
"get_oldest_event_ids_with_depth_in_room",
get_oldest_event_ids_with_depth_in_room_txn,
room_id,
)
def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
" ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
async def get_insertion_event_backwards_extremities_in_room(
self, room_id
) -> Dict[str, int]:
"""Get the insertion events we know about that we haven't backfilled yet.
We use this function so that we can compare and see if someone's current
depth at their current scrollback is within pagination range of the
insertion event. If the current depth is close to the depth of a given
insertion event, we can trigger a backfill.
Args:
room_id: Room where we want to find the oldest events
Returns:
Map from event_id to depth
"""
def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
INNER JOIN insertion_event_extremities as b USING (event_id)
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE b.room_id = ?
GROUP BY b.event_id
"""
txn.execute(sql, (room_id,))
return dict(txn)
return await self.db_pool.runInteraction(
"get_insertion_event_backwards_extremities_in_room",
get_insertion_event_backwards_extremities_in_room_txn,
room_id,
)
txn.execute(sql, (room_id, False))
return dict(txn)
async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
"""Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
if row[1] not in event_results:
queue.put((-row[0], row[1]))
# Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
@@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",
keyvalues={"event_id": event_id},
values={
"event_id": event_id,
"room_id": room_id,
},
insertion_values={},
desc="insert_insertion_extremity",
lock=False,
)
async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
+20 -4
View File
@@ -1845,6 +1845,18 @@ class PersistEventsStore:
},
)
# When we receive an event with a `chunk_id` referencing the
# `next_chunk_id` of an insertion event, we can remove that insertion
# event from the `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
WHERE next_chunk_id = ?
)
"""
txn.execute(sql, (chunk_id,))
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
@@ -2101,15 +2113,17 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
# From the events passed in, add all of the prev events as backwards extremities.
# Ignore any events that are already backwards extremities or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)
@@ -2123,6 +2137,8 @@ class PersistEventsStore:
],
)
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremities.
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
+105 -39
View File
@@ -14,7 +14,6 @@
import logging
import threading
from collections import namedtuple
from typing import (
Collection,
Container,
@@ -27,6 +26,7 @@ from typing import (
overload,
)
import attr
from constantly import NamedConstant, Names
from typing_extensions import Literal
@@ -42,7 +42,11 @@ from synapse.api.room_versions import (
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.logging.context import (
PreserveLoggingContext,
current_context,
make_deferred_yieldable,
)
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -56,6 +60,8 @@ from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -74,7 +80,10 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
@attr.s(slots=True, auto_attribs=True)
class _EventCacheEntry:
event: EventBase
redacted_event: Optional[EventBase]
class EventRedactBehaviour(Names):
@@ -161,6 +170,13 @@ class EventsWorkerStore(SQLBaseStore):
max_size=hs.config.caches.event_cache_size,
)
# Map from event ID to a deferred that will result in a map from event
# ID to cache entry. Note that the returned dict may not have the
# requested event in it if the event isn't in the DB.
self._current_event_fetches: Dict[
str, ObservableDeferred[Dict[str, _EventCacheEntry]]
] = {}
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
@@ -476,7 +492,9 @@ class EventsWorkerStore(SQLBaseStore):
return events
async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
async def _get_events_from_cache_or_db(
self, event_ids: Iterable[str], allow_rejected: bool = False
) -> Dict[str, _EventCacheEntry]:
"""Fetch a bunch of events from the cache or the database.
If events are pulled from the database, they will be cached for future lookups.
@@ -485,53 +503,107 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
event_ids: The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events. If False,
allow_rejected: Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns:
Dict[str, _EventCacheEntry]:
map from event id to result
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_ids, allow_rejected=allow_rejected
event_ids,
)
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
missing_events_ids = {e for e in event_ids if e not in event_entry_map}
# We now look up if we're already fetching some of the events in the DB,
# if so we wait for those lookups to finish instead of pulling the same
# events out of the DB multiple times.
already_fetching: Dict[str, defer.Deferred] = {}
for event_id in missing_events_ids:
deferred = self._current_event_fetches.get(event_id)
if deferred is not None:
# We're already pulling the event out of the DB. Add the deferred
# to the collection of deferreds to wait on.
already_fetching[event_id] = deferred.observe()
missing_events_ids.difference_update(already_fetching)
if missing_events_ids:
log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
# Add entries to `self._current_event_fetches` for each event we're
# going to pull from the DB. We use a single deferred that resolves
# to all the events we pulled from the DB (this will result in this
# function returning more events than requested, but that can happen
# already due to `_get_events_from_db`).
fetching_deferred: ObservableDeferred[
Dict[str, _EventCacheEntry]
] = ObservableDeferred(defer.Deferred())
for event_id in missing_events_ids:
self._current_event_fetches[event_id] = fetching_deferred
# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
missing_events = await self._get_events_from_db(
missing_events_ids, allow_rejected=allow_rejected
)
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)
event_entry_map.update(missing_events)
event_entry_map.update(missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
raise e
finally:
# Ensure that we mark these events as no longer being fetched.
for event_id in missing_events_ids:
self._current_event_fetches.pop(event_id, None)
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)
if already_fetching:
# Wait for the other event requests to finish and add their results
# to ours.
results = await make_deferred_yieldable(
defer.gatherResults(
already_fetching.values(),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
for result in results:
event_entry_map.update(result)
if not allow_rejected:
event_entry_map = {
event_id: entry
for event_id, entry in event_entry_map.items()
if not entry.event.rejected_reason
}
return event_entry_map
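The in-flight bookkeeping above can be distilled into a small standalone pattern; this sketch swaps Twisted's ObservableDeferred for a plain asyncio future and uses a fake fetch function, so it illustrates the technique rather than reproducing Synapse's code:

import asyncio
from typing import Dict

_in_flight: Dict[str, "asyncio.Future[str]"] = {}

async def fetch_from_db(key: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for the database round-trip
    return f"row-for-{key}"

async def get(key: str) -> str:
    existing = _in_flight.get(key)
    if existing is not None:
        return await existing  # piggy-back on the fetch already in flight
    fut: "asyncio.Future[str]" = asyncio.get_running_loop().create_future()
    _in_flight[key] = fut
    try:
        result = await fetch_from_db(key)
    except Exception as e:
        fut.set_exception(e)  # propagate the failure to any piggy-backers
        raise
    else:
        fut.set_result(result)
        return result
    finally:
        _in_flight.pop(key, None)  # never leave a stale marker behind

async def main() -> None:
    rows = await asyncio.gather(get("$event1"), get("$event1"))
    assert rows == ["row-for-$event1", "row-for-$event1"]

asyncio.run(main())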
def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))
def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
"""Fetch events from the caches
def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, _EventCacheEntry]:
"""Fetch events from the caches.
May return rejected events.
Args:
events (Iterable[str]): list of event_ids to fetch
allow_rejected (bool): Whether to return events that were rejected
update_metrics (bool): Whether to update the cache hit ratio metrics
Returns:
dict of event_id -> _EventCacheEntry for each event_id in cache. If
allow_rejected is `False` then there will still be an entry but it
will be `None`
events: list of event_ids to fetch
update_metrics: Whether to update the cache hit ratio metrics
"""
event_map = {}
@@ -542,10 +614,7 @@ class EventsWorkerStore(SQLBaseStore):
if not ret:
continue
if allow_rejected or not ret.event.rejected_reason:
event_map[event_id] = ret
else:
event_map[event_id] = None
event_map[event_id] = ret
return event_map
@@ -672,23 +741,23 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list, e)
async def _get_events_from_db(self, event_ids, allow_rejected=False):
async def _get_events_from_db(
self, event_ids: Iterable[str]
) -> Dict[str, _EventCacheEntry]:
"""Fetch a bunch of events from the database.
May return rejected events.
Returned events will be added to the cache for future lookups.
Unknown events are omitted from the response.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
event_ids: The event_ids of the events to fetch
Returns:
Dict[str, _EventCacheEntry]:
map from event id to result. May return extra events which
weren't asked for.
map from event id to result. May return extra events which
weren't asked for.
"""
fetched_events = {}
events_to_fetch = event_ids
@@ -717,9 +786,6 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason = row["rejected_reason"]
if not allow_rejected and rejected_reason:
continue
# If the event or metadata cannot be parsed, log the error and act
# as if the event is unknown.
try:
+29 -1
View File
@@ -29,7 +29,7 @@ from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.types import Connection, Cursor
from synapse.storage.util.id_generators import IdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import UserID
from synapse.types import UserID, UserInfo
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
@@ -146,6 +146,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
@cached()
async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Deprecated: use get_userinfo_by_id instead"""
return await self.db_pool.simple_select_one(
table="users",
keyvalues={"name": user_id},
@@ -166,6 +167,33 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
desc="get_user_by_id",
)
async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Get a UserInfo object for a user by user ID.
Note! Currently uses the cache of `get_user_by_id`. Once that deprecated method is removed,
this method should be cached.
Args:
user_id: The user to fetch user info for.
Returns:
`UserInfo` object if user found, otherwise `None`.
"""
user_data = await self.get_user_by_id(user_id)
if not user_data:
return None
return UserInfo(
appservice_id=user_data["appservice_id"],
consent_server_notice_sent=user_data["consent_server_notice_sent"],
consent_version=user_data["consent_version"],
creation_ts=user_data["creation_ts"],
is_admin=bool(user_data["admin"]),
is_deactivated=bool(user_data["deactivated"]),
is_guest=bool(user_data["is_guest"]),
is_shadow_banned=bool(user_data["shadow_banned"]),
user_id=UserID.from_string(user_data["name"]),
user_type=user_data["user_type"],
)
async def is_trial_user(self, user_id: str) -> bool:
"""Checks if user is in the "trial" period, i.e. within the first
N days of registration defined by `mau_trial_days` config
+2 -4
View File
@@ -629,14 +629,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(
member_event_ids, allow_rejected=False, update_metrics=False
)
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
missing_member_event_ids = []
for event_id in member_event_ids:
ev_entry = event_map.get(event_id)
if ev_entry:
if ev_entry and not ev_entry.event.rejected_reason:
if ev_entry.event.membership == Membership.JOIN:
users_in_room[ev_entry.event.state_key] = ProfileInfo(
display_name=ev_entry.event.content.get("displayname", None),
+1 -1
View File
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SCHEMA_VERSION = 61
SCHEMA_VERSION = 62
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@@ -0,0 +1,24 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a table that keeps track of which "insertion" events need to be backfilled
CREATE TABLE IF NOT EXISTS insertion_event_extremities(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);
+29
View File
@@ -751,3 +751,32 @@ def get_verify_key_from_cross_signing_key(key_info):
# and return that one key
for key_id, key_data in keys.items():
return (key_id, decode_verify_key_bytes(key_id, decode_base64(key_data)))
@attr.s(auto_attribs=True, frozen=True, slots=True)
class UserInfo:
"""Holds information about a user. Result of get_userinfo_by_id.
Attributes:
user_id: ID of the user.
appservice_id: Application service ID that created this user.
consent_server_notice_sent: Version of policy documents the user has been sent.
consent_version: Version of policy documents the user has consented to.
creation_ts: Creation timestamp of the user.
is_admin: True if the user is an admin.
is_deactivated: True if the user has been deactivated.
is_guest: True if the user is a guest user.
is_shadow_banned: True if the user has been shadow-banned.
user_type: User type (None for a normal user; 'support' and 'bot' are the other options).
"""
user_id: UserID
appservice_id: Optional[int]
consent_server_notice_sent: Optional[str]
consent_version: Optional[str]
user_type: Optional[str]
creation_ts: int
is_admin: bool
is_deactivated: bool
is_guest: bool
is_shadow_banned: bool
+121 -1
View File
@@ -133,11 +133,131 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.assertEquals(result.room_id, room_id)
self.assertEquals(result.servers, servers)
def _mkservice(self, is_interested):
def test_get_3pe_protocols_no_appservices(self):
self.mock_store.get_app_services.return_value = []
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols("my-protocol"))
)
self.mock_as_api.get_3pe_protocol.assert_not_called()
self.assertEquals(response, {})
def test_get_3pe_protocols_no_protocols(self):
service = self._mkservice(False, [])
self.mock_store.get_app_services.return_value = [service]
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols())
)
self.mock_as_api.get_3pe_protocol.assert_not_called()
self.assertEquals(response, {})
def test_get_3pe_protocols_protocol_no_response(self):
service = self._mkservice(False, ["my-protocol"])
self.mock_store.get_app_services.return_value = [service]
self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(None)
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols())
)
self.mock_as_api.get_3pe_protocol.assert_called_once_with(
service, "my-protocol"
)
self.assertEquals(response, {})
def test_get_3pe_protocols_select_one_protocol(self):
service = self._mkservice(False, ["my-protocol"])
self.mock_store.get_app_services.return_value = [service]
self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(
{"x-protocol-data": 42, "instances": []}
)
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols("my-protocol"))
)
self.mock_as_api.get_3pe_protocol.assert_called_once_with(
service, "my-protocol"
)
self.assertEquals(
response, {"my-protocol": {"x-protocol-data": 42, "instances": []}}
)
def test_get_3pe_protocols_one_protocol(self):
service = self._mkservice(False, ["my-protocol"])
self.mock_store.get_app_services.return_value = [service]
self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(
{"x-protocol-data": 42, "instances": []}
)
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols())
)
self.mock_as_api.get_3pe_protocol.assert_called_once_with(
service, "my-protocol"
)
self.assertEquals(
response, {"my-protocol": {"x-protocol-data": 42, "instances": []}}
)
def test_get_3pe_protocols_multiple_protocol(self):
service_one = self._mkservice(False, ["my-protocol"])
service_two = self._mkservice(False, ["other-protocol"])
self.mock_store.get_app_services.return_value = [service_one, service_two]
self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(
{"x-protocol-data": 42, "instances": []}
)
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols())
)
self.mock_as_api.get_3pe_protocol.assert_called()
self.assertEquals(
response,
{
"my-protocol": {"x-protocol-data": 42, "instances": []},
"other-protocol": {"x-protocol-data": 42, "instances": []},
},
)
def test_get_3pe_protocols_multiple_info(self):
service_one = self._mkservice(False, ["my-protocol"])
service_two = self._mkservice(False, ["my-protocol"])
async def get_3pe_protocol(service, unusedProtocol):
if service == service_one:
return {
"x-protocol-data": 42,
"instances": [{"desc": "Alice's service"}],
}
if service == service_two:
return {
"x-protocol-data": 36,
"x-not-used": 45,
"instances": [{"desc": "Bob's service"}],
}
raise Exception("Unexpected service")
self.mock_store.get_app_services.return_value = [service_one, service_two]
self.mock_as_api.get_3pe_protocol = get_3pe_protocol
response = self.successResultOf(
defer.ensureDeferred(self.handler.get_3pe_protocols())
)
# The protocol-level data is expected to come from the first service only
# (42 rather than 36, and no "x-not-used"), while the instances of both
# services are merged into the response.
self.assertEquals(
response,
{
"my-protocol": {
"x-protocol-data": 42,
"instances": [
{
"desc": "Alice's service",
},
{"desc": "Bob's service"},
],
},
},
)
def _mkservice(self, is_interested, protocols=None):
service = Mock()
service.is_interested.return_value = make_awaitable(is_interested)
service.token = "mock_service_token"
service.url = "mock_service_url"
service.protocols = protocols
return service
def _mkservice_alias(self, is_interested_in_alias):
+107 -78
View File
@@ -26,7 +26,7 @@ from synapse.api.constants import (
from synapse.api.errors import AuthError
from synapse.api.room_versions import RoomVersions
from synapse.events import make_event_from_dict
from synapse.handlers.space_summary import _child_events_comparison_key
from synapse.handlers.space_summary import _child_events_comparison_key, _RoomEntry
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.server import HomeServer
@@ -351,26 +351,30 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase):
# events before child events).
# Note that these entries are brief, but should contain enough info.
rooms = [
{
"room_id": subspace,
"world_readable": True,
"room_type": RoomTypes.SPACE,
},
{
"room_id": subroom,
"world_readable": True,
},
return [
_RoomEntry(
subspace,
{
"room_id": subspace,
"world_readable": True,
"room_type": RoomTypes.SPACE,
},
[
{
"room_id": subspace,
"state_key": subroom,
"content": {"via": [fed_hostname]},
}
],
),
_RoomEntry(
subroom,
{
"room_id": subroom,
"world_readable": True,
},
),
]
event_content = {"via": [fed_hostname]}
events = [
{
"room_id": subspace,
"state_key": subroom,
"content": event_content,
},
]
return rooms, events
# Add a room to the space which is on another server.
self._add_child(self.space, subspace, self.token)
@@ -436,70 +440,95 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase):
):
# Note that these entries are brief, but should contain enough info.
rooms = [
{
"room_id": public_room,
"world_readable": False,
"join_rules": JoinRules.PUBLIC,
},
{
"room_id": knock_room,
"world_readable": False,
"join_rules": JoinRules.KNOCK,
},
{
"room_id": not_invited_room,
"world_readable": False,
"join_rules": JoinRules.INVITE,
},
{
"room_id": invited_room,
"world_readable": False,
"join_rules": JoinRules.INVITE,
},
{
"room_id": restricted_room,
"world_readable": False,
"join_rules": JoinRules.MSC3083_RESTRICTED,
"allowed_spaces": [],
},
{
"room_id": restricted_accessible_room,
"world_readable": False,
"join_rules": JoinRules.MSC3083_RESTRICTED,
"allowed_spaces": [self.room],
},
{
"room_id": world_readable_room,
"world_readable": True,
"join_rules": JoinRules.INVITE,
},
{
"room_id": joined_room,
"world_readable": False,
"join_rules": JoinRules.INVITE,
},
]
# Place each room in the sub-space.
event_content = {"via": [fed_hostname]}
events = [
{
"room_id": subspace,
"state_key": room["room_id"],
"content": event_content,
}
for room in rooms
_RoomEntry(
public_room,
{
"room_id": public_room,
"world_readable": False,
"join_rules": JoinRules.PUBLIC,
},
),
_RoomEntry(
knock_room,
{
"room_id": knock_room,
"world_readable": False,
"join_rules": JoinRules.KNOCK,
},
),
_RoomEntry(
not_invited_room,
{
"room_id": not_invited_room,
"world_readable": False,
"join_rules": JoinRules.INVITE,
},
),
_RoomEntry(
invited_room,
{
"room_id": invited_room,
"world_readable": False,
"join_rules": JoinRules.INVITE,
},
),
_RoomEntry(
restricted_room,
{
"room_id": restricted_room,
"world_readable": False,
"join_rules": JoinRules.MSC3083_RESTRICTED,
"allowed_spaces": [],
},
),
_RoomEntry(
restricted_accessible_room,
{
"room_id": restricted_accessible_room,
"world_readable": False,
"join_rules": JoinRules.MSC3083_RESTRICTED,
"allowed_spaces": [self.room],
},
),
_RoomEntry(
world_readable_room,
{
"room_id": world_readable_room,
"world_readable": True,
"join_rules": JoinRules.INVITE,
},
),
_RoomEntry(
joined_room,
{
"room_id": joined_room,
"world_readable": False,
"join_rules": JoinRules.INVITE,
},
),
]
# Also include the subspace.
rooms.insert(
0,
{
"room_id": subspace,
"world_readable": True,
},
_RoomEntry(
subspace,
{
"room_id": subspace,
"world_readable": True,
},
# Place each room in the sub-space.
[
{
"room_id": subspace,
"state_key": room.room_id,
"content": {"via": [fed_hostname]},
}
for room in rooms
],
),
)
return rooms, events
return rooms
# Add a room to the space which is on another server.
self._add_child(self.space, subspace, self.token)
+10
View File
@@ -79,6 +79,16 @@ class ModuleApiTestCase(HomeserverTestCase):
displayname = self.get_success(self.store.get_profile_displayname("bob"))
self.assertEqual(displayname, "Bobberino")
def test_get_userinfo_by_id(self):
user_id = self.register_user("alice", "1234")
found_user = self.get_success(self.module_api.get_userinfo_by_id(user_id))
self.assertEqual(found_user.user_id.to_string(), user_id)
self.assertIdentical(found_user.is_admin, False)
def test_get_userinfo_by_id__no_user_found(self):
found_user = self.get_success(self.module_api.get_userinfo_by_id("@alice:test"))
self.assertIsNone(found_user)
def test_sending_events_into_room(self):
"""Tests that a module can send events into a room"""
# Mock out create_and_send_nonmember_event to check whether events are being sent
+12
View File
@@ -418,6 +418,18 @@ class ReadReceiptsTestCase(unittest.HomeserverTestCase):
# Test that the first user can't see the other user's hidden read receipt
self.assertEqual(self._get_read_receipt(), None)
def test_read_receipt_with_empty_body(self):
# Send a message as the first user
res = self.helper.send(self.room_id, body="hello", tok=self.tok)
# Send a read receipt for this message with an empty body
channel = self.make_request(
"POST",
"/rooms/%s/receipt/m.read/%s" % (self.room_id, res["event_id"]),
access_token=self.tok2,
)
self.assertEqual(channel.code, 200)
def _get_read_receipt(self):
"""Syncs and returns the read receipt."""
@@ -14,7 +14,10 @@
import json
from synapse.logging.context import LoggingContext
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.util.async_helpers import yieldable_gather_results
from tests import unittest
@@ -94,3 +97,50 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
self.assertEquals(res, {"event10"})
self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
class EventCacheTestCase(unittest.HomeserverTestCase):
"""Test that the various layers of event cache works."""
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store: EventsWorkerStore = hs.get_datastore()
self.user = self.register_user("user", "pass")
self.token = self.login(self.user, "pass")
self.room = self.helper.create_room_as(self.user, tok=self.token)
res = self.helper.send(self.room, tok=self.token)
self.event_id = res["event_id"]
# Reset the event cache so the tests start with it empty
self.store._get_event_cache.clear()
def test_simple(self):
"""Test that we cache events that we pull from the DB."""
with LoggingContext("test") as ctx:
self.get_success(self.store.get_event(self.event_id))
# We should have fetched the event from the DB
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
def test_dedupe(self):
"""Test that if we request the same event multiple times we only pull it
out once.
"""
with LoggingContext("test") as ctx:
d = yieldable_gather_results(
self.store.get_event, [self.event_id, self.event_id]
)
self.get_success(d)
# We should have fetched the event from the DB
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)