
Compare commits


2 Commits

Author SHA1 Message Date
Erik Johnston a48296dd86 WIP docs 2021-07-28 11:06:24 +01:00
Erik Johnston 13f9422e38 Allow /typing to be handled by any worker 2021-07-28 10:58:45 +01:00
102 changed files with 485 additions and 2449 deletions
+9 -12
View File
@@ -367,16 +367,13 @@ jobs:
- name: Set build result
env:
NEEDS_CONTEXT: ${{ toJSON(needs) }}
# the `jq` incantation dumps out a series of "<job> <result>" lines.
# we set it to an intermediate variable to avoid a pipe, which makes it
# hard to set $rc.
# the `jq` incantation dumps out a series of "<job> <result>" lines
run: |
rc=0
results=$(jq -r 'to_entries[] | [.key,.value.result] | join(" ")' <<< $NEEDS_CONTEXT)
while read job result ; do
if [ "$result" != "success" ]; then
echo "::set-failed ::Job $job returned $result"
rc=1
fi
done <<< $results
exit $rc
set -o pipefail
jq -r 'to_entries[] | [.key,.value.result] | join(" ")' \
<<< $NEEDS_CONTEXT |
while read job result; do
if [ "$result" != "success" ]; then
echo "::set-failed ::Job $job returned $result"
fi
done
+4 -99
View File
@@ -1,105 +1,10 @@
Synapse 1.40.0rc2 (2021-08-04)
==============================
Bugfixes
--------
- Fix the `PeriodicallyFlushingMemoryHandler` inhibiting application shutdown because of its background thread. ([\#10517](https://github.com/matrix-org/synapse/issues/10517))
- Fix a bug introduced in Synapse v1.40.0rc1 that could cause Synapse to respond with an error when clients would update read receipts. ([\#10531](https://github.com/matrix-org/synapse/issues/10531))
Internal Changes
----------------
- Fix release script to open the correct URL for the release. ([\#10516](https://github.com/matrix-org/synapse/issues/10516))
Synapse 1.40.0rc1 (2021-08-03)
==============================
Features
--------
- Add support for [MSC2033](https://github.com/matrix-org/matrix-doc/pull/2033): `device_id` on `/account/whoami`. ([\#9918](https://github.com/matrix-org/synapse/issues/9918))
- Update support for [MSC2716 - Incrementally importing history into existing rooms](https://github.com/matrix-org/matrix-doc/pull/2716). ([\#10245](https://github.com/matrix-org/synapse/issues/10245), [\#10432](https://github.com/matrix-org/synapse/issues/10432), [\#10463](https://github.com/matrix-org/synapse/issues/10463))
- Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events. ([\#10254](https://github.com/matrix-org/synapse/issues/10254), [\#10447](https://github.com/matrix-org/synapse/issues/10447), [\#10489](https://github.com/matrix-org/synapse/issues/10489))
- Initial support for [MSC3244](https://github.com/matrix-org/matrix-doc/pull/3244), Room version capabilities over the /capabilities API. ([\#10283](https://github.com/matrix-org/synapse/issues/10283))
- Add a buffered logging handler which periodically flushes itself. ([\#10407](https://github.com/matrix-org/synapse/issues/10407), [\#10515](https://github.com/matrix-org/synapse/issues/10515))
- Add support for https connections to a proxy server. Contributed by @Bubu and @dklimpel. ([\#10411](https://github.com/matrix-org/synapse/issues/10411))
- Support for [MSC2285 (hidden read receipts)](https://github.com/matrix-org/matrix-doc/pull/2285). Contributed by @SimonBrandner. ([\#10413](https://github.com/matrix-org/synapse/issues/10413))
- Email notifications now state whether an invitation is to a room or a space. ([\#10426](https://github.com/matrix-org/synapse/issues/10426))
- Allow setting transaction limit for database connections. ([\#10440](https://github.com/matrix-org/synapse/issues/10440), [\#10511](https://github.com/matrix-org/synapse/issues/10511))
- Add `creation_ts` to "list users" admin API. ([\#10448](https://github.com/matrix-org/synapse/issues/10448))
Bugfixes
--------
- Improve character set detection in URL previews by supporting underscores (in addition to hyphens). Contributed by @srividyut. ([\#10410](https://github.com/matrix-org/synapse/issues/10410))
- Fix events being incorrectly rejected over federation if they reference auth events that the server needed to fetch. ([\#10439](https://github.com/matrix-org/synapse/issues/10439))
- Fix `synapse_federation_server_oldest_inbound_pdu_in_staging` Prometheus metric to not report a max age of 51 years when the queue is empty. ([\#10455](https://github.com/matrix-org/synapse/issues/10455))
- Fix a bug which caused an explicit assignment of power-level 0 to a user to be misinterpreted in rare circumstances. ([\#10499](https://github.com/matrix-org/synapse/issues/10499))
Improved Documentation
----------------------
- Fix hierarchy of providers on the OpenID page. ([\#10445](https://github.com/matrix-org/synapse/issues/10445))
- Consolidate development documentation to `docs/development/`. ([\#10453](https://github.com/matrix-org/synapse/issues/10453))
- Add some developer docs to explain room DAG concepts like `outliers`, `state_groups`, `depth`, etc. ([\#10464](https://github.com/matrix-org/synapse/issues/10464))
- Document how to use Complement while developing a new Synapse feature. ([\#10483](https://github.com/matrix-org/synapse/issues/10483))
Internal Changes
----------------
- Prune inbound federation queues for a room if they get too large. ([\#10390](https://github.com/matrix-org/synapse/issues/10390))
- Add type hints to `synapse.federation.transport.client` module. ([\#10408](https://github.com/matrix-org/synapse/issues/10408))
- Remove shebang line from module files. ([\#10415](https://github.com/matrix-org/synapse/issues/10415))
- Drop backwards-compatibility code that was required to support Ubuntu Xenial. ([\#10429](https://github.com/matrix-org/synapse/issues/10429))
- Use a docker image cache for the prerequisites for the debian package build. ([\#10431](https://github.com/matrix-org/synapse/issues/10431))
- Improve servlet type hints. ([\#10437](https://github.com/matrix-org/synapse/issues/10437), [\#10438](https://github.com/matrix-org/synapse/issues/10438))
- Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages. ([\#10442](https://github.com/matrix-org/synapse/issues/10442))
- Update the `tests-done` Github Actions status. ([\#10444](https://github.com/matrix-org/synapse/issues/10444), [\#10512](https://github.com/matrix-org/synapse/issues/10512))
- Update type annotations to work with forthcoming Twisted 21.7.0 release. ([\#10446](https://github.com/matrix-org/synapse/issues/10446), [\#10450](https://github.com/matrix-org/synapse/issues/10450))
- Cancel redundant GHA workflows when a new commit is pushed. ([\#10451](https://github.com/matrix-org/synapse/issues/10451))
- Mitigate media repo XSS attacks on IE11 via the non-standard X-Content-Security-Policy header. ([\#10468](https://github.com/matrix-org/synapse/issues/10468))
- Additional type hints in the state handler. ([\#10482](https://github.com/matrix-org/synapse/issues/10482))
- Update syntax used to run complement tests. ([\#10488](https://github.com/matrix-org/synapse/issues/10488))
- Fix up type annotations to work with Twisted 21.7. ([\#10490](https://github.com/matrix-org/synapse/issues/10490))
- Improve type annotations for `ObservableDeferred`. ([\#10491](https://github.com/matrix-org/synapse/issues/10491))
- Extend release script to also tag and create GitHub releases. ([\#10496](https://github.com/matrix-org/synapse/issues/10496))
- Fix a bug which caused production debian packages to be incorrectly marked as 'prerelease'. ([\#10500](https://github.com/matrix-org/synapse/issues/10500))
Synapse 1.39.0 (2021-07-29)
===========================
No significant changes.
Synapse 1.39.0rc3 (2021-07-28)
==============================
Bugfixes
--------
- Fix a bug introduced in Synapse 1.38 which caused an exception at startup when SAML authentication was enabled. ([\#10477](https://github.com/matrix-org/synapse/issues/10477))
- Fix a long-standing bug where Synapse would not inform clients that a device had exhausted its one-time-key pool, potentially causing problems decrypting events. ([\#10485](https://github.com/matrix-org/synapse/issues/10485))
- Fix reporting old R30 stats as R30v2 stats. Introduced in v1.39.0rc1. ([\#10486](https://github.com/matrix-org/synapse/issues/10486))
Internal Changes
----------------
- Fix an error which prevented the GitHub Actions workflow that builds the docker images from running. ([\#10461](https://github.com/matrix-org/synapse/issues/10461))
- Fix release script to correctly version debian changelog when doing RCs. ([\#10465](https://github.com/matrix-org/synapse/issues/10465))
Synapse 1.39.0rc2 (2021-07-22)
==============================
This release also includes the changes in v1.38.1.
Bugfixes
--------
- Always include `device_one_time_keys_count` key in `/sync` response to work around a bug in Element Android that broke encryption for new devices. ([\#10457](https://github.com/matrix-org/synapse/issues/10457))
Internal Changes
+2 -39
View File
@@ -155,7 +155,7 @@ source ./env/bin/activate
./scripts-dev/lint.sh path/to/file1.py path/to/file2.py path/to/folder
```
## Run the unit tests (Twisted trial).
## Run the unit tests.
The unit tests run parts of Synapse, including your changes, to see if anything
was broken. They are slower than the linters but will typically catch more errors.
@@ -186,7 +186,7 @@ SYNAPSE_TEST_LOG_LEVEL=DEBUG trial tests
```
## Run the integration tests ([Sytest](https://github.com/matrix-org/sytest)).
## Run the integration tests.
The integration tests are a more comprehensive suite of tests. They
run a full version of Synapse, including your changes, to check if
@@ -203,43 +203,6 @@ $ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v
This configuration should generally cover your needs. For more details about other configurations, see [documentation in the SyTest repo](https://github.com/matrix-org/sytest/blob/develop/docker/README.md).
## Run the integration tests ([Complement](https://github.com/matrix-org/complement)).
[Complement](https://github.com/matrix-org/complement) is a suite of black box tests that can be run on any homeserver implementation. It can also be thought of as end-to-end (e2e) tests.
It's often nice to develop on Synapse and write Complement tests at the same time.
Here is how to run your local Synapse checkout against your local Complement checkout.
(checkout [`complement`](https://github.com/matrix-org/complement) alongside your `synapse` checkout)
```sh
COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh
```
To run a specific test file, you can pass the test name at the end of the command. The name passed comes from the naming structure in your Complement tests. If you're unsure of the name, you can do a full run and copy it from the test output:
```sh
COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh TestBackfillingHistory
```
To run a specific test, you can specify the whole name structure:
```sh
COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh TestBackfillingHistory/parallel/Backfilled_historical_events_resolve_with_proper_state_in_correct_order
```
### Access database for homeserver after Complement test runs.
If you're curious what the database looks like after you run some tests, here are some steps to get you going in Synapse:
1. In your Complement test comment out `defer deployment.Destroy(t)` and replace with `defer time.Sleep(2 * time.Hour)` to keep the homeserver running after the tests complete
1. Start the Complement tests
1. Find the name of the container with `docker ps -f name=complement_` (this will filter for just the Complement-related Docker containers)
1. Access the container replacing the name with what you found in the previous step: `docker exec -it complement_1_hs_with_application_service.hs1_2 /bin/bash`
1. Install sqlite (database driver), `apt-get update && apt-get install -y sqlite3`
1. Then run `sqlite3` and open the database with `.open /conf/homeserver.db` (this db path comes from the Synapse homeserver.yaml)
# 9. Submit your patch.
Once you're happy with your patch, it's time to prepare a Pull Request.
+1
View File
@@ -0,0 +1 @@
Update support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083) to consider changes in the MSC around which servers can issue join events.
+1
View File
@@ -0,0 +1 @@
Initial support for MSC3244, Room version capabilities over the /capabilities API.
+1
View File
@@ -0,0 +1 @@
Add a buffered logging handler which periodically flushes itself.
+1
View File
@@ -0,0 +1 @@
Add type hints to `synapse.federation.transport.client` module.
+1
View File
@@ -0,0 +1 @@
Email notifications now state whether an invitation is to a room or a space.
+1
View File
@@ -0,0 +1 @@
Drop backwards-compatibility code that was required to support Ubuntu Xenial.
+1
View File
@@ -0,0 +1 @@
Use a docker image cache for the prerequisites for the debian package build.
+1
View File
@@ -0,0 +1 @@
Connect historical chunks together with chunk events instead of a content field (MSC2716).
+1
View File
@@ -0,0 +1 @@
Improve servlet type hints.
+1
View File
@@ -0,0 +1 @@
Improve servlet type hints.
+1
View File
@@ -0,0 +1 @@
Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.
+1
View File
@@ -0,0 +1 @@
Update the `tests-done` Github Actions status.
+1
View File
@@ -0,0 +1 @@
Fix hierarchy of providers on the OpenID page.
+1
View File
@@ -0,0 +1 @@
Update type annotations to work with forthcoming Twisted 21.7.0 release.
+1
View File
@@ -0,0 +1 @@
Add `creation_ts` to list users admin API.
+1
View File
@@ -0,0 +1 @@
Cancel redundant GHA workflows when a new commit is pushed.
+1
View File
@@ -0,0 +1 @@
Consolidate development documentation to `docs/development/`.
+1
View File
@@ -0,0 +1 @@
Fix an error which prevented the GitHub Actions workflow that builds the docker images from running.
+1
View File
@@ -0,0 +1 @@
Disable `msc2716` Complement tests until Complement updates are merged.
+1
View File
@@ -0,0 +1 @@
Mitigate media repo XSS attacks on IE11 via the non-standard X-Content-Security-Policy header.
+1
View File
@@ -0,0 +1 @@
Additional type hints in the state handler.
+1
View File
@@ -0,0 +1 @@
Update syntax used to run complement tests.
+1
View File
@@ -0,0 +1 @@
Add support for [MSC2033](https://github.com/matrix-org/matrix-doc/pull/2033): `device_id` on `/account/whoami`.
+2 -24
View File
@@ -1,30 +1,8 @@
matrix-synapse-py3 (1.40.0~rc2) stable; urgency=medium
matrix-synapse-py3 (1.39.0ubuntu1) UNRELEASED; urgency=medium
* New synapse release 1.40.0~rc2.
-- Synapse Packaging team <packages@matrix.org> Wed, 04 Aug 2021 17:08:55 +0100
matrix-synapse-py3 (1.40.0~rc1) stable; urgency=medium
[ Richard van der Hoff ]
* Drop backwards-compatibility code that was required to support Ubuntu Xenial.
[ Synapse Packaging team ]
* New synapse release 1.40.0~rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 03 Aug 2021 11:31:49 +0100
matrix-synapse-py3 (1.39.0) stable; urgency=medium
* New synapse release 1.39.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 29 Jul 2021 09:59:00 +0100
matrix-synapse-py3 (1.39.0~rc3) stable; urgency=medium
* New synapse release 1.39.0~rc3.
-- Synapse Packaging team <packages@matrix.org> Wed, 28 Jul 2021 13:30:58 +0100
-- Richard van der Hoff <richard@matrix.org> Tue, 20 Jul 2021 00:10:03 +0100
matrix-synapse-py3 (1.38.1) stable; urgency=medium
+5 -4
View File
@@ -11,6 +11,10 @@ DIST=`cut -d ':' -f2 <<< $distro`
cp -aT /synapse/source /synapse/build
cd /synapse/build
# add an entry to the changelog for this distribution
dch -M -l "+$DIST" "build for $DIST"
dch -M -r "" --force-distribution --distribution "$DIST"
# if this is a prerelease, set the Section accordingly.
#
# When the package is later added to the package repo, reprepro will use the
@@ -19,14 +23,11 @@ cd /synapse/build
DEB_VERSION=`dpkg-parsechangelog -SVersion`
case $DEB_VERSION in
*~rc*|*~a*|*~b*|*~c*)
*rc*|*a*|*b*|*c*)
sed -ie '/^Section:/c\Section: prerelease' debian/control
;;
esac
# add an entry to the changelog for this distribution
dch -M -l "+$DIST" "build for $DIST"
dch -M -r "" --force-distribution --distribution "$DIST"
dpkg-buildpackage -us -uc
-1
View File
@@ -79,7 +79,6 @@
- [Single Sign-On]()
- [SAML](development/saml.md)
- [CAS](development/cas.md)
- [Room DAG concepts](development/room-dag-concepts.md)
- [State Resolution]()
- [The Auth Chain Difference Algorithm](auth_chain_difference_algorithm.md)
- [Media Repository](media_repository.md)
-79
View File
@@ -1,79 +0,0 @@
# Room DAG concepts
## Edges
The word "edge" comes from graph theory lingo. An edge is just a connection
between two events. In Synapse, we connect events by specifying their
`prev_events`. A subsequent event points back at a previous event.
```
A (oldest) <---- B <---- C (most recent)
```
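
A minimal sketch of those edges, using hypothetical plain-dict events rather than Synapse's actual event objects:

```python
# Hypothetical, minimal representation of the DAG above; real Synapse
# events are full objects, not plain dicts.
events = {
    "A": {"prev_events": []},     # oldest event
    "B": {"prev_events": ["A"]},  # B points back at A
    "C": {"prev_events": ["B"]},  # C points back at B (most recent)
}
```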
## Depth and stream ordering
Events are normally sorted by `(topological_ordering, stream_ordering)` where
`topological_ordering` is just `depth`. In other words, we first sort by `depth`
and then tie-break based on `stream_ordering`. `depth` is incremented as new
messages are added to the DAG. Normally, `stream_ordering` is an auto
incrementing integer, but backfilled events start with `stream_ordering=-1` and decrement.
---
- `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
The general idea is that, if you're following a room in real-time (i.e.
`/sync`), you probably want to see the messages as they arrive at your server,
rather than skipping any that arrived late; whereas if you're looking at a
historical section of timeline (i.e. `/messages`), you want to see the best
representation of the state of the room as others were seeing it at the time.
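
A minimal sketch of the two orderings, using hypothetical event records rather than Synapse's storage layer:

```python
# Hypothetical event records; `depth` stands in for topological_ordering.
events = [
    {"event_id": "X", "depth": 1, "stream_ordering": -1},  # backfilled event
    {"event_id": "Y", "depth": 2, "stream_ordering": 1},
    {"event_id": "Z", "depth": 2, "stream_ordering": 2},
]

# /messages and /backfill: sort by (topological_ordering, stream_ordering).
topological = sorted(events, key=lambda e: (e["depth"], e["stream_ordering"]))

# /sync: the order events arrived at this server (stream_ordering).
arrival = sorted(events, key=lambda e: e["stream_ordering"])
```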
## Forward extremity
Most-recent-in-time events in the DAG which are not referenced by any other events' `prev_events` yet.
The forward extremities of a room are used as the `prev_events` when the next event is sent.
## Backwards extremity
The current marker of where we have backfilled up to; these will generally be the
oldest-in-time events we know of in the DAG.
A backwards extremity is an event for which we haven't fetched all of the `prev_events`.
Once we have fetched all of its `prev_events`, it's unmarked as a backwards
extremity (although we may have formed new backwards extremities from the prev
events during the backfilling process).
## Outliers
We mark an event as an `outlier` when we haven't figured out the state for the
room at that point in the DAG yet.
We won't *necessarily* have the `prev_events` of an `outlier` in the database,
but it's entirely possible that we *might*. The status of whether we have all of
the `prev_events` is marked as a [backwards extremity](#backwards-extremity).
For example, when we fetch the event auth chain or state for a given event, we
mark all of those claimed auth events as outliers because we haven't done the
state calculation ourselves.
## State groups
For every non-outlier event we need to know the state at that event. Instead of
storing the full state for each event in the DB (i.e. a `event_id -> state`
mapping), which is *very* space inefficient when state doesn't change, we
instead assign each different set of state a "state group" and then have
mappings of `event_id -> state_group` and `state_group -> state`.
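
A minimal sketch of the two mappings, with hypothetical IDs (the real mappings live in database tables):

```python
# Hypothetical IDs; plain message events share the state group of their
# predecessor, while state events produce a new group.
event_to_state_group = {
    "$create": 1,
    "$msg1": 1,    # message event: state unchanged, same group
    "$join_b": 2,  # state event: a new state group is created
}
state_group_to_state = {
    1: {("m.room.create", ""): "$create"},
    2: {
        ("m.room.create", ""): "$create",
        ("m.room.member", "@b:example.com"): "$join_b",
    },
}

def state_at(event_id: str) -> dict:
    """Resolve the state at an event via its state group."""
    return state_group_to_state[event_to_state_group[event_id]]
```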
### State group edges
TODO: `state_group_edges` is a further optimization...
notes from @Azrenbeth, https://pastebin.com/seUGVGeT
-4
View File
@@ -720,9 +720,6 @@ caches:
# 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or
# 'psycopg2' (for PostgreSQL).
#
# 'txn_limit' gives the maximum number of transactions to run per connection
# before reconnecting. Defaults to 0, which means no limit.
#
# 'args' gives options which are passed through to the database engine,
# except for options starting 'cp_', which are used to configure the Twisted
# connection pool. For a reference to valid arguments, see:
@@ -743,7 +740,6 @@ caches:
#
#database:
# name: psycopg2
# txn_limit: 10000
# args:
# user: synapse_user
# password: secretpassword
+4 -1
View File
@@ -28,7 +28,7 @@ handlers:
# will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
# logs will still be flushed immediately.
buffer:
class: logging.handlers.MemoryHandler
class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
target: file
# The capacity is the number of log lines that are buffered before
# being written to disk. Increasing this will lead to better
@@ -36,6 +36,9 @@ handlers:
# be written to disk.
capacity: 10
flushLevel: 30 # Flush for WARNING logs as well
# The period of time, in seconds, between forced flushes.
# Messages will not be delayed for longer than this time.
period: 5
# A handler that writes logs to stderr. Unused by default, but can be used
# instead of "buffer" and "file" in the logger handlers.
+23 -8
View File
@@ -319,11 +319,24 @@ effects of bursts of events from that bridge on events sent by normal users.
#### Stream writers
Additionally, there is *experimental* support for moving writing of specific
streams (such as events) off of the main process to a particular worker. (This
is only supported with Redis-based replication.)
Additionally, there is support for moving writing of specific streams (such as
events) off of the main process to a particular worker. (This is only supported
with Redis-based replication.)
Currently supported streams are `events` and `typing`.
The currently supported streams, and the endpoints which **must** be routed to them, are:
* `events`
* `typing`:
* `^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/typing`
* `to_device`:
* `^/_matrix/client/(api/v1|r0|unstable)/sendToDevice/`
* `^/_matrix/client/(api/v1|r0|unstable)/keys/claim`
* `^/_matrix/client/(api/v1|r0|unstable)/room_keys`
* `account_data`
* `receipts`
* `presence`
To enable this, the worker must have a HTTP replication listener configured,
have a `worker_name` and be listed in the `instance_map` config. For example to
@@ -340,10 +353,10 @@ stream_writers:
events: event_persister1
```
The `events` stream also experimentally supports having multiple writers, where
work is sharded between them by room ID. Note that you *must* restart all worker
instances when adding or removing event persisters. An example `stream_writers`
configuration with multiple writers:
The `events` stream also supports having multiple writers, where work is sharded
between them by room ID. Note that you *must* restart all worker instances when
adding or removing event persisters. An example `stream_writers` configuration
with multiple writers:
```yaml
stream_writers:
@@ -352,6 +365,8 @@ stream_writers:
- event_persister2
```
All other streams currently only support having a single writer.
#### Background tasks
There is also *experimental* support for moving background tasks to a separate
+42 -295
View File
@@ -14,57 +14,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""An interactive script for doing a release. See `cli()` below.
"""An interactive script for doing a release. See `run()` below.
"""
import re
import subprocess
import sys
import urllib.request
from os import path
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple
from typing import Optional
import attr
import click
import commonmark
import git
import redbaron
from click.exceptions import ClickException
from github import Github
from packaging import version
from redbaron import RedBaron
@click.group()
def cli():
"""An interactive script to walk through the parts of creating a release.
@click.command()
def run():
"""An interactive script to walk through the initial stages of creating a
release, including creating the release branch, updating the changelog and pushing to
GitHub.
Requires the dev dependencies be installed, which can be done via:
pip install -e .[dev]
Then to use:
./scripts-dev/release.py prepare
# ... ask others to look at the changelog ...
./scripts-dev/release.py tag
# ... wait for assets to build ...
./scripts-dev/release.py publish
./scripts-dev/release.py upload
If the env var GH_TOKEN (or GITHUB_TOKEN) is set, or passed into the
`tag`/`publish` command, then a new draft release will be created/published.
"""
@cli.command()
def prepare():
"""Do the initial stages of creating a release, including creating release
branch, updating changelog and pushing to GitHub.
"""
# Make sure we're in a git repo.
@@ -79,8 +51,32 @@ def prepare():
click.secho("Updating git repo...")
repo.remote().fetch()
# Get the current version and AST from root Synapse module.
current_version, parsed_synapse_ast, version_node = parse_version_from_module()
# Parse the AST and load the `__version__` node so that we can edit it
# later.
with open("synapse/__init__.py") as f:
red = RedBaron(f.read())
version_node = None
for node in red:
if node.type != "assignment":
continue
if node.target.type != "name":
continue
if node.target.value != "__version__":
continue
version_node = node
break
if not version_node:
print("Failed to find '__version__' definition in synapse/__init__.py")
sys.exit(1)
# Parse the current version.
current_version = version.parse(version_node.value.value.strip('"'))
assert isinstance(current_version, version.Version)
# Figure out what sort of release we're doing and calculate the new version.
rc = click.confirm("RC", default=True)
@@ -143,11 +139,6 @@ def prepare():
# Switch to the release branch.
parsed_new_version = version.parse(new_version)
# We assume for debian changelogs that we only do RCs or full releases.
assert not parsed_new_version.is_devrelease
assert not parsed_new_version.is_postrelease
release_branch_name = (
f"release-v{parsed_new_version.major}.{parsed_new_version.minor}"
)
@@ -194,26 +185,17 @@ def prepare():
# Update the `__version__` variable and write it back to the file.
version_node.value = '"' + new_version + '"'
with open("synapse/__init__.py", "w") as f:
f.write(parsed_synapse_ast.dumps())
f.write(red.dumps())
# Generate changelogs
subprocess.run("python3 -m towncrier", shell=True)
# Generate debian changelogs
if parsed_new_version.pre is not None:
# If this is an RC then we need to coerce the version string to match
# Debian norms, e.g. 1.39.0rc2 gets converted to 1.39.0~rc2.
base_ver = parsed_new_version.base_version
pre_type, pre_num = parsed_new_version.pre
debian_version = f"{base_ver}~{pre_type}{pre_num}"
else:
debian_version = new_version
subprocess.run(
f'dch -M -v {debian_version} "New synapse release {debian_version}."',
shell=True,
)
subprocess.run('dch -M -r -D stable ""', shell=True)
# Generate debian changelogs if it's not an RC.
if not rc:
subprocess.run(
f'dch -M -v {new_version} "New synapse release {new_version}."', shell=True
)
subprocess.run('dch -M -r -D stable ""', shell=True)
# Show the user the changes and ask if they want to edit the change log.
repo.git.add("-u")
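
For context, a minimal sketch of the Debian version coercion performed by the branch removed above, assuming only the `packaging` library's public `Version` API:

```python
from packaging import version

parsed = version.parse("1.39.0rc2")
if parsed.pre is not None:
    # Coerce to Debian norms: 1.39.0rc2 -> 1.39.0~rc2
    pre_type, pre_num = parsed.pre  # ("rc", 2)
    debian_version = f"{parsed.base_version}~{pre_type}{pre_num}"
else:
    debian_version = str(parsed)

print(debian_version)  # 1.39.0~rc2
```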
@@ -244,180 +226,6 @@ def prepare():
)
@cli.command()
@click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"])
def tag(gh_token: Optional[str]):
"""Tags the release and generates a draft GitHub release"""
# Make sure we're in a git repo.
try:
repo = git.Repo()
except git.InvalidGitRepositoryError:
raise click.ClickException("Not in Synapse repo.")
if repo.is_dirty():
raise click.ClickException("Uncommitted changes exist.")
click.secho("Updating git repo...")
repo.remote().fetch()
# Find out the version and tag name.
current_version, _, _ = parse_version_from_module()
tag_name = f"v{current_version}"
# Check we haven't released this version.
if tag_name in repo.tags:
raise click.ClickException(f"Tag {tag_name} already exists!\n")
# Get the appropriate changelogs and tag.
changes = get_changes_for_version(current_version)
click.echo_via_pager(changes)
if click.confirm("Edit text?", default=False):
changes = click.edit(changes, require_save=False)
repo.create_tag(tag_name, message=changes)
if not click.confirm("Push tag to GitHub?", default=True):
print("")
print("Run when ready to push:")
print("")
print(f"\tgit push {repo.remote().name} tag {current_version}")
print("")
return
repo.git.push(repo.remote().name, "tag", tag_name)
# If no token was given, we bail here
if not gh_token:
click.launch(f"https://github.com/matrix-org/synapse/releases/edit/{tag_name}")
return
# Create a new draft release
gh = Github(gh_token)
gh_repo = gh.get_repo("matrix-org/synapse")
release = gh_repo.create_git_release(
tag=tag_name,
name=tag_name,
message=changes,
draft=True,
prerelease=current_version.is_prerelease,
)
# Open the release and the actions where we are building the assets.
click.launch(release.html_url)
click.launch(
f"https://github.com/matrix-org/synapse/actions?query=branch%3A{tag_name}"
)
click.echo("Wait for release assets to be built")
@cli.command()
@click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"], required=True)
def publish(gh_token: str):
"""Publish release."""
# Make sure we're in a git repo.
try:
repo = git.Repo()
except git.InvalidGitRepositoryError:
raise click.ClickException("Not in Synapse repo.")
if repo.is_dirty():
raise click.ClickException("Uncommitted changes exist.")
current_version, _, _ = parse_version_from_module()
tag_name = f"v{current_version}"
if not click.confirm(f"Publish {tag_name}?", default=True):
return
# Publish the draft release
gh = Github(gh_token)
gh_repo = gh.get_repo("matrix-org/synapse")
for release in gh_repo.get_releases():
if release.title == tag_name:
break
else:
raise ClickException(f"Failed to find GitHub release for {tag_name}")
assert release.title == tag_name
if not release.draft:
click.echo("Release already published.")
return
release = release.update_release(
name=release.title,
message=release.body,
tag_name=release.tag_name,
prerelease=release.prerelease,
draft=False,
)
@cli.command()
def upload():
"""Upload release to pypi."""
current_version, _, _ = parse_version_from_module()
tag_name = f"v{current_version}"
pypi_asset_names = [
f"matrix_synapse-{current_version}-py3-none-any.whl",
f"matrix-synapse-{current_version}.tar.gz",
]
with TemporaryDirectory(prefix=f"synapse_upload_{tag_name}_") as tmpdir:
for name in pypi_asset_names:
filename = path.join(tmpdir, name)
url = f"https://github.com/matrix-org/synapse/releases/download/{tag_name}/{name}"
click.echo(f"Downloading {name} into {filename}")
urllib.request.urlretrieve(url, filename=filename)
if click.confirm("Upload to PyPI?", default=True):
subprocess.run("twine upload *", shell=True, cwd=tmpdir)
click.echo(
f"Done! Remember to merge the tag {tag_name} into the appropriate branches"
)
def parse_version_from_module() -> Tuple[
version.Version, redbaron.RedBaron, redbaron.Node
]:
# Parse the AST and load the `__version__` node so that we can edit it
# later.
with open("synapse/__init__.py") as f:
red = redbaron.RedBaron(f.read())
version_node = None
for node in red:
if node.type != "assignment":
continue
if node.target.type != "name":
continue
if node.target.value != "__version__":
continue
version_node = node
break
if not version_node:
print("Failed to find '__version__' definition in synapse/__init__.py")
sys.exit(1)
# Parse the current version.
current_version = version.parse(version_node.value.value.strip('"'))
assert isinstance(current_version, version.Version)
return current_version, red, version_node
def find_ref(repo: git.Repo, ref_name: str) -> Optional[git.HEAD]:
"""Find the branch/ref, looking first locally then in the remote."""
if ref_name in repo.refs:
@@ -434,66 +242,5 @@ def update_branch(repo: git.Repo):
repo.git.merge(repo.active_branch.tracking_branch().name)
def get_changes_for_version(wanted_version: version.Version) -> str:
"""Get the changelogs for the given version.
If this is an RC then it will only get the changelog for that RC version;
if it is a full release it will get the changelog for the release and all of its RCs.
"""
with open("CHANGES.md") as f:
changes = f.read()
# First we parse the changelog so that we can split it into sections based
# on the release headings.
ast = commonmark.Parser().parse(changes)
@attr.s(auto_attribs=True)
class VersionSection:
title: str
# These are 0-based.
start_line: int
end_line: Optional[int] = None # Is None if it's the last entry
headings: List[VersionSection] = []
for node, _ in ast.walker():
# We look for all text nodes that are in a level 1 heading.
if node.t != "text":
continue
if node.parent.t != "heading" or node.parent.level != 1:
continue
# If we have a previous heading then we update its `end_line`.
if headings:
headings[-1].end_line = node.parent.sourcepos[0][0] - 1
headings.append(VersionSection(node.literal, node.parent.sourcepos[0][0] - 1))
changes_by_line = changes.split("\n")
version_changelog = [] # The lines we want to include in the changelog
# Go through each section and find any that match the requested version.
regex = re.compile(r"^Synapse v?(\S+)")
for section in headings:
groups = regex.match(section.title)
if not groups:
continue
heading_version = version.parse(groups.group(1))
heading_base_version = version.parse(heading_version.base_version)
# Check if the heading version matches the requested version, or if it's an
# RC of the requested version.
if wanted_version not in (heading_version, heading_base_version):
continue
version_changelog.extend(changes_by_line[section.start_line : section.end_line])
return "\n".join(version_changelog)
if __name__ == "__main__":
cli()
run()
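
A minimal sketch of how `get_changes_for_version` pairs RC changelogs with their full release, using only the `re` and `packaging` modules imported above:

```python
import re

from packaging import version

regex = re.compile(r"^Synapse v?(\S+)")

heading = "Synapse 1.40.0rc1 (2021-08-03)"
heading_version = version.parse(regex.match(heading).group(1))  # 1.40.0rc1
heading_base = version.parse(heading_version.base_version)      # 1.40.0

# Requesting the full release matches both the release heading and its RCs.
wanted = version.parse("1.40.0")
assert wanted in (heading_version, heading_base)
```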
-2
View File
@@ -108,8 +108,6 @@ CONDITIONAL_REQUIREMENTS["dev"] = CONDITIONAL_REQUIREMENTS["lint"] + [
"click==7.1.2",
"redbaron==0.9.2",
"GitPython==3.1.14",
"commonmark==0.9.1",
"pygithub==1.55",
]
CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.13"]
+1 -1
View File
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.40.0rc2"
__version__ = "1.39.0rc2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
+3 -12
View File
@@ -128,14 +128,6 @@ class ToDeviceEventTypes:
RoomKeyRequest = "m.room_key_request"
class DeviceKeyAlgorithms:
"""Spec'd algorithms for the generation of per-device keys"""
ED25519 = "ed25519"
CURVE25519 = "curve25519"
SIGNED_CURVE25519 = "signed_curve25519"
class EduTypes:
Presence = "m.presence"
@@ -206,6 +198,9 @@ class EventContentFields:
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
# For "marker" events
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
MSC2716_MARKER_INSERTION_PREV_EVENTS = (
"org.matrix.msc2716.marker.insertion_prev_events"
)
class RoomTypes:
@@ -229,7 +224,3 @@ class HistoryVisibility:
JOINED = "joined"
SHARED = "shared"
WORLD_READABLE = "world_readable"
class ReadReceiptEventFields:
MSC2285_HIDDEN = "org.matrix.msc2285.hidden"
-27
View File
@@ -73,9 +73,6 @@ class RoomVersion:
# MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
# m.room.membership event with membership 'knock'.
msc2403_knocking = attr.ib(type=bool)
# MSC2716: Adds m.room.power_levels -> content.historical field to control
# whether "insertion", "chunk", "marker" events can be sent
msc2716_historical = attr.ib(type=bool)
class RoomVersions:
@@ -91,7 +88,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V2 = RoomVersion(
"2",
@@ -105,7 +101,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V3 = RoomVersion(
"3",
@@ -119,7 +114,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V4 = RoomVersion(
"4",
@@ -133,7 +127,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V5 = RoomVersion(
"5",
@@ -147,7 +140,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V6 = RoomVersion(
"6",
@@ -161,7 +153,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
MSC2176 = RoomVersion(
"org.matrix.msc2176",
@@ -175,7 +166,6 @@ class RoomVersions:
msc2176_redaction_rules=True,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
MSC3083 = RoomVersion(
"org.matrix.msc3083.v2",
@@ -189,7 +179,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=True,
msc2403_knocking=False,
msc2716_historical=False,
)
V7 = RoomVersion(
"7",
@@ -203,21 +192,6 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=True,
msc2716_historical=False,
)
MSC2716 = RoomVersion(
"org.matrix.msc2716",
RoomDisposition.STABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=True,
msc2716_historical=True,
)
@@ -233,7 +207,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
RoomVersions.MSC2176,
RoomVersions.MSC3083,
RoomVersions.V7,
RoomVersions.MSC2716,
)
}
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1 -1
View File
@@ -109,7 +109,7 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process):
for name, count in r30_results.items():
stats["r30_users_" + name] = count
r30v2_results = await store.count_r30v2_users()
r30v2_results = await store.count_r30_users()
for name, count in r30v2_results.items():
stats["r30v2_users_" + name] = count
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+1
View File
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
-4
View File
@@ -33,9 +33,6 @@ DEFAULT_CONFIG = """\
# 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or
# 'psycopg2' (for PostgreSQL).
#
# 'txn_limit' gives the maximum number of transactions to run per connection
# before reconnecting. Defaults to 0, which means no limit.
#
# 'args' gives options which are passed through to the database engine,
# except for options starting 'cp_', which are used to configure the Twisted
# connection pool. For a reference to valid arguments, see:
@@ -56,7 +53,6 @@ DEFAULT_CONFIG = """\
#
#database:
# name: psycopg2
# txn_limit: 10000
# args:
# user: synapse_user
# password: secretpassword
-3
View File
@@ -33,8 +33,5 @@ class ExperimentalConfig(Config):
# MSC2716 (backfill existing history)
self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
# MSC2285 (hidden read receipts)
self.msc2285_enabled: bool = experimental.get("msc2285_enabled", False)
# MSC3244 (room version capabilities)
self.msc3244_enabled: bool = experimental.get("msc3244_enabled", False)
+4 -1
View File
@@ -71,7 +71,7 @@ handlers:
# will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
# logs will still be flushed immediately.
buffer:
class: logging.handlers.MemoryHandler
class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
target: file
# The capacity is the number of log lines that are buffered before
# being written to disk. Increasing this will lead to better
@@ -79,6 +79,9 @@ handlers:
# be written to disk.
capacity: 10
flushLevel: 30 # Flush for WARNING logs as well
# The period of time, in seconds, between forced flushes.
# Messages will not be delayed for longer than this time.
period: 5
# A handler that writes logs to stderr. Unused by default, but can be used
# instead of "buffer" and "file" in the logger handlers.
+1 -39
View File
@@ -205,13 +205,6 @@ def check(
if event.type == EventTypes.Redaction:
check_redaction(room_version_obj, event, auth_events)
if (
event.type == EventTypes.MSC2716_INSERTION
or event.type == EventTypes.MSC2716_CHUNK
or event.type == EventTypes.MSC2716_MARKER
):
check_historical(room_version_obj, event, auth_events)
logger.debug("Allowing! %s", event)
@@ -546,37 +539,6 @@ def check_redaction(
raise AuthError(403, "You don't have permission to redact events")
def check_historical(
room_version_obj: RoomVersion,
event: EventBase,
auth_events: StateMap[EventBase],
) -> None:
"""Check whether the event sender is allowed to send historical related
events like "insertion", "chunk", and "marker".
Returns:
None
Raises:
AuthError if the event sender is not allowed to send historical related events
("insertion", "chunk", and "marker").
"""
# Ignore the auth checks in room versions that do not support historical
# events
if not room_version_obj.msc2716_historical:
return
user_level = get_user_power_level(event.user_id, auth_events)
historical_level = get_named_level(auth_events, "historical", 100)
if user_level < historical_level:
raise AuthError(
403,
'You don\'t have permission to send historical related events ("insertion", "chunk", and "marker")',
)
def _check_power_levels(
room_version_obj: RoomVersion,
event: EventBase,
@@ -692,7 +654,7 @@ def get_user_power_level(user_id: str, auth_events: StateMap[EventBase]) -> int:
power_level_event = get_power_level_event(auth_events)
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
if level is None:
if not level:
level = power_level_event.content.get("users_default", 0)
if level is None:
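
A minimal sketch of the difference between the two checks in the hunk above (cf. the changelog entry for #10499): with a truthiness test, an explicitly assigned power level of 0 is indistinguishable from a missing entry.

```python
content = {"users": {"@mod:example.com": 0}, "users_default": 50}

# Truthiness check: 0 is falsy, so the explicit assignment is overridden.
level = content.get("users", {}).get("@mod:example.com")  # 0
if not level:
    level = content.get("users_default", 0)  # -> 50, misinterpreting the 0

# Identity check: only fall back when the user genuinely has no entry.
level = content.get("users", {}).get("@mod:example.com")  # 0
if level is None:
    level = content.get("users_default", 0)  # level stays 0
```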
-5
View File
@@ -109,8 +109,6 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
add_fields("creator")
elif event_type == EventTypes.JoinRules:
add_fields("join_rule")
if room_version.msc3083_join_rules:
add_fields("allow")
elif event_type == EventTypes.PowerLevels:
add_fields(
"users",
@@ -126,9 +124,6 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
if room_version.msc2176_redaction_rules:
add_fields("invite")
if room_version.msc2716_historical:
add_fields("historical")
elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
add_fields("aliases")
elif event_type == EventTypes.RoomHistoryVisibility:
+4 -39
View File
@@ -22,7 +22,6 @@ from typing import (
Awaitable,
Callable,
Collection,
Container,
Dict,
Iterable,
List,
@@ -514,7 +513,6 @@ class FederationClient(FederationBase):
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
failover_errcodes: Optional[Container[str]] = None,
failover_on_unknown_endpoint: bool = False,
) -> T:
"""Try an operation on a series of servers, until it succeeds
@@ -535,9 +533,6 @@ class FederationClient(FederationBase):
next server tried. Normally the stacktrace is logged but this is
suppressed if the exception is an InvalidResponseError.
failover_errcodes: Error codes (specific to this endpoint) which should
cause a failover when received as part of an HTTP 400 error.
failover_on_unknown_endpoint: if True, we will try other servers if it looks
like a server doesn't support the endpoint. This is typically useful
if the endpoint in question is new or experimental.
@@ -549,9 +544,6 @@ class FederationClient(FederationBase):
SynapseError if the chosen remote server returns a 300/400 code, or
no servers were reachable.
"""
if failover_errcodes is None:
failover_errcodes = ()
for destination in destinations:
if destination == self.server_name:
continue
@@ -566,17 +558,11 @@ class FederationClient(FederationBase):
synapse_error = e.to_synapse_error()
failover = False
# Failover should occur:
#
# * On internal server errors.
# * If the destination responds that it cannot complete the request.
# * If the destination doesn't implement the endpoint for some reason.
# Failover on an internal server error, or if the destination
# doesn't implement the endpoint for some reason.
if 500 <= e.code < 600:
failover = True
elif e.code == 400 and synapse_error.errcode in failover_errcodes:
failover = True
elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
e, synapse_error
):
@@ -692,20 +678,8 @@ class FederationClient(FederationBase):
return destination, ev, room_version
# MSC3083 defines additional error codes for room joins. Unfortunately
# we do not yet know the room version, assume these will only be returned
# by valid room versions.
failover_errcodes = (
(Codes.UNABLE_AUTHORISE_JOIN, Codes.UNABLE_TO_GRANT_JOIN)
if membership == Membership.JOIN
else None
)
return await self._try_destination_list(
"make_" + membership,
destinations,
send_request,
failover_errcodes=failover_errcodes,
"make_" + membership, destinations, send_request
)
async def send_join(
@@ -844,14 +818,7 @@ class FederationClient(FederationBase):
origin=destination,
)
# MSC3083 defines additional error codes for room joins.
failover_errcodes = None
if room_version.msc3083_join_rules:
failover_errcodes = (
Codes.UNABLE_AUTHORISE_JOIN,
Codes.UNABLE_TO_GRANT_JOIN,
)
# If the join is being authorised via allow rules, we need to send
# the /send_join back to the same server that was originally used
# with /make_join.
@@ -860,9 +827,7 @@ class FederationClient(FederationBase):
get_domain_from_id(pdu.content["join_authorised_via_users_server"])
]
return await self._try_destination_list(
"send_join", destinations, send_request, failover_errcodes=failover_errcodes
)
return await self._try_destination_list("send_join", destinations, send_request)
async def _do_send_join(
self, room_version: RoomVersion, destination: str, pdu: EventBase
-17
View File
@@ -1024,23 +1024,6 @@ class FederationServer(FederationBase):
origin, event = next
# Prune the event queue if it's getting large.
#
# We do this *after* handling the first event as the common case is
# that the queue is empty (/has the single event in), and so there's
# no need to do this check.
pruned = await self.store.prune_staged_events_in_room(room_id, room_version)
if pruned:
# If we have pruned the queue check we need to refetch the next
# event to handle.
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
if not next:
break
origin, event = next
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
+2
View File
@@ -15,6 +15,8 @@
import logging
from typing import TYPE_CHECKING, Optional
import synapse.state
import synapse.storage
import synapse.types
from synapse.api.constants import EventTypes, Membership
from synapse.api.ratelimiting import Ratelimiter
+2 -4
View File
@@ -2748,11 +2748,9 @@ class FederationHandler(BaseHandler):
event.event_id,
e.event_id,
)
missing_auth_event_context = (
await self.state_handler.compute_event_context(e)
)
context = await self.state_handler.compute_event_context(e)
await self._auth_and_persist_event(
origin, e, missing_auth_event_context, auth_events=auth
origin, e, context, auth_events=auth
)
if e.event_id in event_auth_events:
+1 -6
View File
@@ -21,7 +21,6 @@ from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
@@ -135,8 +134,6 @@ class InitialSyncHandler(BaseHandler):
joined_rooms,
to_key=int(now_token.receipt_key),
)
if self.hs.config.experimental.msc2285_enabled:
receipt = ReceiptEventSource.filter_out_hidden(receipt, user_id)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -433,9 +430,7 @@ class InitialSyncHandler(BaseHandler):
room_id, to_key=now_token.receipt_key
)
if not receipts:
return []
if self.hs.config.experimental.msc2285_enabled:
receipts = ReceiptEventSource.filter_out_hidden(receipts, user_id)
receipts = []
return receipts
presence, receipts, (messages, token) = await make_deferred_yieldable(
+5 -53
View File
@@ -14,10 +14,9 @@
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple
from synapse.api.constants import ReadReceiptEventFields
from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -138,7 +137,7 @@ class ReceiptsHandler(BaseHandler):
return True
async def received_client_receipt(
self, room_id: str, receipt_type: str, user_id: str, event_id: str, hidden: bool
self, room_id: str, receipt_type: str, user_id: str, event_id: str
) -> None:
"""Called when a client tells us a local user has read up to the given
event_id in the room.
@@ -148,67 +147,23 @@ class ReceiptsHandler(BaseHandler):
receipt_type=receipt_type,
user_id=user_id,
event_ids=[event_id],
data={"ts": int(self.clock.time_msec()), "hidden": hidden},
data={"ts": int(self.clock.time_msec())},
)
is_new = await self._handle_new_receipts([receipt])
if not is_new:
return
if self.federation_sender and not (
self.hs.config.experimental.msc2285_enabled and hidden
):
if self.federation_sender:
await self.federation_sender.send_read_receipt(receipt)
class ReceiptEventSource:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.config = hs.config
@staticmethod
def filter_out_hidden(events: List[JsonDict], user_id: str) -> List[JsonDict]:
visible_events = []
# filter out hidden receipts the user shouldn't see
for event in events:
content = event.get("content", {})
new_event = event.copy()
new_event["content"] = {}
for event_id in content.keys():
event_content = content.get(event_id, {})
m_read = event_content.get("m.read", {})
# If m_read is missing, copy over the original event_content as there is nothing to process here
if not m_read:
new_event["content"][event_id] = event_content.copy()
continue
new_users = {}
for rr_user_id, user_rr in m_read.items():
hidden = user_rr.get("hidden", None)
if hidden is not True or rr_user_id == user_id:
new_users[rr_user_id] = user_rr.copy()
# If hidden has a value replace hidden with the correct prefixed key
if hidden is not None:
new_users[rr_user_id].pop("hidden")
new_users[rr_user_id][
ReadReceiptEventFields.MSC2285_HIDDEN
] = hidden
# Set new users unless empty
if len(new_users.keys()) > 0:
new_event["content"][event_id] = {"m.read": new_users}
# Append new_event to visible_events unless empty
if len(new_event["content"].keys()) > 0:
visible_events.append(new_event)
return visible_events
async def get_new_events(
self, from_key: int, room_ids: List[str], user: UserID, **kwargs
self, from_key: int, room_ids: List[str], **kwargs
) -> Tuple[List[JsonDict], int]:
from_key = int(from_key)
to_key = self.get_current_key()
@@ -220,9 +175,6 @@ class ReceiptEventSource:
room_ids, from_key=from_key, to_key=to_key
)
if self.config.experimental.msc2285_enabled:
events = ReceiptEventSource.filter_out_hidden(events, user.to_string())
return (events, to_key)
async def get_new_events_as(
-1
View File
@@ -951,7 +951,6 @@ class RoomCreationHandler(BaseHandler):
"kick": 50,
"redact": 50,
"invite": 50,
"historical": 100,
}
if config["original_invitees_have_ops"]:
-4
View File
@@ -1093,10 +1093,6 @@ class SyncHandler:
one_time_key_counts: JsonDict = {}
unused_fallback_key_types: List[str] = []
if device_id:
# TODO: We should have a way to let clients differentiate between the states of:
# * no change in OTK count since the provided since token
# * the server has zero OTKs left for this device
# Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
+28 -1
View File
@@ -22,6 +22,7 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.http.typing import ReplicationTypingRestServlet
from synapse.replication.tcp.streams import TypingStream
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -61,7 +62,9 @@ class FollowerTypingHandler:
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
if hs.config.worker.writers.typing != hs.get_instance_name():
self._typing_repl_client = ReplicationTypingRestServlet.make_client(hs)
self._typing_worker = hs.config.worker.writers.typing
if self._typing_worker != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(
"m.typing",
hs.config.worker.writers.typing,
@@ -199,6 +202,30 @@ class FollowerTypingHandler:
def get_current_token(self) -> int:
return self._latest_room_serial
async def started_typing(
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
) -> None:
await self._typing_repl_client(
typing=True,
instance_name=self._typing_worker,
user_id=target_user.to_string(),
requester=requester,
room_id=room_id,
timeout=timeout,
)
async def stopped_typing(
self, target_user: UserID, requester: Requester, room_id: str
) -> None:
await self._typing_repl_client(
typing=False,
instance_name=self._typing_worker,
user_id=target_user.to_string(),
requester=requester,
room_id=room_id,
timeout=None,
)
class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
+2 -2
View File
@@ -847,7 +847,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
def read_body_with_max_size(
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
) -> "defer.Deferred[int]":
) -> defer.Deferred:
"""
Read a HTTP response body to a file-object. Optionally enforcing a maximum file size.
@@ -862,7 +862,7 @@ def read_body_with_max_size(
Returns:
A Deferred which resolves to the length of the read body.
"""
d: "defer.Deferred[int]" = defer.Deferred()
d = defer.Deferred()
# If the Content-Length header gives a size larger than the maximum allowed
# size, do not bother downloading the body.
+79 -113
View File
@@ -14,32 +14,21 @@
import base64
import logging
import re
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse
from urllib.request import ( # type: ignore[attr-defined]
getproxies_environment,
proxy_bypass_environment,
)
from typing import Optional, Tuple
from urllib.request import getproxies_environment, proxy_bypass_environment
import attr
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint
from twisted.python.failure import Failure
from twisted.web.client import (
URI,
BrowserLikePolicyForHTTPS,
HTTPConnectionPool,
_AgentBase,
)
from twisted.web.client import URI, BrowserLikePolicyForHTTPS, _AgentBase
from twisted.web.error import SchemeNotSupported
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS
from twisted.web.iweb import IAgent, IPolicyForHTTPS
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
@@ -74,38 +63,35 @@ class ProxyAgent(_AgentBase):
reactor might have some blacklisting applied (i.e. for DNS queries),
but we need unblocked access to the proxy.
contextFactory: A factory for TLS contexts, to control the
contextFactory (IPolicyForHTTPS): A factory for TLS contexts, to control the
verification parameters of OpenSSL. The default is to use a
`BrowserLikePolicyForHTTPS`, so unless you have special
requirements you can leave this as-is.
connectTimeout: The amount of time that this Agent will wait
connectTimeout (Optional[float]): The amount of time that this Agent will wait
for the peer to accept a connection, in seconds. If 'None',
HostnameEndpoint's default (30s) will be used.
This is used for connections to both proxies and destination servers.
bindAddress: The local address for client sockets to bind to.
bindAddress (bytes): The local address for client sockets to bind to.
pool: connection pool to be used. If None, a
pool (HTTPConnectionPool|None): connection pool to be used. If None, a
non-persistent pool instance will be created.
use_proxy: Whether proxy settings should be discovered and used
use_proxy (bool): Whether proxy settings should be discovered and used
from conventional environment variables.
Raises:
ValueError if use_proxy is set and the environment variables
contain an invalid proxy specification.
"""
def __init__(
self,
reactor: IReactorCore,
proxy_reactor: Optional[ISynapseReactor] = None,
reactor,
proxy_reactor=None,
contextFactory: Optional[IPolicyForHTTPS] = None,
connectTimeout: Optional[float] = None,
bindAddress: Optional[bytes] = None,
pool: Optional[HTTPConnectionPool] = None,
use_proxy: bool = False,
connectTimeout=None,
bindAddress=None,
pool=None,
use_proxy=False,
):
contextFactory = contextFactory or BrowserLikePolicyForHTTPS()
@@ -116,7 +102,7 @@ class ProxyAgent(_AgentBase):
else:
self.proxy_reactor = proxy_reactor
self._endpoint_kwargs: Dict[str, Any] = {}
self._endpoint_kwargs = {}
if connectTimeout is not None:
self._endpoint_kwargs["timeout"] = connectTimeout
if bindAddress is not None:
@@ -131,12 +117,16 @@ class ProxyAgent(_AgentBase):
https_proxy = proxies["https"].encode() if "https" in proxies else None
no_proxy = proxies["no"] if "no" in proxies else None
self.http_proxy_endpoint, self.http_proxy_creds = _http_proxy_endpoint(
http_proxy, self.proxy_reactor, contextFactory, **self._endpoint_kwargs
# Parse credentials from http and https proxy connection string if present
self.http_proxy_creds, http_proxy = parse_username_password(http_proxy)
self.https_proxy_creds, https_proxy = parse_username_password(https_proxy)
self.http_proxy_endpoint = _http_proxy_endpoint(
http_proxy, self.proxy_reactor, **self._endpoint_kwargs
)
self.https_proxy_endpoint, self.https_proxy_creds = _http_proxy_endpoint(
https_proxy, self.proxy_reactor, contextFactory, **self._endpoint_kwargs
self.https_proxy_endpoint = _http_proxy_endpoint(
https_proxy, self.proxy_reactor, **self._endpoint_kwargs
)
self.no_proxy = no_proxy
@@ -144,13 +134,7 @@ class ProxyAgent(_AgentBase):
self._policy_for_https = contextFactory
self._reactor = reactor
def request(
self,
method: bytes,
uri: bytes,
headers: Optional[Headers] = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> defer.Deferred:
def request(self, method, uri, headers=None, bodyProducer=None):
"""
Issue a request to the server indicated by the given uri.
@@ -162,15 +146,16 @@ class ProxyAgent(_AgentBase):
See also: twisted.web.iweb.IAgent.request
Args:
method: The request method to use, such as `GET`, `POST`, etc
method (bytes): The request method to use, such as `GET`, `POST`, etc
uri: The location of the resource to request.
uri (bytes): The location of the resource to request.
headers: Extra headers to send with the request
headers (Headers|None): Extra headers to send with the request
bodyProducer: An object which can generate bytes to make up the body of
this request (for example, the properly encoded contents of a file for
a file upload). Or, None if the request is to have no body.
bodyProducer (IBodyProducer|None): An object which can generate bytes to
make up the body of this request (for example, the properly encoded
contents of a file for a file upload). Or, None if the request is to
have no body.
Returns:
Deferred[IResponse]: completes when the header of the response has
@@ -268,89 +253,70 @@ class ProxyAgent(_AgentBase):
)
def _http_proxy_endpoint(
proxy: Optional[bytes],
reactor: IReactorCore,
tls_options_factory: IPolicyForHTTPS,
**kwargs,
) -> Tuple[Optional[IStreamClientEndpoint], Optional[ProxyCredentials]]:
def _http_proxy_endpoint(proxy: Optional[bytes], reactor, **kwargs):
"""Parses an http proxy setting and returns an endpoint for the proxy
Args:
proxy: the proxy setting in the form: [scheme://][<username>:<password>@]<host>[:<port>]
This currently supports http:// and https:// proxies.
A hostname without scheme is assumed to be http.
proxy: the proxy setting in the form: [<username>:<password>@]<host>[:<port>]
Note that compared to other apps, this function currently lacks support
for specifying a protocol scheme (i.e. protocol://...).
reactor: reactor to be used to connect to the proxy
tls_options_factory: the TLS options to use when connecting through a https proxy
kwargs: other args to be passed to HostnameEndpoint
Returns:
a tuple of
endpoint to use to connect to the proxy, or None
ProxyCredentials, or None if no credentials were found
Raise:
ValueError if proxy has no hostname or unsupported scheme.
interfaces.IStreamClientEndpoint|None: endpoint to use to connect to the proxy,
or None
"""
if proxy is None:
return None, None
return None
# Note: urlsplit/urlparse cannot be used here as that does not work (for Python
# 3.9+) on scheme-less proxies, e.g. host:port.
scheme, host, port, credentials = parse_proxy(proxy)
proxy_endpoint = HostnameEndpoint(reactor, host, port, **kwargs)
if scheme == b"https":
tls_options = tls_options_factory.creatorForNetloc(host, port)
proxy_endpoint = wrapClientTLS(tls_options, proxy_endpoint)
return proxy_endpoint, credentials
# Parse the connection string
host, port = parse_host_port(proxy, default_port=1080)
return HostnameEndpoint(reactor, host, port, **kwargs)
def parse_proxy(
proxy: bytes, default_scheme: bytes = b"http", default_port: int = 1080
) -> Tuple[bytes, bytes, int, Optional[ProxyCredentials]]:
def parse_username_password(proxy: bytes) -> Tuple[Optional[ProxyCredentials], bytes]:
"""
Parse a proxy connection string.
Given an HTTP proxy URL, breaks it down into components and checks that it
has a hostname (otherwise it is not useful to us when trying to find a
proxy) and asserts that the URL has a scheme we support.
Parses the username and password from a proxy declaration, e.g.
username:password@hostname:port.
Args:
proxy: The proxy connection string. Must be in the form '[scheme://][<username>:<password>@]host[:port]'.
default_scheme: The default scheme to return if one is not found in `proxy`. Defaults to http
default_port: The default port to return if one is not found in `proxy`. Defaults to 1080
proxy: The proxy connection string.
Returns:
An instance of ProxyCredentials and the proxy connection string with any credentials
stripped, i.e u:p@host:port -> host:port. If no credentials were found, the
ProxyCredentials instance is replaced with None.
"""
if proxy and b"@" in proxy:
# We use rsplit here as the password could contain an @ character
credentials, proxy_without_credentials = proxy.rsplit(b"@", 1)
return ProxyCredentials(credentials), proxy_without_credentials
return None, proxy
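An illustrative call (values hypothetical):

    creds, stripped = parse_username_password(b"user:secret@proxy.example.com:8888")
    # creds wraps b"user:secret"; stripped == b"proxy.example.com:8888"
    creds, stripped = parse_username_password(b"proxy.example.com:8888")
    # creds is None; the connection string comes back unchanged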
def parse_host_port(hostport: bytes, default_port: Optional[int] = None) -> Tuple[bytes, int]:
"""
Parse the hostname and port from a proxy connection byte string.
Args:
hostport: The proxy connection string. Must be in the form 'host[:port]'.
default_port: The default port to return if one is not found in `hostport`.
Returns:
A tuple containing the scheme, hostname, port and ProxyCredentials.
If no credentials were found, the ProxyCredentials instance is replaced with None.
Raise:
ValueError if proxy has no hostname or unsupported scheme.
A tuple containing the hostname and port. Uses `default_port` if one was not found.
"""
# First check if we have a scheme present
# Note: urlsplit/urlparse cannot be used (for Python 3.9+) on scheme-less proxies, e.g. host:port.
if b"://" not in proxy:
proxy = b"".join([default_scheme, b"://", proxy])
if b":" in hostport:
host, port = hostport.rsplit(b":", 1)
try:
port = int(port)
return host, port
except ValueError:
# the thing after the : wasn't a valid port; presumably this is an
# IPv6 address.
pass
url = urlparse(proxy)
if not url.hostname:
raise ValueError("Proxy URL did not contain a hostname! Please specify one.")
if url.scheme not in (b"http", b"https"):
raise ValueError(
f"Unknown proxy scheme {url.scheme!s}; only 'http' and 'https' is supported."
)
credentials = None
if url.username and url.password:
credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))
return url.scheme, url.hostname, url.port or default_port, credentials
return hostport, default_port
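Illustrative calls for the two variants in this hunk (values hypothetical):

    # scheme-aware parse_proxy:
    scheme, host, port, creds = parse_proxy(b"https://user:secret@proxy.example.com:8888")
    # -> (b"https", b"proxy.example.com", 8888, ProxyCredentials(b"user:secret"))

    # scheme-less parse_host_port:
    host, port = parse_host_port(b"proxy.example.com:8888", default_port=1080)
    # -> (b"proxy.example.com", 8888); with no ":port" it falls back to 1080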
-1
@@ -45,7 +45,6 @@ class PeriodicallyFlushingMemoryHandler(MemoryHandler):
self._flushing_thread: Thread = Thread(
name="PeriodicallyFlushingMemoryHandler flushing thread",
target=self._flush_periodically,
daemon=True,
)
self._flushing_thread.start()
+2 -3
@@ -111,9 +111,8 @@ class _NotifierUserStream:
self.last_notified_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(
self,
+1
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
+2
@@ -24,6 +24,7 @@ from synapse.replication.http import (
register,
send_event,
streams,
typing,
)
REPLICATION_PREFIX = "/_synapse/replication"
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
push.register_servlets(hs, self)
typing.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
+89
@@ -0,0 +1,89 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.types import Requester, UserID
from typing import TYPE_CHECKING
import logging
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReplicationTypingRestServlet(ReplicationEndpoint):
"""Call to start or stop a user typing in a room.
Request format:
POST /_synapse/replication/typing/:room_id/:user_id
{
"requester": ...,
"typing": true,
"timeout": 30000
}
"""
NAME = "typing"
PATH_ARGS = ("room_id", "user_id")
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.handler = hs.get_typing_handler()
self.store = hs.get_datastore()
@staticmethod
async def _serialize_payload(requester, room_id, user_id, typing, timeout):
payload = {
"requester": requester.serialize(),
"typing": typing,
"timeout": timeout,
}
return payload
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
requester = Requester.deserialize(self.store, content["requester"])
request.requester = requester
target_user = UserID.from_string(user_id)
if content["typing"]:
await self.handler.started_typing(
target_user,
requester,
room_id,
content["timeout"],
)
else:
await self.handler.stopped_typing(
target_user,
requester,
room_id,
)
return 200, {}
def register_servlets(hs, http_server):
ReplicationTypingRestServlet(hs).register(http_server)
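On the follower side the generated client is invoked as in this sketch, mirroring the FollowerTypingHandler hunk earlier (instance name and identifiers hypothetical):

    send_typing = ReplicationTypingRestServlet.make_client(hs)
    await send_typing(
        instance_name="typing1",  # the configured typing writer
        room_id="!room:example.org",
        user_id="@alice:example.org",
        requester=requester,
        typing=True,
        timeout=30000,
    )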
+1 -6
@@ -285,7 +285,7 @@ class ReplicationDataHandler:
# Create a new deferred that times out after N seconds, as we don't want
# to wedge here forever.
deferred: "Deferred[None]" = Deferred()
deferred = Deferred()
deferred = timeout_deferred(
deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor
)
@@ -393,11 +393,6 @@ class FederationSenderHandler:
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
if (
receipt.data.get("hidden", False)
and self._hs.config.experimental.msc2285_enabled
):
continue
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,
+4 -19
@@ -504,6 +504,7 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
events_to_create = body["events"]
prev_event_ids = prev_events_from_query
inherited_depth = await self._inherit_depth_from_prev_ids(
prev_events_from_query
)
@@ -515,10 +516,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
chunk_id_to_connect_to = chunk_id_from_query
base_insertion_event = None
if chunk_id_from_query:
# All but the first base insertion event should point at a fake
# event, which causes the HS to ask for the state at the start of
# the chunk later.
prev_event_ids = [fake_prev_event_id]
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
pass
# Otherwise, create an insertion event to act as a starting point.
@@ -529,8 +526,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
# an insertion event), in which case we just create a new insertion event
# that can then get pointed to by a "marker" event later.
else:
prev_event_ids = prev_events_from_query
base_insertion_event_dict = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
@@ -1259,18 +1254,11 @@ class RoomTypingRestServlet(RestServlet):
self.presence_handler = hs.get_presence_handler()
self.auth = hs.get_auth()
# If we're not on the typing writer instance we should scream if we get
# requests.
self._is_typing_writer = (
hs.config.worker.writers.typing == hs.get_instance_name()
)
self.handler = hs.get_typing_handler()
async def on_PUT(self, request, room_id, user_id):
requester = await self.auth.get_user_by_req(request)
if not self._is_typing_writer:
raise Exception("Got /typing request on instance that is not typing writer")
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))
@@ -1281,19 +1269,16 @@ class RoomTypingRestServlet(RestServlet):
# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
# Defer getting the typing handler since it will raise on workers.
typing_handler = self.hs.get_typing_writer_handler()
try:
if content["typing"]:
await typing_handler.started_typing(
await self.handler.started_typing(
target_user=target_user,
requester=requester,
room_id=room_id,
timeout=timeout,
)
else:
await typing_handler.stopped_typing(
await self.handler.stopped_typing(
target_user=target_user, requester=requester, room_id=room_id
)
except ShadowBanError:
+1 -13
@@ -14,8 +14,6 @@
import logging
from synapse.api.constants import ReadReceiptEventFields
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from ._base import client_patterns
@@ -39,24 +37,14 @@ class ReadMarkerRestServlet(RestServlet):
await self.presence_handler.bump_presence_active_time(requester.user)
body = parse_json_object_from_request(request)
read_event_id = body.get("m.read", None)
hidden = body.get(ReadReceiptEventFields.MSC2285_HIDDEN, False)
if not isinstance(hidden, bool):
raise SynapseError(
400,
"Param %s must be a boolean, if given"
% ReadReceiptEventFields.MSC2285_HIDDEN,
Codes.BAD_JSON,
)
if read_event_id:
await self.receipts_handler.received_client_receipt(
room_id,
"m.read",
user_id=requester.user.to_string(),
event_id=read_event_id,
hidden=hidden,
)
read_marker_event_id = body.get("m.fully_read", None)
+3 -19
@@ -14,9 +14,8 @@
import logging
from synapse.api.constants import ReadReceiptEventFields
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet
from ._base import client_patterns
@@ -43,25 +42,10 @@ class ReceiptRestServlet(RestServlet):
if receipt_type != "m.read":
raise SynapseError(400, "Receipt type must be 'm.read'")
body = parse_json_object_from_request(request, allow_empty_body=True)
hidden = body.get(ReadReceiptEventFields.MSC2285_HIDDEN, False)
if not isinstance(hidden, bool):
raise SynapseError(
400,
"Param %s must be a boolean, if given"
% ReadReceiptEventFields.MSC2285_HIDDEN,
Codes.BAD_JSON,
)
await self.presence_handler.bump_presence_active_time(requester.user)
await self.receipts_handler.received_client_receipt(
room_id,
receipt_type,
user_id=requester.user.to_string(),
event_id=event_id,
hidden=hidden,
room_id, receipt_type, user_id=requester.user.to_string(), event_id=event_id
)
return 200, {}
-2
@@ -82,8 +82,6 @@ class VersionsRestServlet(RestServlet):
"io.element.e2ee_forced.trusted_private": self.e2ee_forced_trusted_private,
# Supports the busy presence state described in MSC3026.
"org.matrix.msc3026.busy_presence": self.config.experimental.msc3026_enabled,
# Supports receiving hidden read receipts as per MSC2285
"org.matrix.msc2285": self.config.experimental.msc2285_enabled,
},
},
)
@@ -58,11 +58,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
_charset_match = re.compile(
br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9_-]+)"?', flags=re.I
)
_charset_match = re.compile(br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9-]+)"?', flags=re.I)
_xml_encoding_match = re.compile(
br'\s*<\s*\?\s*xml[^>]*encoding="([a-z0-9_-]+)"', flags=re.I
br'\s*<\s*\?\s*xml[^>]*encoding="([a-z0-9-]+)"', flags=re.I
)
_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
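A quick illustration of what the charset pattern captures (using the variant with `_` in the character class):

    import re

    _charset_match = re.compile(
        br'<\s*meta[^>]*charset\s*=\s*"?([a-z0-9_-]+)"?', flags=re.I
    )
    m = _charset_match.search(b'<meta charset="utf-8">')
    assert m is not None and m.group(1) == b"utf-8"
    # the `_` also admits charset names such as "shift_jis"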
-21
@@ -15,7 +15,6 @@
# limitations under the License.
import logging
import time
from collections import defaultdict
from sys import intern
from time import monotonic as monotonic_time
from typing import (
@@ -398,7 +397,6 @@ class DatabasePool:
):
self.hs = hs
self._clock = hs.get_clock()
self._txn_limit = database_config.config.get("txn_limit", 0)
self._database_config = database_config
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
@@ -408,9 +406,6 @@ class DatabasePool:
self._current_txn_total_time = 0.0
self._previous_loop_ts = 0.0
# Transaction counter: key is the twisted thread id, value is the current count
self._txn_counters: Dict[int, int] = defaultdict(int)
# TODO(paul): These can eventually be removed once the metrics code
# is running in mainline, and we have some nice monitoring frontends
# to watch it
@@ -755,26 +750,10 @@ class DatabasePool:
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
if self._txn_limit > 0:
tid = self._db_pool.threadID()
self._txn_counters[tid] += 1
if self._txn_counters[tid] > self._txn_limit:
logger.debug(
"Reconnecting database connection over transaction limit"
)
conn.reconnect()
opentracing.log_kv(
{"message": "reconnected due to txn limit"}
)
self._txn_counters[tid] = 1
if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
opentracing.log_kv({"message": "reconnected"})
if self._txn_limit > 0:
self._txn_counters[tid] = 1
try:
if db_autocommit:
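Distilled, the removed transaction-limit logic behaves like this sketch (the limit value is hypothetical):

    from collections import defaultdict

    txn_limit = 10000                  # hypothetical homeserver.yaml "txn_limit"
    txn_counters = defaultdict(int)    # per database-thread transaction counts

    def on_transaction(tid, conn):
        txn_counters[tid] += 1
        if txn_limit and txn_counters[tid] > txn_limit:
            conn.reconnect()           # recycle the connection
            txn_counters[tid] = 1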
@@ -21,7 +21,6 @@ from canonicaljson import encode_canonical_json
from twisted.enterprise.adbapi import Connection
from synapse.api.constants import DeviceKeyAlgorithms
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
@@ -382,15 +381,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
" GROUP BY algorithm"
)
txn.execute(sql, (user_id, device_id))
# Initially set the key count to 0. This ensures that the client will always
# receive *some count*, even if it's 0.
result = {DeviceKeyAlgorithms.SIGNED_CURVE25519: 0}
# Override entries with the count of any keys we pulled from the database
result = {}
for algorithm, key_count in txn:
result[algorithm] = key_count
return result
return await self.db_pool.runInteraction(
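The behavioural difference is easiest to see with a hypothetical query result:

    rows = [("alg1", 1)]               # hypothetical GROUP BY output
    with_default = {"signed_curve25519": 0}
    for algorithm, key_count in rows:
        with_default[algorithm] = key_count
    assert with_default == {"signed_curve25519": 0, "alg1": 1}
    # with an empty initial dict the result is just {"alg1": 1}, so a client
    # cannot distinguish "zero keys left" from "no data for this algorithm"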
@@ -16,11 +16,11 @@ import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter, Gauge
from prometheus_client import Gauge
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -44,12 +44,6 @@ number_pdus_in_federation_queue = Gauge(
"The total number of events in the inbound federation staging",
)
pdus_pruned_from_federation_queue = Counter(
"synapse_federation_server_number_inbound_pdu_pruned",
"The number of events in the inbound federation staging that have been "
"pruned due to the queue getting too long",
)
logger = logging.getLogger(__name__)
@@ -942,46 +936,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
# Look for the prev_event_id connected to the given event_id
query = """
SELECT depth, prev_event_id FROM event_edges
/* Get the depth of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Find an event which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
LIMIT ?
"""
query = (
"SELECT depth, prev_event_id FROM event_edges"
" INNER JOIN events"
" ON prev_event_id = events.event_id"
" WHERE event_edges.event_id = ?"
" AND event_edges.is_state = ?"
" LIMIT ?"
)
# Look for the "insertion" events connected to the given event_id
connected_insertion_event_query = """
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which points via prev_events to the given event_id */
WHERE i.insertion_prev_event_id = ?
LIMIT ?
"""
# Find any chunk connections of a given insertion event
chunk_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
/* Find the chunk that connects to the given insertion event */
INNER JOIN chunk_events AS c
ON i.next_chunk_id = c.chunk_id
/* Get the depth of the chunk start event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which matches the given event_id */
WHERE i.event_id = ?
LIMIT ?
"""
# In a PriorityQueue, the lowest valued entries are retrieved first.
# We're using depth as the priority in the queue.
# Depth is lowest at the oldest-in-time message and highest at the
# newest-in-time message. We add events to the queue with a negative depth so that
# we process the newest-in-time messages first going backwards in time.
queue = PriorityQueue()
for event_id in event_list:
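A toy illustration of the negative-depth trick described in the comment above:

    from queue import PriorityQueue

    q = PriorityQueue()
    q.put((-10, "$newest"))                # depth 10
    q.put((-3, "$oldest"))                 # depth 3
    assert q.get() == (-10, "$newest")     # highest depth (newest) is served first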
@@ -1007,48 +970,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
# Try and find any potential historical chunks of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
# find any chunk events connected to the insertion event (by
# chunk_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
)
connected_insertion_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: connected_insertion_event_query %s",
connected_insertion_event_id_results,
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
# Find any chunk connections for the given insertion event
txn.execute(
chunk_connection_query,
(connected_insertion_event, limit - len(event_results)),
)
chunk_start_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: chunk_start_event_id_results %s",
chunk_start_event_id_results,
)
for row in chunk_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
# Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)
for row in prev_event_id_results:
for row in txn:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
@@ -1283,100 +1207,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return origin, event
async def prune_staged_events_in_room(
self,
room_id: str,
room_version: RoomVersion,
) -> bool:
"""Checks if there are lots of staged events for the room, and if so
prune them down.
Returns:
Whether any events were pruned
"""
# First check the size of the queue.
count = await self.db_pool.simple_select_one_onecol(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcol="COALESCE(COUNT(*), 0)",
desc="prune_staged_events_in_room_count",
)
if count < 100:
return False
# If the queue is too large, then we want to clear the entire queue,
# keeping only the forward extremities (i.e. the events not referenced
# by other events in the queue). We do this so that we can always
# backpaginate in all the events we have dropped.
rows = await self.db_pool.simple_select_list(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcols=("event_id", "event_json"),
desc="prune_staged_events_in_room_fetch",
)
# Find the set of events referenced by those in the queue, as well as
# collecting all the event IDs in the queue.
referenced_events: Set[str] = set()
seen_events: Set[str] = set()
for row in rows:
event_id = row["event_id"]
seen_events.add(event_id)
event_d = db_to_json(row["event_json"])
# We don't bother parsing the dicts into full blown event objects,
# as that is needlessly expensive.
# We haven't checked that the `prev_events` have the right format
# yet, so we check as we go.
prev_events = event_d.get("prev_events", [])
if not isinstance(prev_events, list):
logger.info("Invalid prev_events for %s", event_id)
continue
if room_version.event_format == EventFormatVersions.V1:
for prev_event_tuple in prev_events:
if not isinstance(prev_event_tuple, list) or len(prev_event_tuple) != 2:
logger.info("Invalid prev_events for %s", event_id)
break
prev_event_id = prev_event_tuple[0]
if not isinstance(prev_event_id, str):
logger.info("Invalid prev_events for %s", event_id)
break
referenced_events.add(prev_event_id)
else:
for prev_event_id in prev_events:
if not isinstance(prev_event_id, str):
logger.info("Invalid prev_events for %s", event_id)
break
referenced_events.add(prev_event_id)
to_delete = referenced_events & seen_events
if not to_delete:
return False
pdus_pruned_from_federation_queue.inc(len(to_delete))
logger.info(
"Pruning %d events in room %s from federation queue",
len(to_delete),
room_id,
)
await self.db_pool.simple_delete_many(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
iterable=to_delete,
column="event_id",
desc="prune_staged_events_in_room_delete",
)
return True
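A toy example of the pruning rule (event IDs hypothetical):

    referenced_events = {"$A"}            # events pointed at by others in the queue
    seen_events = {"$A", "$B", "$C"}      # everything currently staged
    to_delete = referenced_events & seen_events
    assert to_delete == {"$A"}            # $B and $C survive as forward extremities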
async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
@@ -1397,15 +1227,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
(count,) = txn.fetchone()
txn.execute(
"SELECT min(received_ts) FROM federation_inbound_events_staging"
"SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
)
(received_ts,) = txn.fetchone()
# If there is nothing in the staging area default it to 0.
age = 0
if received_ts is not None:
age = self._clock.time_msec() - received_ts
age = self._clock.time_msec() - received_ts
return count, age
-91
@@ -1502,9 +1502,6 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
self._handle_insertion_event(txn, event)
self._handle_chunk_event(txn, event)
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1757,94 +1754,6 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
"""Handles keeping track of insertion events and edges/connections.
Part of MSC2716.
Args:
txn: The database transaction object
event: The event to process
"""
if event.type != EventTypes.MSC2716_INSERTION:
# Not an insertion event
return
# Skip processing an insertion event if the room version doesn't
# support it.
room_version = self.store.get_room_version_txn(txn, event.room_id)
if not room_version.msc2716_historical:
return
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
if next_chunk_id is None:
# Invalid insertion event without next chunk ID
return
logger.debug(
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
)
# Keep track of the insertion event and the chunk ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"next_chunk_id": next_chunk_id,
},
)
# Insert an edge for every prev_event connection
for prev_event_id in event.prev_events:
self.db_pool.simple_insert_txn(
txn,
table="insertion_event_edges",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"insertion_prev_event_id": prev_event_id,
},
)
def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
"""Handles inserting the chunk edges/connections between the chunk event
and an insertion event. Part of MSC2716.
Args:
txn: The database transaction object
event: The event to process
"""
if event.type != EventTypes.MSC2716_CHUNK:
# Not a chunk event
return
# Skip processing a chunk event if the room version doesn't
# support it.
room_version = self.store.get_room_version_txn(txn, event.room_id)
if not room_version.msc2716_historical:
return
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
if chunk_id is None:
# Invalid chunk event without a chunk ID
return
logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
# Keep track of the insertion event and the chunk ID
self.db_pool.simple_insert_txn(
txn,
table="chunk_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"chunk_id": chunk_id,
},
)
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
+11 -39
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateFilter
@@ -58,32 +58,15 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
Raises:
NotFoundError: if the room is unknown
UnsupportedRoomVersionError: if the room uses an unknown room version.
Typically this happens if support for the room's version has been
removed from Synapse.
"""
return await self.db_pool.runInteraction(
"get_room_version_txn",
self.get_room_version_txn,
room_id,
)
def get_room_version_txn(
self, txn: LoggingTransaction, room_id: str
) -> RoomVersion:
"""Get the room_version of a given room
Args:
txn: Transaction object
room_id: The room_id of the room you are trying to get the version for
Raises:
NotFoundError: if the room is unknown
UnsupportedRoomVersionError: if the room uses an unknown room version.
Typically this happens if support for the room's version has been
removed from Synapse.
"""
room_version_id = self.get_room_version_id_txn(txn, room_id)
room_version_id = await self.get_room_version_id(room_id)
v = KNOWN_ROOM_VERSIONS.get(room_version_id)
if not v:
@@ -97,20 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@cached(max_entries=10000)
async def get_room_version_id(self, room_id: str) -> str:
"""Get the room_version of a given room
Raises:
NotFoundError: if the room is unknown
"""
return await self.db_pool.runInteraction(
"get_room_version_id_txn",
self.get_room_version_id_txn,
room_id,
)
def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
"""Get the room_version of a given room
Args:
txn: Transaction object
room_id: The room_id of the room you are trying to get the version for
Raises:
NotFoundError: if the room is unknown
"""
@@ -118,22 +88,24 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# First we try looking up room version from the database, but for old
# rooms we might not have added the room version to it yet so we fall
# back to previous behaviour and look in current state events.
#
# We really should have an entry in the rooms table for every room we
# care about, but let's be a bit paranoid (at least while the background
# update is happening) to avoid breaking existing rooms.
room_version = self.db_pool.simple_select_one_onecol_txn(
txn,
version = await self.db_pool.simple_select_one_onecol(
table="rooms",
keyvalues={"room_id": room_id},
retcol="room_version",
desc="get_room_version",
allow_none=True,
)
if room_version is None:
raise NotFoundError("Could not room_version for %s" % (room_id,))
if version is not None:
return version
return room_version
# Retrieve the room's create event
create_event = await self.get_create_event_for_room(room_id)
return create_event.content.get("room_version", "1")
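The fallback is easiest to see with a hypothetical pre-versioning create event:

    content = {"creator": "@alice:example.org"}     # no explicit room_version
    assert content.get("room_version", "1") == "1"  # old rooms default to v1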
async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
"""Get the predecessor of an upgraded room if it exists.
+1 -3
@@ -170,9 +170,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
end_item = queue[-1]
else:
# need to make a new queue item
deferred: ObservableDeferred[_PersistResult] = ObservableDeferred(
defer.Deferred(), consumeErrors=True
)
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
end_item = _EventPersistQueueItem(
events_and_contexts=[],
@@ -1,49 +0,0 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a table that keeps track of "insertion" events and
-- their next_chunk_id's so we can navigate to the next chunk of history.
CREATE TABLE IF NOT EXISTS insertion_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
next_chunk_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);
-- Add a table that keeps track of all of the events we are inserting between.
-- We use this when navigating the DAG and when we hit an event which matches
-- `insertion_prev_event_id`, it should backfill from the "insertion" event and
-- navigate the historical messages from there.
CREATE TABLE IF NOT EXISTS insertion_event_edges(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
insertion_prev_event_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
-- Add a table that keeps track of how each chunk is labeled. The chunks are
-- connected together based on an insertion events `next_chunk_id`.
CREATE TABLE IF NOT EXISTS chunk_events(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
chunk_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS chunk_events_event_id ON chunk_events(event_id);
CREATE INDEX IF NOT EXISTS chunk_events_chunk_id ON chunk_events(chunk_id);
+13 -15
@@ -23,7 +23,6 @@ from typing import (
Awaitable,
Callable,
Dict,
Generic,
Hashable,
Iterable,
List,
@@ -40,7 +39,6 @@ from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.python import failure
from twisted.python.failure import Failure
from synapse.logging.context import (
PreserveLoggingContext,
@@ -51,10 +49,8 @@ from synapse.util import Clock, unwrapFirstError
logger = logging.getLogger(__name__)
_T = TypeVar("_T")
class ObservableDeferred(Generic[_T]):
class ObservableDeferred:
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
deferred.
@@ -72,7 +68,7 @@ class ObservableDeferred(Generic[_T]):
__slots__ = ["_deferred", "_observers", "_result"]
def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
def __init__(self, deferred: defer.Deferred, consumeErrors: bool = False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
object.__setattr__(self, "_observers", set())
@@ -117,7 +113,7 @@ class ObservableDeferred(Generic[_T]):
deferred.addCallbacks(callback, errback)
def observe(self) -> "defer.Deferred[_T]":
def observe(self) -> defer.Deferred:
"""Observe the underlying deferred.
This returns a brand new deferred that is resolved when the underlying
@@ -125,7 +121,7 @@ class ObservableDeferred(Generic[_T]):
effect the underlying deferred.
"""
if not self._result:
d: "defer.Deferred[_T]" = defer.Deferred()
d = defer.Deferred()
def remove(r):
self._observers.discard(d)
@@ -139,7 +135,7 @@ class ObservableDeferred(Generic[_T]):
success, res = self._result
return defer.succeed(res) if success else defer.fail(res)
def observers(self) -> "List[defer.Deferred[_T]]":
def observers(self) -> List[defer.Deferred]:
return self._observers
def has_called(self) -> bool:
@@ -148,7 +144,7 @@ class ObservableDeferred(Generic[_T]):
def has_succeeded(self) -> bool:
return self._result is not None and self._result[0] is True
def get_result(self) -> Union[_T, Failure]:
def get_result(self) -> Any:
return self._result[1]
def __getattr__(self, name: str) -> Any:
@@ -419,7 +415,7 @@ class ReadWriteLock:
self.key_to_current_writer: Dict[str, defer.Deferred] = {}
async def read(self, key: str) -> ContextManager:
new_defer: "defer.Deferred[None]" = defer.Deferred()
new_defer = defer.Deferred()
curr_readers = self.key_to_current_readers.setdefault(key, set())
curr_writer = self.key_to_current_writer.get(key, None)
@@ -442,7 +438,7 @@ class ReadWriteLock:
return _ctx_manager()
async def write(self, key: str) -> ContextManager:
new_defer: "defer.Deferred[None]" = defer.Deferred()
new_defer = defer.Deferred()
curr_readers = self.key_to_current_readers.get(key, set())
curr_writer = self.key_to_current_writer.get(key, None)
@@ -475,8 +471,10 @@ R = TypeVar("R")
def timeout_deferred(
deferred: "defer.Deferred[_T]", timeout: float, reactor: IReactorTime
) -> "defer.Deferred[_T]":
deferred: defer.Deferred,
timeout: float,
reactor: IReactorTime,
) -> defer.Deferred:
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
@@ -499,7 +497,7 @@ def timeout_deferred(
Returns:
A new Deferred, which will errback with defer.TimeoutError on timeout.
"""
new_d: "defer.Deferred[_T]" = defer.Deferred()
new_d = defer.Deferred()
timed_out = [False]
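A usage sketch (the wrapped call is hypothetical; the reactor comes from the homeserver):

    d = make_request()                      # hypothetical Deferred-returning call
    d = timeout_deferred(d, 10.0, reactor)  # errbacks with defer.TimeoutError after 10s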
+10 -17
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
from typing import Awaitable, Callable, Generic, Optional, TypeVar, Union
from twisted.internet.defer import Deferred
@@ -22,10 +22,6 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
TV = TypeVar("TV")
class _Sentinel(enum.Enum):
sentinel = object()
class CachedCall(Generic[TV]):
"""A wrapper for asynchronous calls whose results should be shared
@@ -69,7 +65,7 @@ class CachedCall(Generic[TV]):
"""
self._callable: Optional[Callable[[], Awaitable[TV]]] = f
self._deferred: Optional[Deferred] = None
self._result: Union[_Sentinel, TV, Failure] = _Sentinel.sentinel
self._result: Union[None, Failure, TV] = None
async def get(self) -> TV:
"""Kick off the call if necessary, and return the result"""
@@ -82,9 +78,8 @@ class CachedCall(Generic[TV]):
self._callable = None
# once the deferred completes, store the result. We cannot simply leave the
# result in the deferred, since `awaiting` a deferred destroys its result.
# (Also, if it's a Failure, GCing the deferred would log a critical error
# about unhandled Failures)
# result in the deferred, since if it's a Failure, GCing the deferred
# would then log a critical error about unhandled Failures.
def got_result(r):
self._result = r
@@ -97,15 +92,13 @@ class CachedCall(Generic[TV]):
# and any eventual exception may not be reported.
# we can now await the deferred, and once it completes, return the result.
if isinstance(self._result, _Sentinel):
await make_deferred_yieldable(self._deferred)
assert not isinstance(self._result, _Sentinel)
await make_deferred_yieldable(self._deferred)
if isinstance(self._result, Failure):
self._result.raiseException()
raise AssertionError("unexpected return from Failure.raiseException")
return self._result
# I *think* this is the easiest way to correctly raise a Failure without having
# to gut-wrench into the implementation of Deferred.
d = Deferred()
d.callback(self._result)
return await d
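The trick relies on the fact that awaiting a Deferred that has been fired with a Failure re-raises the wrapped exception, e.g.:

    from twisted.internet.defer import Deferred
    from twisted.python.failure import Failure

    async def demo():
        d = Deferred()
        d.callback(Failure(ValueError("boom")))
        await d   # raises ValueError("boom") rather than returning the Failure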
class RetryOnExceptionCachedCall(Generic[TV]):
+3 -12
@@ -16,16 +16,7 @@
import enum
import threading
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union
from prometheus_client import Gauge
@@ -175,7 +166,7 @@ class DeferredCache(Generic[KT, VT]):
def set(
self,
key: KT,
value: "defer.Deferred[VT]",
value: defer.Deferred,
callback: Optional[Callable[[], None]] = None,
) -> defer.Deferred:
"""Adds a new entry to the cache (or updates an existing one).
@@ -223,7 +214,7 @@ class DeferredCache(Generic[KT, VT]):
if value.called:
result = value.result
if not isinstance(result, failure.Failure):
self.cache.set(key, cast(VT, result), callbacks)
self.cache.set(key, result, callbacks)
return value
# otherwise, we'll add an entry to the _pending_deferred_cache for now,
+1 -1
@@ -413,7 +413,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
# relevant result for that key.
deferreds_map = {}
for arg in missing:
deferred: "defer.Deferred[Any]" = defer.Deferred()
deferred = defer.Deferred()
deferreds_map[arg] = deferred
key = arg_to_cache_key(arg)
cache.set(key, deferred, callback=invalidate_callback)
+1
@@ -1,3 +1,4 @@
#!/usr/bin/env python
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
-43
@@ -301,49 +301,6 @@ class PruneEventTestCase(unittest.TestCase):
room_version=RoomVersions.MSC2176,
)
def test_join_rules(self):
"""Join rules events have changed behavior starting with MSC3083."""
self.run_test(
{
"type": "m.room.join_rules",
"event_id": "$test:domain",
"content": {
"join_rule": "invite",
"allow": [],
"other_key": "stripped",
},
},
{
"type": "m.room.join_rules",
"event_id": "$test:domain",
"content": {"join_rule": "invite"},
"signatures": {},
"unsigned": {},
},
)
# After MSC3083, alias events have no special behavior.
self.run_test(
{
"type": "m.room.join_rules",
"content": {
"join_rule": "invite",
"allow": [],
"other_key": "stripped",
},
},
{
"type": "m.room.join_rules",
"content": {
"join_rule": "invite",
"allow": [],
},
"signatures": {},
"unsigned": {},
},
room_version=RoomVersions.MSC3083,
)
class SerializeEventTestCase(unittest.TestCase):
def serialize(self, ev, fields):
+5 -15
@@ -47,16 +47,12 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
"alg2:k3": {"key": "key3"},
}
# Note that "signed_curve25519" is always returned in key count responses. This is necessary until
# https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
res = self.get_success(
self.handler.upload_keys_for_user(
local_user, device_id, {"one_time_keys": keys}
)
)
self.assertDictEqual(
res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}}
)
self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}})
# we should be able to change the signature without a problem
keys["alg2:k2"]["signatures"]["k1"] = "sig2"
@@ -65,9 +61,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
local_user, device_id, {"one_time_keys": keys}
)
)
self.assertDictEqual(
res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}}
)
self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}})
def test_change_one_time_keys(self):
"""attempts to change one-time-keys should be rejected"""
@@ -85,9 +79,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
local_user, device_id, {"one_time_keys": keys}
)
)
self.assertDictEqual(
res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}}
)
self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}})
# Error when changing string key
self.get_failure(
@@ -97,7 +89,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
SynapseError,
)
# Error when replacing dict key with string
self.get_failure(
self.handler.upload_keys_for_user(
local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}
@@ -139,9 +131,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
local_user, device_id, {"one_time_keys": keys}
)
)
self.assertDictEqual(
res, {"one_time_key_counts": {"alg1": 1, "signed_curve25519": 0}}
)
self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1}})
res2 = self.get_success(
self.handler.claim_one_time_keys(
-131
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from unittest import TestCase
from synapse.api.constants import EventTypes
@@ -23,7 +22,6 @@ from synapse.federation.federation_base import event_from_pdu_json
from synapse.logging.context import LoggingContext, run_in_background
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.util.stringutils import random_string
from tests import unittest
@@ -41,8 +39,6 @@ class FederationTestCase(unittest.HomeserverTestCase):
hs = self.setup_test_homeserver(federation_http_client=None)
self.handler = hs.get_federation_handler()
self.store = hs.get_datastore()
self.state_store = hs.get_storage().state
self._event_auth_handler = hs.get_event_auth_handler()
return hs
def test_exchange_revoked_invite(self):
@@ -194,133 +190,6 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(sg, sg2)
def test_backfill_floating_outlier_membership_auth(self):
"""
As the local homeserver, check that we can properly process a federated
event from the OTHER_SERVER with auth_events that include a floating
membership event from the OTHER_SERVER.
Regression test, see #10439.
"""
OTHER_SERVER = "otherserver"
OTHER_USER = "@otheruser:" + OTHER_SERVER
# create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(
room_creator=user_id,
is_public=True,
tok=tok,
extra_content={
"preset": "public_chat",
},
)
room_version = self.get_success(self.store.get_room_version(room_id))
prev_event_ids = self.get_success(self.store.get_prev_events_for_room(room_id))
(
most_recent_prev_event_id,
most_recent_prev_event_depth,
) = self.get_success(self.store.get_max_depth_of(prev_event_ids))
# mapping from (type, state_key) -> state_event_id
prev_state_map = self.get_success(
self.state_store.get_state_ids_for_event(most_recent_prev_event_id)
)
# List of state event ID's
prev_state_ids = list(prev_state_map.values())
auth_event_ids = prev_state_ids
auth_events = list(
self.get_success(self.store.get_events(auth_event_ids)).values()
)
# build a floating outlier member state event
fake_prev_event_id = "$" + random_string(43)
member_event_dict = {
"type": EventTypes.Member,
"content": {
"membership": "join",
},
"state_key": OTHER_USER,
"room_id": room_id,
"sender": OTHER_USER,
"depth": most_recent_prev_event_depth,
"prev_events": [fake_prev_event_id],
"origin_server_ts": self.clock.time_msec(),
"signatures": {OTHER_SERVER: {"ed25519:key_version": "SomeSignatureHere"}},
}
builder = self.hs.get_event_builder_factory().for_room_version(
room_version, member_event_dict
)
member_event = self.get_success(
builder.build(
prev_event_ids=member_event_dict["prev_events"],
auth_event_ids=self._event_auth_handler.compute_auth_events(
builder,
prev_state_map,
for_verification=False,
),
depth=member_event_dict["depth"],
)
)
# Override the signature added from "test" homeserver that we created the event with
member_event.signatures = member_event_dict["signatures"]
# Add the new member_event to the StateMap
prev_state_map[
(member_event.type, member_event.state_key)
] = member_event.event_id
auth_events.append(member_event)
# build and send an event authed based on the member event
message_event_dict = {
"type": EventTypes.Message,
"content": {},
"room_id": room_id,
"sender": OTHER_USER,
"depth": most_recent_prev_event_depth,
"prev_events": prev_event_ids.copy(),
"origin_server_ts": self.clock.time_msec(),
"signatures": {OTHER_SERVER: {"ed25519:key_version": "SomeSignatureHere"}},
}
builder = self.hs.get_event_builder_factory().for_room_version(
room_version, message_event_dict
)
message_event = self.get_success(
builder.build(
prev_event_ids=message_event_dict["prev_events"],
auth_event_ids=self._event_auth_handler.compute_auth_events(
builder,
prev_state_map,
for_verification=False,
),
depth=message_event_dict["depth"],
)
)
# Override the signature added from "test" homeserver that we created the event with
message_event.signatures = message_event_dict["signatures"]
# Stub the /event_auth response from the OTHER_SERVER
async def get_event_auth(
destination: str, room_id: str, event_id: str
) -> List[EventBase]:
return auth_events
self.handler.federation_client.get_event_auth = get_event_auth
with LoggingContext("receive_pdu"):
# Fake the OTHER_SERVER federating the message event over to our local homeserver
d = run_in_background(
self.handler.on_receive_pdu, OTHER_SERVER, message_event
)
self.get_success(d)
# Now try and get the events on our local homeserver
stored_event = self.get_success(
self.store.get_event(message_event.event_id, allow_none=True)
)
self.assertTrue(stored_event is not None)
@unittest.override_config(
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
)
-294
@@ -1,294 +0,0 @@
# Copyright 2021 Šimon Brandner <simon.bra.ag@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from synapse.api.constants import ReadReceiptEventFields
from synapse.types import JsonDict
from tests import unittest
class ReceiptsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.event_source = hs.get_event_sources().sources["receipt"]
# In the first param of _test_filters_hidden we use "hidden" instead of
# ReadReceiptEventFields.MSC2285_HIDDEN. We do this because we're mocking
# the data from the database, which doesn't use the prefix.
def test_filters_out_hidden_receipt(self):
self._test_filters_hidden(
[
{
"content": {
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@rikj:jki.re": {
"ts": 1436451550453,
"hidden": True,
}
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
[],
)
def test_does_not_filter_out_our_hidden_receipt(self):
self._test_filters_hidden(
[
{
"content": {
"$1435641916hfgh4394fHBLK:matrix.org": {
"m.read": {
"@me:server.org": {
"ts": 1436451550453,
"hidden": True,
},
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
[
{
"content": {
"$1435641916hfgh4394fHBLK:matrix.org": {
"m.read": {
"@me:server.org": {
"ts": 1436451550453,
ReadReceiptEventFields.MSC2285_HIDDEN: True,
},
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
)
def test_filters_out_hidden_receipt_and_ignores_rest(self):
self._test_filters_hidden(
[
{
"content": {
"$1dgdgrd5641916114394fHBLK:matrix.org": {
"m.read": {
"@rikj:jki.re": {
"ts": 1436451550453,
"hidden": True,
},
"@user:jki.re": {
"ts": 1436451550453,
},
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
[
{
"content": {
"$1dgdgrd5641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
)
def test_filters_out_event_with_only_hidden_receipts_and_ignores_the_rest(self):
self._test_filters_hidden(
[
{
"content": {
"$14356419edgd14394fHBLK:matrix.org": {
"m.read": {
"@rikj:jki.re": {
"ts": 1436451550453,
"hidden": True,
},
}
},
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
[
{
"content": {
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
)
def test_handles_missing_content_of_m_read(self):
self._test_filters_hidden(
[
{
"content": {
"$14356419ggffg114394fHBLK:matrix.org": {"m.read": {}},
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
[
{
"content": {
"$14356419ggffg114394fHBLK:matrix.org": {"m.read": {}},
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
)
def test_handles_empty_event(self):
self._test_filters_hidden(
[
{
"content": {
"$143564gdfg6114394fHBLK:matrix.org": {},
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
[
{
"content": {
"$143564gdfg6114394fHBLK:matrix.org": {},
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
)
def test_filters_out_receipt_event_with_only_hidden_receipt_and_ignores_rest(self):
self._test_filters_hidden(
[
{
"content": {
"$14356419edgd14394fHBLK:matrix.org": {
"m.read": {
"@rikj:jki.re": {
"ts": 1436451550453,
"hidden": True,
},
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
},
{
"content": {
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
},
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
},
],
[
{
"content": {
"$1435641916114394fHBLK:matrix.org": {
"m.read": {
"@user:jki.re": {
"ts": 1436451550453,
}
}
}
},
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"type": "m.receipt",
}
],
)
def _test_filters_hidden(
self, events: List[JsonDict], expected_output: List[JsonDict]
):
"""Tests that the _filter_out_hidden returns the expected output"""
filtered_events = self.event_source.filter_out_hidden(events, "@me:server.org")
self.assertEquals(filtered_events, expected_output)
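These cases pin the filtering contract down fairly tightly. A minimal sketch of logic that satisfies them — purely illustrative, not the actual Synapse implementation — drops hidden receipts belonging to other users, keeps our own hidden receipt but rewrites the bare "hidden" key to the prefixed MSC2285 constant, copies empty "m.read" dicts through untouched, and discards any event id (or whole event) left with nothing visible:

from typing import List

from synapse.api.constants import ReadReceiptEventFields
from synapse.types import JsonDict

def filter_out_hidden_sketch(events: List[JsonDict], user_id: str) -> List[JsonDict]:
    visible_events = []
    for event in events:
        new_event = dict(event, content={})
        for event_id, event_content in event.get("content", {}).items():
            m_read = event_content.get("m.read", {})
            if not m_read:
                # nothing to filter; copy the (possibly empty) entry through
                new_event["content"][event_id] = dict(event_content)
                continue
            new_users = {}
            for reader, receipt in m_read.items():
                hidden = receipt.get("hidden")
                if hidden is not True or reader == user_id:
                    new_users[reader] = dict(receipt)
                    if hidden is not None:
                        # database rows use the bare key; clients see the prefix
                        new_users[reader].pop("hidden")
                        new_users[reader][
                            ReadReceiptEventFields.MSC2285_HIDDEN
                        ] = hidden
            # drop event ids whose receipts were all filtered out
            if new_users:
                new_event["content"][event_id] = {"m.read": new_users}
        # drop events left with no visible content at all
        if new_event["content"]:
            visible_events.append(new_event)
    return visible_events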
+58 -340
View File
@@ -14,22 +14,19 @@
import base64
import logging
import os
from typing import Iterable, Optional
from typing import Optional
from unittest.mock import patch
import treq
from netaddr import IPSet
from parameterized import parameterized
from twisted.internet import interfaces # noqa: F401
from twisted.internet.endpoints import HostnameEndpoint, _WrapperEndpoint
from twisted.internet.interfaces import IProtocol, IProtocolFactory
from twisted.internet.protocol import Factory
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web.http import HTTPChannel
from synapse.http.client import BlacklistingReactorWrapper
from synapse.http.proxyagent import ProxyAgent, ProxyCredentials, parse_proxy
from synapse.http.proxyagent import ProxyAgent
from tests.http import TestServerTLSConnectionFactory, get_test_https_policy
from tests.server import FakeTransport, ThreadedMemoryReactorClock
@@ -40,208 +37,33 @@ logger = logging.getLogger(__name__)
HTTPFactory = Factory.forProtocol(HTTPChannel)
class ProxyParserTests(TestCase):
"""
Each test case is a list of values:
[
proxy_string,
expected_scheme,
expected_hostname,
expected_port,
expected_credentials,
]
"""
@parameterized.expand(
[
# host
[b"localhost", b"http", b"localhost", 1080, None],
[b"localhost:9988", b"http", b"localhost", 9988, None],
# host+scheme
[b"https://localhost", b"https", b"localhost", 1080, None],
[b"https://localhost:1234", b"https", b"localhost", 1234, None],
# ipv4
[b"1.2.3.4", b"http", b"1.2.3.4", 1080, None],
[b"1.2.3.4:9988", b"http", b"1.2.3.4", 9988, None],
# ipv4+scheme
[b"https://1.2.3.4", b"https", b"1.2.3.4", 1080, None],
[b"https://1.2.3.4:9988", b"https", b"1.2.3.4", 9988, None],
# ipv6 - without brackets is broken
# [
# b"2001:0db8:85a3:0000:0000:8a2e:0370:effe",
# b"http",
# b"2001:0db8:85a3:0000:0000:8a2e:0370:effe",
# 1080,
# None,
# ],
# [
# b"2001:0db8:85a3:0000:0000:8a2e:0370:1234",
# b"http",
# b"2001:0db8:85a3:0000:0000:8a2e:0370:1234",
# 1080,
# None,
# ],
# [b"::1", b"http", b"::1", 1080, None],
# [b"::ffff:0.0.0.0", b"http", b"::ffff:0.0.0.0", 1080, None],
# ipv6 - with brackets
[
b"[2001:0db8:85a3:0000:0000:8a2e:0370:effe]",
b"http",
b"2001:0db8:85a3:0000:0000:8a2e:0370:effe",
1080,
None,
],
[
b"[2001:0db8:85a3:0000:0000:8a2e:0370:1234]",
b"http",
b"2001:0db8:85a3:0000:0000:8a2e:0370:1234",
1080,
None,
],
[b"[::1]", b"http", b"::1", 1080, None],
[b"[::ffff:0.0.0.0]", b"http", b"::ffff:0.0.0.0", 1080, None],
# ipv6+port
[
b"[2001:0db8:85a3:0000:0000:8a2e:0370:effe]:9988",
b"http",
b"2001:0db8:85a3:0000:0000:8a2e:0370:effe",
9988,
None,
],
[
b"[2001:0db8:85a3:0000:0000:8a2e:0370:1234]:9988",
b"http",
b"2001:0db8:85a3:0000:0000:8a2e:0370:1234",
9988,
None,
],
[b"[::1]:9988", b"http", b"::1", 9988, None],
[b"[::ffff:0.0.0.0]:9988", b"http", b"::ffff:0.0.0.0", 9988, None],
# ipv6+scheme
[
b"https://[2001:0db8:85a3:0000:0000:8a2e:0370:effe]",
b"https",
b"2001:0db8:85a3:0000:0000:8a2e:0370:effe",
1080,
None,
],
[
b"https://[2001:0db8:85a3:0000:0000:8a2e:0370:1234]",
b"https",
b"2001:0db8:85a3:0000:0000:8a2e:0370:1234",
1080,
None,
],
[b"https://[::1]", b"https", b"::1", 1080, None],
[b"https://[::ffff:0.0.0.0]", b"https", b"::ffff:0.0.0.0", 1080, None],
# ipv6+scheme+port
[
b"https://[2001:0db8:85a3:0000:0000:8a2e:0370:effe]:9988",
b"https",
b"2001:0db8:85a3:0000:0000:8a2e:0370:effe",
9988,
None,
],
[
b"https://[2001:0db8:85a3:0000:0000:8a2e:0370:1234]:9988",
b"https",
b"2001:0db8:85a3:0000:0000:8a2e:0370:1234",
9988,
None,
],
[b"https://[::1]:9988", b"https", b"::1", 9988, None],
# with credentials
[
b"https://user:pass@1.2.3.4:9988",
b"https",
b"1.2.3.4",
9988,
b"user:pass",
],
[b"user:pass@1.2.3.4:9988", b"http", b"1.2.3.4", 9988, b"user:pass"],
[
b"https://user:pass@proxy.local:9988",
b"https",
b"proxy.local",
9988,
b"user:pass",
],
[
b"user:pass@proxy.local:9988",
b"http",
b"proxy.local",
9988,
b"user:pass",
],
]
)
def test_parse_proxy(
self,
proxy_string: bytes,
expected_scheme: bytes,
expected_hostname: bytes,
expected_port: int,
expected_credentials: Optional[bytes],
):
"""
Tests that a given proxy URL will be broken into its components.
Args:
proxy_string: The proxy connection string.
expected_scheme: Expected value of proxy scheme.
expected_hostname: Expected value of proxy hostname.
expected_port: Expected value of proxy port.
expected_credentials: Expected value of credentials.
Must be in the form '<username>:<password>', or None
"""
proxy_cred = None
if expected_credentials:
proxy_cred = ProxyCredentials(expected_credentials)
self.assertEqual(
(
expected_scheme,
expected_hostname,
expected_port,
proxy_cred,
),
parse_proxy(proxy_string),
)
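Taken together, the cases encode the parsing rules: the scheme defaults to http and the port to 1080, IPv6 literals must be bracketed, and an optional userinfo section yields credentials. A rough stand-alone sketch of a parser with those rules — illustrative only; the real parse_proxy returns a ProxyCredentials wrapper rather than raw bytes:

from typing import Optional, Tuple
from urllib.parse import urlparse

def parse_proxy_sketch(
    proxy: bytes, default_scheme: bytes = b"http", default_port: int = 1080
) -> Tuple[bytes, bytes, int, Optional[bytes]]:
    # prepend a scheme if none was given, so urlparse populates netloc
    if b"://" not in proxy:
        proxy = default_scheme + b"://" + proxy
    url = urlparse(proxy)
    if url.scheme not in (b"http", b"https"):
        raise ValueError("Unsupported proxy scheme: %r" % (url.scheme,))
    credentials = None
    if url.username and url.password:
        credentials = url.username + b":" + url.password
    # urlparse strips the square brackets from IPv6 literals for us
    return url.scheme, url.hostname, url.port or default_port, credentials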
class MatrixFederationAgentTests(TestCase):
def setUp(self):
self.reactor = ThreadedMemoryReactorClock()
def _make_connection(
self,
client_factory: IProtocolFactory,
server_factory: IProtocolFactory,
ssl: bool = False,
expected_sni: Optional[bytes] = None,
tls_sanlist: Optional[Iterable[bytes]] = None,
) -> IProtocol:
self, client_factory, server_factory, ssl=False, expected_sni=None
):
"""Builds a test server, and completes the outgoing client connection
Args:
client_factory: the factory that the
client_factory (interfaces.IProtocolFactory): the factory that the
application is trying to use to make the outbound connection. We will
invoke it to build the client Protocol
server_factory: a factory to build the
server_factory (interfaces.IProtocolFactory): a factory to build the
server-side protocol
ssl: If true, we will expect an ssl connection and wrap
ssl (bool): If true, we will expect an ssl connection and wrap
server_factory with a TLSMemoryBIOFactory
expected_sni: the expected SNI value
tls_sanlist: list of SAN entries for the TLS cert presented by the server.
Defaults to [b'DNS:test.com']
expected_sni (bytes|None): the expected SNI value
Returns:
the server Protocol returned by server_factory
IProtocol: the server Protocol returned by server_factory
"""
if ssl:
server_factory = _wrap_server_factory_for_tls(server_factory, tls_sanlist)
server_factory = _wrap_server_factory_for_tls(server_factory)
server_protocol = server_factory.buildProtocol(None)
@@ -276,28 +98,22 @@ class MatrixFederationAgentTests(TestCase):
self.assertEqual(
server_name,
expected_sni,
f"Expected SNI {expected_sni!s} but got {server_name!s}",
"Expected SNI %s but got %s" % (expected_sni, server_name),
)
return http_protocol
def _test_request_direct_connection(
self,
agent: ProxyAgent,
scheme: bytes,
hostname: bytes,
path: bytes,
):
def _test_request_direct_connection(self, agent, scheme, hostname, path):
"""Runs a test case for a direct connection not going through a proxy.
Args:
agent: the proxy agent being tested
agent (ProxyAgent): the proxy agent being tested
scheme: expected to be either "http" or "https"
scheme (bytes): expected to be either "http" or "https"
hostname: the hostname to connect to in the test
hostname (bytes): the hostname to connect to in the test
path: the path to connect to in the test
path (bytes): the path to connect to in the test
"""
is_https = scheme == b"https"
@@ -392,7 +208,7 @@ class MatrixFederationAgentTests(TestCase):
"""
Tests that requests can be made through a proxy.
"""
self._do_http_request_via_proxy(ssl=False, auth_credentials=None)
self._do_http_request_via_proxy(auth_credentials=None)
@patch.dict(
os.environ,
@@ -402,28 +218,12 @@ class MatrixFederationAgentTests(TestCase):
"""
Tests that authenticated requests can be made through a proxy.
"""
self._do_http_request_via_proxy(ssl=False, auth_credentials=b"bob:pinkponies")
@patch.dict(
os.environ, {"http_proxy": "https://proxy.com:8888", "no_proxy": "unused.com"}
)
def test_http_request_via_https_proxy(self):
self._do_http_request_via_proxy(ssl=True, auth_credentials=None)
@patch.dict(
os.environ,
{
"http_proxy": "https://bob:pinkponies@proxy.com:8888",
"no_proxy": "unused.com",
},
)
def test_http_request_via_https_proxy_with_auth(self):
self._do_http_request_via_proxy(ssl=True, auth_credentials=b"bob:pinkponies")
self._do_http_request_via_proxy(auth_credentials="bob:pinkponies")
@patch.dict(os.environ, {"https_proxy": "proxy.com", "no_proxy": "unused.com"})
def test_https_request_via_proxy(self):
"""Tests that TLS-encrypted requests can be made through a proxy"""
self._do_https_request_via_proxy(ssl=False, auth_credentials=None)
self._do_https_request_via_proxy(auth_credentials=None)
@patch.dict(
os.environ,
@@ -431,40 +231,16 @@ class MatrixFederationAgentTests(TestCase):
)
def test_https_request_via_proxy_with_auth(self):
"""Tests that authenticated, TLS-encrypted requests can be made through a proxy"""
self._do_https_request_via_proxy(ssl=False, auth_credentials=b"bob:pinkponies")
@patch.dict(
os.environ, {"https_proxy": "https://proxy.com", "no_proxy": "unused.com"}
)
def test_https_request_via_https_proxy(self):
"""Tests that TLS-encrypted requests can be made through a proxy"""
self._do_https_request_via_proxy(ssl=True, auth_credentials=None)
@patch.dict(
os.environ,
{"https_proxy": "https://bob:pinkponies@proxy.com", "no_proxy": "unused.com"},
)
def test_https_request_via_https_proxy_with_auth(self):
"""Tests that authenticated, TLS-encrypted requests can be made through a proxy"""
self._do_https_request_via_proxy(ssl=True, auth_credentials=b"bob:pinkponies")
self._do_https_request_via_proxy(auth_credentials="bob:pinkponies")
def _do_http_request_via_proxy(
self,
ssl: bool = False,
auth_credentials: Optional[bytes] = None,
auth_credentials: Optional[str] = None,
):
"""Send a http request via an agent and check that it is correctly received at
the proxy. The proxy can use either http or https.
Args:
ssl: True if we expect the request to connect via https to proxy
auth_credentials: credentials to authenticate at proxy
"""
if ssl:
agent = ProxyAgent(
self.reactor, use_proxy=True, contextFactory=get_test_https_policy()
)
else:
agent = ProxyAgent(self.reactor, use_proxy=True)
Tests that requests can be made through a proxy.
"""
agent = ProxyAgent(self.reactor, use_proxy=True)
self.reactor.lookups["proxy.com"] = "1.2.3.5"
d = agent.request(b"GET", b"http://test.com")
@@ -478,11 +254,7 @@ class MatrixFederationAgentTests(TestCase):
# make a test server, and wire up the client
http_server = self._make_connection(
client_factory,
_get_test_protocol_factory(),
ssl=ssl,
tls_sanlist=[b"DNS:proxy.com"] if ssl else None,
expected_sni=b"proxy.com" if ssl else None,
client_factory, _get_test_protocol_factory()
)
# the FakeTransport is async, so we need to pump the reactor
@@ -500,7 +272,7 @@ class MatrixFederationAgentTests(TestCase):
if auth_credentials is not None:
# Compute the correct header value for Proxy-Authorization
encoded_credentials = base64.b64encode(auth_credentials)
encoded_credentials = base64.b64encode(b"bob:pinkponies")
expected_header_value = b"Basic " + encoded_credentials
# Validate the header's value
@@ -523,15 +295,8 @@ class MatrixFederationAgentTests(TestCase):
def _do_https_request_via_proxy(
self,
ssl: bool = False,
auth_credentials: Optional[bytes] = None,
auth_credentials: Optional[str] = None,
):
"""Send a https request via an agent and check that it is correctly received at
the proxy and client. The proxy can use either http or https.
Args:
ssl: True if we expect the request to connect via https to proxy
auth_credentials: credentials to authenticate at proxy
"""
agent = ProxyAgent(
self.reactor,
contextFactory=get_test_https_policy(),
@@ -548,15 +313,18 @@ class MatrixFederationAgentTests(TestCase):
self.assertEqual(host, "1.2.3.5")
self.assertEqual(port, 1080)
# make a test server to act as the proxy, and wire up the client
# make a test HTTP server, and wire up the client
proxy_server = self._make_connection(
client_factory,
_get_test_protocol_factory(),
ssl=ssl,
tls_sanlist=[b"DNS:proxy.com"] if ssl else None,
expected_sni=b"proxy.com" if ssl else None,
client_factory, _get_test_protocol_factory()
)
assert isinstance(proxy_server, HTTPChannel)
# fish the transports back out so that we can do the old switcheroo
s2c_transport = proxy_server.transport
client_protocol = s2c_transport.other
c2s_transport = client_protocol.transport
# the FakeTransport is async, so we need to pump the reactor
self.reactor.advance(0)
# now there should be a pending CONNECT request
self.assertEqual(len(proxy_server.requests), 1)
@@ -572,7 +340,7 @@ class MatrixFederationAgentTests(TestCase):
if auth_credentials is not None:
# Compute the correct header value for Proxy-Authorization
encoded_credentials = base64.b64encode(auth_credentials)
encoded_credentials = base64.b64encode(b"bob:pinkponies")
expected_header_value = b"Basic " + encoded_credentials
# Validate the header's value
@@ -584,49 +352,31 @@ class MatrixFederationAgentTests(TestCase):
# tell the proxy server not to close the connection
proxy_server.persistent = True
# this just stops the http Request trying to do a chunked response
# request.setHeader(b"Content-Length", b"0")
request.finish()
# now we make another test server to act as the upstream HTTP server.
server_ssl_protocol = _wrap_server_factory_for_tls(
_get_test_protocol_factory()
).buildProtocol(None)
# now we can replace the proxy channel with a new, SSL-wrapped HTTP channel
ssl_factory = _wrap_server_factory_for_tls(_get_test_protocol_factory())
ssl_protocol = ssl_factory.buildProtocol(None)
http_server = ssl_protocol.wrappedProtocol
# Tell the HTTP server to send outgoing traffic back via the proxy's transport.
proxy_server_transport = proxy_server.transport
server_ssl_protocol.makeConnection(proxy_server_transport)
# ... and replace the protocol on the proxy's transport with the
# TLSMemoryBIOProtocol for the test server, so that incoming traffic
# to the proxy gets sent over to the HTTP(s) server.
#
# This needs a bit of gut-wrenching, which is different depending on whether
# the proxy is using TLS or not.
#
# (an alternative, possibly more elegant, approach would be to use a custom
# Protocol to implement the proxy, which starts out by forwarding to an
# HTTPChannel (to implement the CONNECT command) and can then be switched
# into a mode where it forwards its traffic to another Protocol.)
if ssl:
assert isinstance(proxy_server_transport, TLSMemoryBIOProtocol)
proxy_server_transport.wrappedProtocol = server_ssl_protocol
else:
assert isinstance(proxy_server_transport, FakeTransport)
client_protocol = proxy_server_transport.other
c2s_transport = client_protocol.transport
c2s_transport.other = server_ssl_protocol
ssl_protocol.makeConnection(
FakeTransport(client_protocol, self.reactor, ssl_protocol)
)
c2s_transport.other = ssl_protocol
self.reactor.advance(0)
server_name = server_ssl_protocol._tlsConnection.get_servername()
server_name = ssl_protocol._tlsConnection.get_servername()
expected_sni = b"test.com"
self.assertEqual(
server_name,
expected_sni,
f"Expected SNI {expected_sni!s} but got {server_name!s}",
"Expected SNI %s but got %s" % (expected_sni, server_name),
)
# now there should be a pending request
http_server = server_ssl_protocol.wrappedProtocol
self.assertEqual(len(http_server.requests), 1)
request = http_server.requests[0]
@@ -760,7 +510,7 @@ class MatrixFederationAgentTests(TestCase):
self.assertEqual(
server_name,
expected_sni,
f"Expected SNI {expected_sni!s} but got {server_name!s}",
"Expected SNI %s but got %s" % (expected_sni, server_name),
)
# now there should be a pending request
@@ -779,48 +529,16 @@ class MatrixFederationAgentTests(TestCase):
body = self.successResultOf(treq.content(resp))
self.assertEqual(body, b"result")
@patch.dict(os.environ, {"http_proxy": "proxy.com:8888"})
def test_proxy_with_no_scheme(self):
http_proxy_agent = ProxyAgent(self.reactor, use_proxy=True)
self.assertIsInstance(http_proxy_agent.http_proxy_endpoint, HostnameEndpoint)
self.assertEqual(http_proxy_agent.http_proxy_endpoint._hostStr, "proxy.com")
self.assertEqual(http_proxy_agent.http_proxy_endpoint._port, 8888)
@patch.dict(os.environ, {"http_proxy": "socks://proxy.com:8888"})
def test_proxy_with_unsupported_scheme(self):
with self.assertRaises(ValueError):
ProxyAgent(self.reactor, use_proxy=True)
@patch.dict(os.environ, {"http_proxy": "http://proxy.com:8888"})
def test_proxy_with_http_scheme(self):
http_proxy_agent = ProxyAgent(self.reactor, use_proxy=True)
self.assertIsInstance(http_proxy_agent.http_proxy_endpoint, HostnameEndpoint)
self.assertEqual(http_proxy_agent.http_proxy_endpoint._hostStr, "proxy.com")
self.assertEqual(http_proxy_agent.http_proxy_endpoint._port, 8888)
@patch.dict(os.environ, {"http_proxy": "https://proxy.com:8888"})
def test_proxy_with_https_scheme(self):
https_proxy_agent = ProxyAgent(self.reactor, use_proxy=True)
self.assertIsInstance(https_proxy_agent.http_proxy_endpoint, _WrapperEndpoint)
self.assertEqual(
https_proxy_agent.http_proxy_endpoint._wrappedEndpoint._hostStr, "proxy.com"
)
self.assertEqual(
https_proxy_agent.http_proxy_endpoint._wrappedEndpoint._port, 8888
)
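The assertions above follow from how an https:// proxy URL has to be dialled: the plain TCP endpoint gets wrapped in TLS, and Twisted's wrapper type is what the test observes. Roughly — a sketch of the idea, not ProxyAgent's actual code:

from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.ssl import optionsForClientTLS

def https_proxy_endpoint(reactor, host: str, port: int):
    # wrapClientTLS returns a _WrapperEndpoint around the TCP endpoint,
    # which is why test_proxy_with_https_scheme sees that type
    return wrapClientTLS(
        optionsForClientTLS(hostname=host),
        HostnameEndpoint(reactor, host, port),
    )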
def _wrap_server_factory_for_tls(
factory: IProtocolFactory, sanlist: Iterable[bytes] = None
) -> IProtocolFactory:
def _wrap_server_factory_for_tls(factory, sanlist=None):
"""Wrap an existing Protocol Factory with a test TLSMemoryBIOFactory
The resultant factory will create a TLS server which presents a certificate
signed by our test CA, valid for the domains in `sanlist`
Args:
factory: protocol factory to wrap
sanlist: list of domains the cert should be valid for
factory (interfaces.IProtocolFactory): protocol factory to wrap
sanlist (iterable[bytes]): list of domains the cert should be valid for
Returns:
interfaces.IProtocolFactory
@@ -834,7 +552,7 @@ def _wrap_server_factory_for_tls(
)
def _get_test_protocol_factory() -> IProtocolFactory:
def _get_test_protocol_factory():
"""Get a protocol Factory which will build an HTTPChannel
Returns:
@@ -848,6 +566,6 @@ def _get_test_protocol_factory() -> IProtocolFactory:
return server_factory
def _log_request(request: str):
def _log_request(request):
"""Implements Factory.log, which is expected by Request.finish"""
logger.info(f"Completed request {request}")
logger.info("Completed request %s", request)
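The "custom Protocol" alternative floated in the gut-wrenching comment in _do_https_request_via_proxy could look something like the sketch below — hypothetical, not part of this change: a relay that feeds an HTTPChannel until the CONNECT has been answered, then gets switched over to the upstream server protocol.

from twisted.internet.protocol import Protocol

class SwitchableRelay(Protocol):
    """Relay incoming bytes to a target protocol that can be swapped at runtime."""

    def __init__(self, initial_target: Protocol):
        self._target = initial_target

    def connectionMade(self):
        self._target.makeConnection(self.transport)

    def dataReceived(self, data: bytes):
        self._target.dataReceived(data)

    def switch_to(self, new_target: Protocol):
        # e.g. once the proxy's HTTPChannel has answered the CONNECT,
        # route all subsequent inbound traffic to the TLS server protocol
        self._target = new_target
        new_target.makeConnection(self.transport)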
+2 -107
View File
@@ -15,14 +15,9 @@
import json
import synapse.rest.admin
from synapse.api.constants import (
EventContentFields,
EventTypes,
ReadReceiptEventFields,
RelationTypes,
)
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.rest.client.v1 import login, room
from synapse.rest.client.v2_alpha import knock, read_marker, receipts, sync
from synapse.rest.client.v2_alpha import knock, read_marker, sync
from tests import unittest
from tests.federation.transport.test_knocking import (
@@ -373,88 +368,6 @@ class SyncKnockTestCase(
)
class ReadReceiptsTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
receipts.register_servlets,
room.register_servlets,
sync.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.url = "/sync?since=%s"
self.next_batch = "s0"
# Register the first user
self.user_id = self.register_user("kermit", "monkey")
self.tok = self.login("kermit", "monkey")
# Create the room
self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)
# Register the second user
self.user2 = self.register_user("kermit2", "monkey")
self.tok2 = self.login("kermit2", "monkey")
# Join the second user
self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2)
@override_config({"experimental_features": {"msc2285_enabled": True}})
def test_hidden_read_receipts(self):
# Send a message as the first user
res = self.helper.send(self.room_id, body="hello", tok=self.tok)
# Send a read receipt to tell the server the first user's message was read
body = json.dumps({ReadReceiptEventFields.MSC2285_HIDDEN: True}).encode("utf8")
channel = self.make_request(
"POST",
"/rooms/%s/receipt/m.read/%s" % (self.room_id, res["event_id"]),
body,
access_token=self.tok2,
)
self.assertEqual(channel.code, 200)
# Test that the first user can't see the other user's hidden read receipt
self.assertEqual(self._get_read_receipt(), None)
def test_read_receipt_with_empty_body(self):
# Send a message as the first user
res = self.helper.send(self.room_id, body="hello", tok=self.tok)
# Send a read receipt for this message with an empty body
channel = self.make_request(
"POST",
"/rooms/%s/receipt/m.read/%s" % (self.room_id, res["event_id"]),
access_token=self.tok2,
)
self.assertEqual(channel.code, 200)
def _get_read_receipt(self):
"""Syncs and returns the read receipt."""
# Checks if event is a read receipt
def is_read_receipt(event):
return event["type"] == "m.receipt"
# Sync
channel = self.make_request(
"GET",
self.url % self.next_batch,
access_token=self.tok,
)
self.assertEqual(channel.code, 200)
# Store the next batch for the next request.
self.next_batch = channel.json_body["next_batch"]
# Return the read receipt
ephemeral_events = channel.json_body["rooms"]["join"][self.room_id][
"ephemeral"
]["events"]
return next(filter(is_read_receipt, ephemeral_events), None)
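For reference, the client-side shape of the hidden read receipt these tests drive is plain MSC2285: POST the receipt with the hidden flag in the body. A hypothetical equivalent with made-up homeserver, room, event, and token values, assuming the unstable-prefixed key this Synapse version uses for ReadReceiptEventFields.MSC2285_HIDDEN:

import json

import requests  # any HTTP client would do; all values below are examples

homeserver = "https://matrix.example.org"
room_id = "!jEsUZKDJdhlrceRyVU:example.org"
event_id = "$1435641916114394fHBLK:matrix.org"
access_token = "syt_example_token"

# MSC2285: ask the server to record the receipt but hide it from other users.
resp = requests.post(
    f"{homeserver}/_matrix/client/r0/rooms/{room_id}/receipt/m.read/{event_id}",
    headers={"Authorization": f"Bearer {access_token}"},
    data=json.dumps({"org.matrix.msc2285.hidden": True}),
)
assert resp.status_code == 200

Receipts that remain visible then surface to other clients under rooms.join.<room_id>.ephemeral.events in /sync, which is exactly where _get_read_receipt looks.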
class UnreadMessagesTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
@@ -462,7 +375,6 @@ class UnreadMessagesTestCase(unittest.HomeserverTestCase):
read_marker.register_servlets,
room.register_servlets,
sync.register_servlets,
receipts.register_servlets,
]
def prepare(self, reactor, clock, hs):
@@ -536,23 +448,6 @@ class UnreadMessagesTestCase(unittest.HomeserverTestCase):
# Check that the unread counter is back to 0.
self._check_unread_count(0)
# Check that hidden read receipts don't break unread counts
res = self.helper.send(self.room_id, "hello", tok=self.tok2)
self._check_unread_count(1)
# Send a read receipt to tell the server we've read the latest event.
body = json.dumps({ReadReceiptEventFields.MSC2285_HIDDEN: True}).encode("utf8")
channel = self.make_request(
"POST",
"/rooms/%s/receipt/m.read/%s" % (self.room_id, res["event_id"]),
body,
access_token=self.tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Check that the unread counter is back to 0.
self._check_unread_count(0)
# Check that room name changes increase the unread counter.
self.helper.send_state(
self.room_id,
-57
View File
@@ -15,9 +15,7 @@
import attr
from parameterized import parameterized
from synapse.api.room_versions import RoomVersions
from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder
import tests.unittest
import tests.utils
@@ -506,61 +504,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
)
self.assertSetEqual(difference, set())
def test_prune_inbound_federation_queue(self):
"Test that pruning of inbound federation queues work"
room_id = "some_room_id"
# Insert a bunch of events that all reference the previous one.
self.get_success(
self.store.db_pool.simple_insert_many(
table="federation_inbound_events_staging",
values=[
{
"origin": "some_origin",
"room_id": room_id,
"received_ts": 0,
"event_id": f"$fake_event_id_{i + 1}",
"event_json": json_encoder.encode(
{"prev_events": [f"$fake_event_id_{i}"]}
),
"internal_metadata": "{}",
}
for i in range(500)
],
desc="test_prune_inbound_federation_queue",
)
)
# Calling prune once should return True, i.e. a prune happen. The second
# time it shouldn't.
pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
)
self.assertTrue(pruned)
pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
)
self.assertFalse(pruned)
# Assert that we only have a single event left in the queue, and that it
# is the last one.
count = self.get_success(
self.store.db_pool.simple_select_one_onecol(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcol="COALESCE(COUNT(*), 0)",
desc="test_prune_inbound_federation_queue",
)
)
self.assertEqual(count, 1)
_, event_id = self.get_success(
self.store.get_next_staged_event_id_for_room(room_id)
)
self.assertEqual(event_id, "$fake_event_id_500")
@attr.s
class FakeEvent:
-36
View File
@@ -1,36 +0,0 @@
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests import unittest
class SQLTransactionLimitTestCase(unittest.HomeserverTestCase):
"""Test SQL transaction limit doesn't break transactions."""
def make_homeserver(self, reactor, clock):
return self.setup_test_homeserver(db_txn_limit=1000)
def test_config(self):
db_config = self.hs.config.get_single_database()
self.assertEqual(db_config.config["txn_limit"], 1000)
def test_select(self):
def do_select(txn):
txn.execute("SELECT 1")
db_pool = self.hs.get_datastores().databases[0]
# force txn limit to roll over at least once
for _ in range(0, 1001):
self.get_success_or_raise(db_pool.runInteraction("test_select", do_select))

Some files were not shown because too many files have changed in this diff.