Compare commits
16 Commits
quenting/m
...
rei/ci_deb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9f212ab44 | ||
|
|
85e3adba86 | ||
|
|
d3bdf8b091 | ||
|
|
d8ab5434d5 | ||
|
|
4333eff1d5 | ||
|
|
c9f04f3484 | ||
|
|
a387d6ecf8 | ||
|
|
9e473d9e38 | ||
|
|
d2ea7e32f5 | ||
|
|
2db0f1e49b | ||
|
|
a256423553 | ||
|
|
e91aa4fd2f | ||
|
|
499c1631de | ||
|
|
4ecd9aba95 | ||
|
|
4e947d05ab | ||
|
|
6514381b02 |
22
.github/workflows/tests.yml
vendored
22
.github/workflows/tests.yml
vendored
@@ -373,7 +373,7 @@ jobs:
|
||||
|
||||
calculate-test-jobs:
|
||||
if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
|
||||
needs: linting-done
|
||||
# needs: linting-done
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
@@ -393,6 +393,7 @@ jobs:
|
||||
- changes
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
job: ${{ fromJson(needs.calculate-test-jobs.outputs.trial_test_matrix) }}
|
||||
|
||||
@@ -426,7 +427,24 @@ jobs:
|
||||
if: ${{ matrix.job.postgres-version }}
|
||||
timeout-minutes: 2
|
||||
run: until pg_isready -h localhost; do sleep 1; done
|
||||
- run: poetry run trial --jobs=6 tests
|
||||
- run: |
|
||||
(
|
||||
while true; do
|
||||
echo "......."
|
||||
date
|
||||
df -h | grep root
|
||||
free -m
|
||||
sleep 10
|
||||
done
|
||||
) &
|
||||
MONITOR_PID=$!
|
||||
|
||||
poetry run trial --jobs=6 tests
|
||||
STATUS=$?
|
||||
|
||||
kill $MONITOR_PID
|
||||
exit $STATUS
|
||||
|
||||
env:
|
||||
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
|
||||
SYNAPSE_POSTGRES_HOST: /var/run/postgresql
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Fix `LaterGauge` metrics to collect from all servers.
|
||||
1
changelog.d/18762.feature
Normal file
1
changelog.d/18762.feature
Normal file
@@ -0,0 +1 @@
|
||||
Implement the push rules for experimental [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-doc/issues/4306).
|
||||
1
changelog.d/18787.misc
Normal file
1
changelog.d/18787.misc
Normal file
@@ -0,0 +1 @@
|
||||
CI debugging.
|
||||
@@ -1 +0,0 @@
|
||||
Allow serving media with a redirect to an unauthenticated, short-lived, signed URL.
|
||||
@@ -127,7 +127,6 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||
"^/_synapse/admin/v1/quarantine_media/.*$",
|
||||
"^/_matrix/client/v1/media/.*$",
|
||||
"^/_matrix/federation/v1/media/.*$",
|
||||
"^/_synapse/media/",
|
||||
],
|
||||
# The first configured media worker will run the media background jobs
|
||||
"shared_extra_conf": {
|
||||
|
||||
@@ -2182,39 +2182,6 @@ media_upload_limits:
|
||||
max_size: 500M
|
||||
```
|
||||
---
|
||||
### `media_redirect`
|
||||
|
||||
*(object)* When enabled, Synapse will use HTTP redirect responses to serve media instead of directly serving the media from the media store. This can help with caching, but requires /_synapse/media/* to be routed to the media worker.
|
||||
|
||||
This setting has the following sub-options:
|
||||
|
||||
* `enabled` (boolean): Enables the media redirect feature. If enabled, you must specify a `media_redirect.secret` or `media_redirect.secret_path`. Defaults to `false`.
|
||||
|
||||
* `secret` (string|null): Secret used to sign media redirect URLs. This must be set if `media_redirect.enabled` is set. Defaults to `null`.
|
||||
|
||||
* `secret_path` (string|null): An alternative to `media_redirect.secret` that specifies a file containing the secret. Defaults to `null`.
|
||||
|
||||
* `ttl` (duration): How long the redirect URLs should be valid for. Defaults to `"10m"`.
|
||||
|
||||
Default configuration:
|
||||
```yaml
|
||||
media_redirect:
|
||||
enabled: false
|
||||
```
|
||||
|
||||
Example configurations:
|
||||
```yaml
|
||||
media_redirect:
|
||||
enabled: true
|
||||
secret: aiCh9gu4Zahvueveisooquu7chaiw9Ee
|
||||
```
|
||||
|
||||
```yaml
|
||||
media_redirect:
|
||||
enabled: true
|
||||
secret_path: /path/to/secrets/file
|
||||
```
|
||||
---
|
||||
### `max_image_pixels`
|
||||
|
||||
*(byte size)* Maximum number of pixels that will be thumbnailed. Defaults to `"32M"`.
|
||||
|
||||
@@ -765,7 +765,6 @@ Handles the media repository. It can handle all endpoints starting with:
|
||||
/_matrix/media/
|
||||
/_matrix/client/v1/media/
|
||||
/_matrix/federation/v1/media/
|
||||
/_synapse/media/
|
||||
|
||||
... and the following regular expressions matching media-specific administration APIs:
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ fn bench_match_exact(b: &mut Bencher) {
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -71,10 +72,10 @@ fn bench_match_exact(b: &mut Bencher) {
|
||||
},
|
||||
));
|
||||
|
||||
let matched = eval.match_condition(&condition, None, None).unwrap();
|
||||
let matched = eval.match_condition(&condition, None, None, None).unwrap();
|
||||
assert!(matched, "Didn't match");
|
||||
|
||||
b.iter(|| eval.match_condition(&condition, None, None).unwrap());
|
||||
b.iter(|| eval.match_condition(&condition, None, None, None).unwrap());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
@@ -107,6 +108,7 @@ fn bench_match_word(b: &mut Bencher) {
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -117,10 +119,10 @@ fn bench_match_word(b: &mut Bencher) {
|
||||
},
|
||||
));
|
||||
|
||||
let matched = eval.match_condition(&condition, None, None).unwrap();
|
||||
let matched = eval.match_condition(&condition, None, None, None).unwrap();
|
||||
assert!(matched, "Didn't match");
|
||||
|
||||
b.iter(|| eval.match_condition(&condition, None, None).unwrap());
|
||||
b.iter(|| eval.match_condition(&condition, None, None, None).unwrap());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
@@ -153,6 +155,7 @@ fn bench_match_word_miss(b: &mut Bencher) {
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -163,10 +166,10 @@ fn bench_match_word_miss(b: &mut Bencher) {
|
||||
},
|
||||
));
|
||||
|
||||
let matched = eval.match_condition(&condition, None, None).unwrap();
|
||||
let matched = eval.match_condition(&condition, None, None, None).unwrap();
|
||||
assert!(!matched, "Didn't match");
|
||||
|
||||
b.iter(|| eval.match_condition(&condition, None, None).unwrap());
|
||||
b.iter(|| eval.match_condition(&condition, None, None, None).unwrap());
|
||||
}
|
||||
|
||||
#[bench]
|
||||
@@ -199,6 +202,7 @@ fn bench_eval_message(b: &mut Bencher) {
|
||||
vec![],
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -210,7 +214,8 @@ fn bench_eval_message(b: &mut Bencher) {
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
);
|
||||
|
||||
b.iter(|| eval.run(&rules, Some("bob"), Some("person")));
|
||||
b.iter(|| eval.run(&rules, Some("bob"), Some("person"), None));
|
||||
}
|
||||
|
||||
@@ -290,6 +290,26 @@ pub const BASE_APPEND_CONTENT_RULES: &[PushRule] = &[PushRule {
|
||||
}];
|
||||
|
||||
pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
|
||||
PushRule {
|
||||
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.unsubscribed_thread"),
|
||||
priority_class: 1,
|
||||
conditions: Cow::Borrowed(&[Condition::Known(
|
||||
KnownCondition::Msc4306ThreadSubscription { subscribed: false },
|
||||
)]),
|
||||
actions: Cow::Borrowed(&[]),
|
||||
default: true,
|
||||
default_enabled: true,
|
||||
},
|
||||
PushRule {
|
||||
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.subscribed_thread"),
|
||||
priority_class: 1,
|
||||
conditions: Cow::Borrowed(&[Condition::Known(
|
||||
KnownCondition::Msc4306ThreadSubscription { subscribed: true },
|
||||
)]),
|
||||
actions: Cow::Borrowed(&[Action::Notify, SOUND_ACTION]),
|
||||
default: true,
|
||||
default_enabled: true,
|
||||
},
|
||||
PushRule {
|
||||
rule_id: Cow::Borrowed("global/underride/.m.rule.call"),
|
||||
priority_class: 1,
|
||||
|
||||
@@ -106,8 +106,11 @@ pub struct PushRuleEvaluator {
|
||||
/// flag as MSC1767 (extensible events core).
|
||||
msc3931_enabled: bool,
|
||||
|
||||
// If MSC4210 (remove legacy mentions) is enabled.
|
||||
/// If MSC4210 (remove legacy mentions) is enabled.
|
||||
msc4210_enabled: bool,
|
||||
|
||||
/// If MSC4306 (thread subscriptions) is enabled.
|
||||
msc4306_enabled: bool,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
@@ -126,6 +129,7 @@ impl PushRuleEvaluator {
|
||||
room_version_feature_flags,
|
||||
msc3931_enabled,
|
||||
msc4210_enabled,
|
||||
msc4306_enabled,
|
||||
))]
|
||||
pub fn py_new(
|
||||
flattened_keys: BTreeMap<String, JsonValue>,
|
||||
@@ -138,6 +142,7 @@ impl PushRuleEvaluator {
|
||||
room_version_feature_flags: Vec<String>,
|
||||
msc3931_enabled: bool,
|
||||
msc4210_enabled: bool,
|
||||
msc4306_enabled: bool,
|
||||
) -> Result<Self, Error> {
|
||||
let body = match flattened_keys.get("content.body") {
|
||||
Some(JsonValue::Value(SimpleJsonValue::Str(s))) => s.clone().into_owned(),
|
||||
@@ -156,6 +161,7 @@ impl PushRuleEvaluator {
|
||||
room_version_feature_flags,
|
||||
msc3931_enabled,
|
||||
msc4210_enabled,
|
||||
msc4306_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -167,12 +173,19 @@ impl PushRuleEvaluator {
|
||||
///
|
||||
/// Returns the set of actions, if any, that match (filtering out any
|
||||
/// `dont_notify` and `coalesce` actions).
|
||||
#[pyo3(signature = (push_rules, user_id=None, display_name=None))]
|
||||
///
|
||||
/// msc4306_thread_subscription_state: (Only populated if MSC4306 is enabled)
|
||||
/// The thread subscription state corresponding to the thread containing this event.
|
||||
/// - `None` if the event is not in a thread, or if MSC4306 is disabled.
|
||||
/// - `Some(true)` if the event is in a thread and the user has a subscription for that thread
|
||||
/// - `Some(false)` if the event is in a thread and the user does NOT have a subscription for that thread
|
||||
#[pyo3(signature = (push_rules, user_id=None, display_name=None, msc4306_thread_subscription_state=None))]
|
||||
pub fn run(
|
||||
&self,
|
||||
push_rules: &FilteredPushRules,
|
||||
user_id: Option<&str>,
|
||||
display_name: Option<&str>,
|
||||
msc4306_thread_subscription_state: Option<bool>,
|
||||
) -> Vec<Action> {
|
||||
'outer: for (push_rule, enabled) in push_rules.iter() {
|
||||
if !enabled {
|
||||
@@ -204,7 +217,12 @@ impl PushRuleEvaluator {
|
||||
Condition::Known(KnownCondition::RoomVersionSupports { feature: _ }),
|
||||
);
|
||||
|
||||
match self.match_condition(condition, user_id, display_name) {
|
||||
match self.match_condition(
|
||||
condition,
|
||||
user_id,
|
||||
display_name,
|
||||
msc4306_thread_subscription_state,
|
||||
) {
|
||||
Ok(true) => {}
|
||||
Ok(false) => continue 'outer,
|
||||
Err(err) => {
|
||||
@@ -237,14 +255,20 @@ impl PushRuleEvaluator {
|
||||
}
|
||||
|
||||
/// Check if the given condition matches.
|
||||
#[pyo3(signature = (condition, user_id=None, display_name=None))]
|
||||
#[pyo3(signature = (condition, user_id=None, display_name=None, msc4306_thread_subscription_state=None))]
|
||||
fn matches(
|
||||
&self,
|
||||
condition: Condition,
|
||||
user_id: Option<&str>,
|
||||
display_name: Option<&str>,
|
||||
msc4306_thread_subscription_state: Option<bool>,
|
||||
) -> bool {
|
||||
match self.match_condition(&condition, user_id, display_name) {
|
||||
match self.match_condition(
|
||||
&condition,
|
||||
user_id,
|
||||
display_name,
|
||||
msc4306_thread_subscription_state,
|
||||
) {
|
||||
Ok(true) => true,
|
||||
Ok(false) => false,
|
||||
Err(err) => {
|
||||
@@ -262,6 +286,7 @@ impl PushRuleEvaluator {
|
||||
condition: &Condition,
|
||||
user_id: Option<&str>,
|
||||
display_name: Option<&str>,
|
||||
msc4306_thread_subscription_state: Option<bool>,
|
||||
) -> Result<bool, Error> {
|
||||
let known_condition = match condition {
|
||||
Condition::Known(known) => known,
|
||||
@@ -393,6 +418,13 @@ impl PushRuleEvaluator {
|
||||
&& self.room_version_feature_flags.contains(&flag)
|
||||
}
|
||||
}
|
||||
KnownCondition::Msc4306ThreadSubscription { subscribed } => {
|
||||
if !self.msc4306_enabled {
|
||||
false
|
||||
} else {
|
||||
msc4306_thread_subscription_state == Some(*subscribed)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
@@ -536,10 +568,11 @@ fn push_rule_evaluator() {
|
||||
vec![],
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result = evaluator.run(&FilteredPushRules::default(), None, Some("bob"));
|
||||
let result = evaluator.run(&FilteredPushRules::default(), None, Some("bob"), None);
|
||||
assert_eq!(result.len(), 3);
|
||||
}
|
||||
|
||||
@@ -566,6 +599,7 @@ fn test_requires_room_version_supports_condition() {
|
||||
flags,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -575,6 +609,7 @@ fn test_requires_room_version_supports_condition() {
|
||||
&FilteredPushRules::default(),
|
||||
Some("@bob:example.org"),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
assert_eq!(result.len(), 3);
|
||||
|
||||
@@ -593,7 +628,17 @@ fn test_requires_room_version_supports_condition() {
|
||||
};
|
||||
let rules = PushRules::new(vec![custom_rule]);
|
||||
result = evaluator.run(
|
||||
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false, false),
|
||||
&FilteredPushRules::py_new(
|
||||
rules,
|
||||
BTreeMap::new(),
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
@@ -369,6 +369,10 @@ pub enum KnownCondition {
|
||||
RoomVersionSupports {
|
||||
feature: Cow<'static, str>,
|
||||
},
|
||||
#[serde(rename = "io.element.msc4306.thread_subscription")]
|
||||
Msc4306ThreadSubscription {
|
||||
subscribed: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl<'source> IntoPyObject<'source> for Condition {
|
||||
@@ -547,11 +551,13 @@ pub struct FilteredPushRules {
|
||||
msc3664_enabled: bool,
|
||||
msc4028_push_encrypted_events: bool,
|
||||
msc4210_enabled: bool,
|
||||
msc4306_enabled: bool,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl FilteredPushRules {
|
||||
#[new]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn py_new(
|
||||
push_rules: PushRules,
|
||||
enabled_map: BTreeMap<String, bool>,
|
||||
@@ -560,6 +566,7 @@ impl FilteredPushRules {
|
||||
msc3664_enabled: bool,
|
||||
msc4028_push_encrypted_events: bool,
|
||||
msc4210_enabled: bool,
|
||||
msc4306_enabled: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
push_rules,
|
||||
@@ -569,6 +576,7 @@ impl FilteredPushRules {
|
||||
msc3664_enabled,
|
||||
msc4028_push_encrypted_events,
|
||||
msc4210_enabled,
|
||||
msc4306_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -619,6 +627,10 @@ impl FilteredPushRules {
|
||||
return false;
|
||||
}
|
||||
|
||||
if !self.msc4306_enabled && rule.rule_id.contains("/.io.element.msc4306.rule.") {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
})
|
||||
.map(|r| {
|
||||
|
||||
@@ -2433,44 +2433,6 @@ properties:
|
||||
max_size: 100M
|
||||
- time_period: 1w
|
||||
max_size: 500M
|
||||
media_redirect:
|
||||
type: object
|
||||
description: >-
|
||||
When enabled, Synapse will use HTTP redirect responses to serve media
|
||||
instead of directly serving the media from the media store. This can help
|
||||
with caching, but requires /_synapse/media/* to be routed to the media
|
||||
worker.
|
||||
properties:
|
||||
enabled:
|
||||
type: boolean
|
||||
description: >-
|
||||
Enables the media redirect feature. If enabled, you must specify a
|
||||
`media_redirect.secret` or `media_redirect.secret_path`.
|
||||
default: false
|
||||
secret:
|
||||
type: ["string", "null"]
|
||||
description: >-
|
||||
Secret used to sign media redirect URLs. This must be set if
|
||||
`media_redirect.enabled` is set.
|
||||
default: null
|
||||
secret_path:
|
||||
type: ["string", "null"]
|
||||
description: >-
|
||||
An alternative to `media_redirect.secret` that specifies a file
|
||||
containing the secret.
|
||||
default: null
|
||||
ttl:
|
||||
$ref: "#/$defs/duration"
|
||||
description: >-
|
||||
How long the redirect URLs should be valid for.
|
||||
default: 10m
|
||||
default:
|
||||
enabled: false
|
||||
examples:
|
||||
- enabled: true
|
||||
secret: aiCh9gu4Zahvueveisooquu7chaiw9Ee
|
||||
- enabled: true
|
||||
secret_path: /path/to/secrets/file
|
||||
max_image_pixels:
|
||||
$ref: "#/$defs/bytes"
|
||||
description: Maximum number of pixels that will be thumbnailed.
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
import attr
|
||||
|
||||
@@ -30,7 +30,7 @@ from synapse.types import JsonDict
|
||||
from synapse.util.check_dependencies import check_requirements
|
||||
from synapse.util.module_loader import load_module
|
||||
|
||||
from ._base import Config, ConfigError, read_file
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -130,9 +130,7 @@ class MediaUploadLimit:
|
||||
class ContentRepositoryConfig(Config):
|
||||
section = "media"
|
||||
|
||||
def read_config(
|
||||
self, config: JsonDict, allow_secrets_in_config: bool, **kwargs: Any
|
||||
) -> None:
|
||||
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
||||
# Only enable the media repo if either the media repo is enabled or the
|
||||
# current worker app is the media repo.
|
||||
if (
|
||||
@@ -292,55 +290,6 @@ class ContentRepositoryConfig(Config):
|
||||
|
||||
self.enable_authenticated_media = config.get("enable_authenticated_media", True)
|
||||
|
||||
redirect_config = config.get("media_redirect", {})
|
||||
if redirect_config is None:
|
||||
redirect_config = {}
|
||||
if not isinstance(redirect_config, dict):
|
||||
raise ConfigError(
|
||||
"`media_redirect` must be a dictionary",
|
||||
("media_redirect",),
|
||||
)
|
||||
|
||||
# Whether we should use a redirect to /_synapse/media/* when serving
|
||||
# media for better caching. This requires this endpoint to be routed to
|
||||
# the media worker.
|
||||
self.use_redirect = redirect_config.get("enabled", False)
|
||||
|
||||
redirect_secret = redirect_config.get("secret")
|
||||
if redirect_secret and not allow_secrets_in_config:
|
||||
raise ConfigError(
|
||||
"Config options that expect an in-line secret as value are disabled",
|
||||
("media_redirect", "secret"),
|
||||
)
|
||||
if redirect_secret is not None and not isinstance(redirect_secret, str):
|
||||
raise ConfigError(
|
||||
"`media_redirect.secret` must be a string.",
|
||||
("media_redirect", "secret"),
|
||||
)
|
||||
|
||||
redirect_secret_path = redirect_config.get("secret_path")
|
||||
if redirect_secret_path:
|
||||
if redirect_secret:
|
||||
raise ConfigError(
|
||||
"You have configured both `media_redirect.secret` and `media_redirect.secret_path`.\n"
|
||||
"These are mutually incompatible.",
|
||||
("media_redirect", "secret_path"),
|
||||
)
|
||||
redirect_secret = read_file(
|
||||
redirect_secret_path, ("media_redirect", "secret_path")
|
||||
).strip()
|
||||
|
||||
self.redirect_secret: Optional[bytes] = (
|
||||
redirect_secret.encode("utf-8") if redirect_secret else None
|
||||
)
|
||||
|
||||
if self.use_redirect and self.redirect_secret is None:
|
||||
raise ConfigError(
|
||||
"You have configured `media_redirect.enabled` but not set `media_redirect.secret` or `media_redirect.secret_path`."
|
||||
)
|
||||
|
||||
self.redirect_ttl_ms = self.parse_duration(redirect_config.get("ttl", "10m"))
|
||||
|
||||
self.media_upload_limits: List[MediaUploadLimit] = []
|
||||
for limit_config in config.get("media_upload_limits", []):
|
||||
time_period_ms = self.parse_duration(limit_config["time_period"])
|
||||
|
||||
@@ -37,7 +37,6 @@ Events are replicated via a separate events stream.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
@@ -68,25 +67,6 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class QueueNames(str, Enum):
|
||||
PRESENCE_MAP = "presence_map"
|
||||
KEYED_EDU = "keyed_edu"
|
||||
KEYED_EDU_CHANGED = "keyed_edu_changed"
|
||||
EDUS = "edus"
|
||||
POS_TIME = "pos_time"
|
||||
PRESENCE_DESTINATIONS = "presence_destinations"
|
||||
|
||||
|
||||
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
|
||||
|
||||
for queue_name in QueueNames:
|
||||
queue_name_to_gauge_map[queue_name] = LaterGauge(
|
||||
name=f"synapse_federation_send_queue_{queue_name.value}_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
"""A drop in replacement for FederationSender"""
|
||||
|
||||
@@ -131,15 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
# we make a new function, so we need to make a new function so the inner
|
||||
# lambda binds to the queue rather than to the name of the queue which
|
||||
# changes. ARGH.
|
||||
def register(queue_name: QueueNames, queue: Sized) -> None:
|
||||
queue_name_to_gauge_map[queue_name].register_hook(
|
||||
lambda: {(self.server_name,): len(queue)}
|
||||
def register(name: str, queue: Sized) -> None:
|
||||
LaterGauge(
|
||||
name="synapse_federation_send_queue_%s_size" % (queue_name,),
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(queue)},
|
||||
)
|
||||
|
||||
for queue_name in QueueNames:
|
||||
queue = getattr(self, queue_name.value)
|
||||
assert isinstance(queue, Sized)
|
||||
register(queue_name, queue=queue)
|
||||
for queue_name in [
|
||||
"presence_map",
|
||||
"keyed_edu",
|
||||
"keyed_edu_changed",
|
||||
"edus",
|
||||
"pos_time",
|
||||
"presence_destinations",
|
||||
]:
|
||||
register(queue_name, getattr(self, queue_name))
|
||||
|
||||
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
||||
|
||||
|
||||
@@ -199,24 +199,6 @@ sent_pdus_destination_dist_total = Counter(
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
transaction_queue_pending_destinations_gauge = LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_destinations",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
transaction_queue_pending_pdus_gauge = LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_pdus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
transaction_queue_pending_edus_gauge = LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_edus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
# Time (in s) to wait before trying to wake up destinations that have
|
||||
# catch-up outstanding.
|
||||
# Please note that rate limiting still applies, so while the loop is
|
||||
@@ -416,28 +398,38 @@ class FederationSender(AbstractFederationSender):
|
||||
# map from destination to PerDestinationQueue
|
||||
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
|
||||
|
||||
transaction_queue_pending_destinations_gauge.register_hook(
|
||||
lambda: {
|
||||
LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_destinations",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
(self.server_name,): sum(
|
||||
1
|
||||
for d in self._per_destination_queues.values()
|
||||
if d.transmission_loop_running
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
transaction_queue_pending_pdus_gauge.register_hook(
|
||||
lambda: {
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_pdus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
(self.server_name,): sum(
|
||||
d.pending_pdu_count() for d in self._per_destination_queues.values()
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
transaction_queue_pending_edus_gauge.register_hook(
|
||||
lambda: {
|
||||
LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_edus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
(self.server_name,): sum(
|
||||
d.pending_edu_count() for d in self._per_destination_queues.values()
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
self._is_processing = False
|
||||
|
||||
@@ -826,12 +826,7 @@ class FederationMediaDownloadServlet(BaseFederationServerServlet):
|
||||
)
|
||||
max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
|
||||
await self.media_repo.get_local_media(
|
||||
request,
|
||||
media_id,
|
||||
None,
|
||||
max_timeout_ms,
|
||||
federation=True,
|
||||
may_redirect=True,
|
||||
request, media_id, None, max_timeout_ms, federation=True
|
||||
)
|
||||
|
||||
|
||||
@@ -878,27 +873,11 @@ class FederationMediaThumbnailServlet(BaseFederationServerServlet):
|
||||
|
||||
if self.dynamic_thumbnails:
|
||||
await self.thumbnail_provider.select_or_generate_local_thumbnail(
|
||||
request,
|
||||
media_id,
|
||||
width,
|
||||
height,
|
||||
method,
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
for_federation=True,
|
||||
may_redirect=True,
|
||||
request, media_id, width, height, method, m_type, max_timeout_ms, True
|
||||
)
|
||||
else:
|
||||
await self.thumbnail_provider.respond_local_thumbnail(
|
||||
request,
|
||||
media_id,
|
||||
width,
|
||||
height,
|
||||
method,
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
for_federation=True,
|
||||
may_redirect=True,
|
||||
request, media_id, width, height, method, m_type, max_timeout_ms, True
|
||||
)
|
||||
self.media_repo.mark_recently_accessed(None, media_id)
|
||||
|
||||
|
||||
@@ -173,18 +173,6 @@ state_transition_counter = Counter(
|
||||
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
presence_user_to_current_state_size_gauge = LaterGauge(
|
||||
name="synapse_handlers_presence_user_to_current_state_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
presence_wheel_timer_size_gauge = LaterGauge(
|
||||
name="synapse_handlers_presence_wheel_timer_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
||||
# "currently_active"
|
||||
LAST_ACTIVE_GRANULARITY = 60 * 1000
|
||||
@@ -791,8 +779,11 @@ class PresenceHandler(BasePresenceHandler):
|
||||
EduTypes.PRESENCE, self.incoming_presence
|
||||
)
|
||||
|
||||
presence_user_to_current_state_size_gauge.register_hook(
|
||||
lambda: {(self.server_name,): len(self.user_to_current_state)}
|
||||
LaterGauge(
|
||||
name="synapse_handlers_presence_user_to_current_state_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
|
||||
)
|
||||
|
||||
# The per-device presence state, maps user to devices to per-device presence state.
|
||||
@@ -891,8 +882,11 @@ class PresenceHandler(BasePresenceHandler):
|
||||
60 * 1000,
|
||||
)
|
||||
|
||||
presence_wheel_timer_size_gauge.register_hook(
|
||||
lambda: {(self.server_name,): len(self.wheel_timer)}
|
||||
LaterGauge(
|
||||
name="synapse_handlers_presence_wheel_timer_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
|
||||
)
|
||||
|
||||
# Used to handle sending of presence to newly joined users/servers
|
||||
|
||||
@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
|
||||
return counts
|
||||
|
||||
|
||||
in_flight_requests = LaterGauge(
|
||||
LaterGauge(
|
||||
name="synapse_http_server_in_flight_requests_count",
|
||||
desc="",
|
||||
labelnames=["method", "servlet", SERVER_NAME_LABEL],
|
||||
caller=_get_in_flight_counts,
|
||||
)
|
||||
in_flight_requests.register_hook(_get_in_flight_counts)
|
||||
|
||||
|
||||
class RequestMetrics:
|
||||
|
||||
@@ -972,25 +972,6 @@ def set_corp_headers(request: Request) -> None:
|
||||
request.setHeader(b"Cross-Origin-Resource-Policy", b"cross-origin")
|
||||
|
||||
|
||||
def set_headers_for_media_response(request: "SynapseRequest") -> None:
|
||||
"""Set the appropriate headers when serving media responses to clients"""
|
||||
set_cors_headers(request)
|
||||
set_corp_headers(request)
|
||||
request.setHeader(
|
||||
b"Content-Security-Policy",
|
||||
b"sandbox;"
|
||||
b" default-src 'none';"
|
||||
b" script-src 'none';"
|
||||
b" plugin-types application/pdf;"
|
||||
b" style-src 'unsafe-inline';"
|
||||
b" media-src 'self';"
|
||||
b" object-src 'self';",
|
||||
)
|
||||
# Limited non-standard form of CSP for IE11
|
||||
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
|
||||
request.setHeader(b"Referrer-Policy", b"no-referrer")
|
||||
|
||||
|
||||
def respond_with_html(request: Request, code: int, html: str) -> None:
|
||||
"""
|
||||
Wraps `respond_with_html_bytes` by first encoding HTML from a str to UTF-8 bytes.
|
||||
|
||||
@@ -714,17 +714,6 @@ def parse_strings_from_args(
|
||||
return default
|
||||
|
||||
|
||||
@overload
|
||||
def parse_string_from_args(
|
||||
args: Mapping[bytes, Sequence[bytes]],
|
||||
name: str,
|
||||
default: str,
|
||||
*,
|
||||
allowed_values: Optional[StrCollection] = None,
|
||||
encoding: str = "ascii",
|
||||
) -> str: ...
|
||||
|
||||
|
||||
@overload
|
||||
def parse_string_from_args(
|
||||
args: Mapping[bytes, Sequence[bytes]],
|
||||
|
||||
@@ -36,7 +36,6 @@ from typing import (
|
||||
Tuple,
|
||||
Type,
|
||||
)
|
||||
from uuid import uuid4
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
@@ -64,8 +63,6 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CRLF = b"\r\n"
|
||||
|
||||
# list all text content types that will have the charset default to UTF-8 when
|
||||
# none is given
|
||||
TEXT_CONTENT_TYPES = [
|
||||
@@ -444,35 +441,6 @@ async def respond_with_responder(
|
||||
finish_request(request)
|
||||
|
||||
|
||||
def respond_with_multipart_location(
|
||||
request: SynapseRequest,
|
||||
location: bytes,
|
||||
) -> None:
|
||||
"""Responds to a media request with a multipart/mixed response, with the
|
||||
(empty) media metadata as a JSON object in the first part, and a `Location`
|
||||
header as the second part
|
||||
|
||||
Args:
|
||||
request: The incoming request.
|
||||
location: The URL to give in the `Location` header.
|
||||
"""
|
||||
boundary = uuid4().hex.encode("ascii") # Pick a random boundary
|
||||
request.setResponseCode(200)
|
||||
request.setHeader(
|
||||
b"Content-Type",
|
||||
b"multipart/mixed; boundary=" + boundary,
|
||||
)
|
||||
request.write(b"--" + boundary + CRLF)
|
||||
request.write(b"Content-Type: application/json" + CRLF + CRLF)
|
||||
# This is an empty JSON object for now, we don't have any
|
||||
# metadata associated with media yet
|
||||
request.write(b"{}")
|
||||
request.write(CRLF + b"--" + boundary + CRLF)
|
||||
request.write(b"Location: " + location + CRLF + CRLF)
|
||||
request.write(CRLF + b"--" + boundary + b"--" + CRLF)
|
||||
finish_request(request)
|
||||
|
||||
|
||||
def respond_with_304(request: SynapseRequest) -> None:
|
||||
request.setResponseCode(304)
|
||||
|
||||
|
||||
@@ -19,17 +19,12 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
import base64
|
||||
import errno
|
||||
import hashlib
|
||||
import hmac
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from http.client import TEMPORARY_REDIRECT
|
||||
from io import BytesIO
|
||||
from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import attr
|
||||
from matrix_common.types.mxc_uri import MXCUri
|
||||
@@ -49,7 +44,7 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config.repository import ThumbnailRequirement
|
||||
from synapse.http.server import respond_with_json, respond_with_redirect
|
||||
from synapse.http.server import respond_with_json
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.logging.opentracing import trace
|
||||
@@ -60,7 +55,6 @@ from synapse.media._base import (
|
||||
check_for_cached_entry_and_respond,
|
||||
get_filename_from_headers,
|
||||
respond_404,
|
||||
respond_with_multipart_location,
|
||||
respond_with_multipart_responder,
|
||||
respond_with_responder,
|
||||
)
|
||||
@@ -75,7 +69,7 @@ from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
|
||||
from synapse.media.url_previewer import UrlPreviewer
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
|
||||
from synapse.types import StrSequence, UserID
|
||||
from synapse.types import UserID
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.util.stringutils import random_string
|
||||
@@ -130,8 +124,6 @@ class MediaRepository:
|
||||
cfg=hs.config.ratelimiting.remote_media_downloads,
|
||||
)
|
||||
|
||||
self._media_request_signature_secret = hs.config.media.redirect_secret
|
||||
|
||||
# List of StorageProviders where we should search for media and
|
||||
# potentially upload to.
|
||||
storage_providers = []
|
||||
@@ -474,7 +466,6 @@ class MediaRepository:
|
||||
max_timeout_ms: int,
|
||||
allow_authenticated: bool = True,
|
||||
federation: bool = False,
|
||||
may_redirect: bool = False,
|
||||
) -> None:
|
||||
"""Responds to requests for local media, if exists, or returns 404.
|
||||
|
||||
@@ -486,12 +477,8 @@ class MediaRepository:
|
||||
the filename in the Content-Disposition header of the response.
|
||||
max_timeout_ms: the maximum number of milliseconds to wait for the
|
||||
media to be uploaded.
|
||||
allow_authenticated: whether media marked as authenticated may be
|
||||
served to this request
|
||||
federation: whether the local media being fetched is for a
|
||||
federation request
|
||||
may_redirect: whether the request may issue a redirect instead of
|
||||
serving the media directly
|
||||
allow_authenticated: whether media marked as authenticated may be served to this request
|
||||
federation: whether the local media being fetched is for a federation request
|
||||
|
||||
Returns:
|
||||
Resolves once a response has successfully been written to request
|
||||
@@ -506,19 +493,6 @@ class MediaRepository:
|
||||
|
||||
self.mark_recently_accessed(None, media_id)
|
||||
|
||||
if self.hs.config.media.use_redirect and may_redirect:
|
||||
location = self.signed_location_for_media(media_id, name)
|
||||
|
||||
if federation:
|
||||
respond_with_multipart_location(request, location.encode("ascii"))
|
||||
|
||||
else:
|
||||
respond_with_redirect(
|
||||
request, location.encode("ascii"), TEMPORARY_REDIRECT
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
# Once we've checked auth we can return early if the media is cached on
|
||||
# the client
|
||||
if check_for_cached_entry_and_respond(request):
|
||||
@@ -1571,145 +1545,3 @@ class MediaRepository:
|
||||
removed_media.append(media_id)
|
||||
|
||||
return removed_media, len(removed_media)
|
||||
|
||||
def download_media_key(
|
||||
self,
|
||||
media_id: str,
|
||||
exp: int,
|
||||
name: Optional[str] = None,
|
||||
) -> StrSequence:
|
||||
"""Get the key used for the download media signature
|
||||
|
||||
Args:
|
||||
media_id: The media ID of the content. (This is the same as
|
||||
the file_id for local content.)
|
||||
exp: The expiration time of the signature, as a unix timestamp in ms.
|
||||
name: Optional name that, if specified, will be used as
|
||||
the filename in the Content-Disposition header of the response.
|
||||
"""
|
||||
if name is not None:
|
||||
return ("download", media_id, str(exp), name)
|
||||
|
||||
return ("download", media_id, str(exp))
|
||||
|
||||
def signed_location_for_media(
|
||||
self,
|
||||
media_id: str,
|
||||
name: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Get the signed location for a media download
|
||||
|
||||
That URL will serve the media with no extra authentication for a limited
|
||||
time, allowing the media to be cached by a CDN more easily.
|
||||
"""
|
||||
|
||||
# XXX: One potential improvement here would be to round the `exp` to
|
||||
# the nearest 5 minutes, so that a CDN/cache can always cache the
|
||||
# media for a little bit
|
||||
exp = self.clock.time_msec() + self.hs.config.media.redirect_ttl_ms
|
||||
key = self.download_media_key(
|
||||
media_id=media_id,
|
||||
exp=exp,
|
||||
name=name,
|
||||
)
|
||||
signature = self.compute_media_request_signature(key)
|
||||
|
||||
# This *could* in theory be a relative redirect, but Synapse has a
|
||||
# bug where it always treats it as absolute. Because this is used
|
||||
# for federation request, we can't just fix the bug in Synapse and
|
||||
# use a relative redirect, we have to wait for the fix to be rolled
|
||||
# out across the federation.
|
||||
name_path = f"/{name}" if name else ""
|
||||
return f"{self.hs.config.server.public_baseurl}_synapse/media/download/{media_id}{name_path}?exp={exp}&sig={signature}"
|
||||
|
||||
def thumbnail_media_key(
|
||||
self, media_id: str, parameters: str, exp: int
|
||||
) -> StrSequence:
|
||||
"""Get the key used for the thumbnail media signature
|
||||
|
||||
Args:
|
||||
media_id: The media ID of the content. (This is the same as
|
||||
the file_id for local content.)
|
||||
parameters: The parameters of the thumbnail request as a string,
|
||||
e.g. "width=100&height=100"
|
||||
exp: The expiration time of the signature, as a unix timestamp in ms.
|
||||
"""
|
||||
return ("thumbnail", media_id, parameters, str(exp))
|
||||
|
||||
def signed_location_for_thumbnail(
|
||||
self,
|
||||
media_id: str,
|
||||
parameters: dict[str, str],
|
||||
) -> str:
|
||||
"""Get the signed location for a thumbnail media
|
||||
|
||||
That URL will serve the media with no extra authentication for a limited
|
||||
time, allowing the media to be cached by a CDN more easily.
|
||||
"""
|
||||
|
||||
# XXX: One potential improvement here would be to round the `exp` to
|
||||
# the nearest 5 minutes, so that a CDN/cache can always cache the
|
||||
# media for a little bit
|
||||
exp = self.clock.time_msec() + self.hs.config.media.redirect_ttl_ms
|
||||
parameters_str = base64.urlsafe_b64encode(
|
||||
urlencode(sorted(parameters.items())).encode("utf-8")
|
||||
).decode("ascii")
|
||||
key = self.thumbnail_media_key(
|
||||
media_id=media_id,
|
||||
parameters=parameters_str,
|
||||
exp=exp,
|
||||
)
|
||||
signature = self.compute_media_request_signature(key)
|
||||
|
||||
# This *could* in theory be a relative redirect, but Synapse has a
|
||||
# bug where it always treats it as absolute. Because this is used
|
||||
# for federation request, we can't just fix the bug in Synapse and
|
||||
# use a relative redirect, we have to wait for the fix to be rolled
|
||||
# out across the federation.
|
||||
return f"{self.hs.config.server.public_baseurl}_synapse/media/thumbnail/{media_id}/{parameters_str}?exp={exp}&sig={signature}"
|
||||
|
||||
def compute_media_request_signature(self, payload: StrSequence) -> str:
|
||||
"""Compute the signature for a signed media request
|
||||
|
||||
This currently uses a HMAC-SHA256 signature encoded as hex, but this could
|
||||
be swapped to an asymmetric signature.
|
||||
"""
|
||||
assert self._media_request_signature_secret is not None, (
|
||||
"media request signature secret not set"
|
||||
)
|
||||
|
||||
# XXX: alternatively, we could do multiple rounds of HMAC with the
|
||||
# different segments, like AWS SigV4 does
|
||||
bytes_payload = "|".join(payload).encode("utf-8")
|
||||
|
||||
digest = hmac.digest(
|
||||
key=self._media_request_signature_secret,
|
||||
msg=bytes_payload,
|
||||
digest=hashlib.sha256,
|
||||
)
|
||||
signature = digest.hex()
|
||||
return signature
|
||||
|
||||
def verify_media_request_signature(
|
||||
self, payload: StrSequence, signature: str
|
||||
) -> bool:
|
||||
"""Verify the signature for a signed media request
|
||||
|
||||
Returns True if the signature is valid, False otherwise.
|
||||
"""
|
||||
# In case there is no secret, we can't verify the signature
|
||||
if self._media_request_signature_secret is None:
|
||||
return False
|
||||
|
||||
bytes_payload = "|".join(payload).encode("utf-8")
|
||||
decoded_signature = bytes.fromhex(signature)
|
||||
computed_signature = hmac.digest(
|
||||
key=self._media_request_signature_secret,
|
||||
msg=bytes_payload,
|
||||
digest=hashlib.sha256,
|
||||
)
|
||||
|
||||
if len(computed_signature) != len(decoded_signature):
|
||||
return False
|
||||
|
||||
return hmac.compare_digest(computed_signature, decoded_signature)
|
||||
|
||||
@@ -59,7 +59,7 @@ from synapse.util import Clock
|
||||
from synapse.util.file_consumer import BackgroundFileConsumer
|
||||
|
||||
from ..types import JsonDict
|
||||
from ._base import CRLF, FileInfo, Responder
|
||||
from ._base import FileInfo, Responder
|
||||
from .filepath import MediaFilePaths
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -68,6 +68,8 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CRLF = b"\r\n"
|
||||
|
||||
|
||||
class SHA256TransparentIOWriter:
|
||||
"""Will generate a SHA256 hash from a source stream transparently.
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from http.client import TEMPORARY_REDIRECT
|
||||
from io import BytesIO
|
||||
from types import TracebackType
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple, Type
|
||||
@@ -29,7 +28,7 @@ from PIL import Image
|
||||
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error
|
||||
from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
|
||||
from synapse.http.server import respond_with_json, respond_with_redirect
|
||||
from synapse.http.server import respond_with_json
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.media._base import (
|
||||
@@ -38,7 +37,6 @@ from synapse.media._base import (
|
||||
check_for_cached_entry_and_respond,
|
||||
respond_404,
|
||||
respond_with_file,
|
||||
respond_with_multipart_location,
|
||||
respond_with_multipart_responder,
|
||||
respond_with_responder,
|
||||
)
|
||||
@@ -271,7 +269,6 @@ class ThumbnailProvider:
|
||||
self.media_repo = media_repo
|
||||
self.media_storage = media_storage
|
||||
self.store = hs.get_datastores().main
|
||||
self.clock = hs.get_clock()
|
||||
self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails
|
||||
|
||||
async def respond_local_thumbnail(
|
||||
@@ -285,7 +282,6 @@ class ThumbnailProvider:
|
||||
max_timeout_ms: int,
|
||||
for_federation: bool,
|
||||
allow_authenticated: bool = True,
|
||||
may_redirect: bool = False,
|
||||
) -> None:
|
||||
media_info = await self.media_repo.get_local_media_info(
|
||||
request, media_id, max_timeout_ms
|
||||
@@ -299,26 +295,6 @@ class ThumbnailProvider:
|
||||
if media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
if self.hs.config.media.use_redirect and may_redirect:
|
||||
location = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id,
|
||||
{
|
||||
"width": str(width),
|
||||
"height": str(height),
|
||||
"method": method,
|
||||
"type": m_type,
|
||||
},
|
||||
)
|
||||
|
||||
if for_federation:
|
||||
respond_with_multipart_location(request, location.encode("ascii"))
|
||||
else:
|
||||
respond_with_redirect(
|
||||
request, location.encode("ascii"), TEMPORARY_REDIRECT
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
# Once we've checked auth we can return early if the media is cached on
|
||||
# the client
|
||||
if check_for_cached_entry_and_respond(request):
|
||||
@@ -351,7 +327,6 @@ class ThumbnailProvider:
|
||||
max_timeout_ms: int,
|
||||
for_federation: bool,
|
||||
allow_authenticated: bool = True,
|
||||
may_redirect: bool = False,
|
||||
) -> None:
|
||||
media_info = await self.media_repo.get_local_media_info(
|
||||
request, media_id, max_timeout_ms
|
||||
@@ -365,27 +340,6 @@ class ThumbnailProvider:
|
||||
if media_info.authenticated:
|
||||
raise NotFoundError()
|
||||
|
||||
if self.hs.config.media.use_redirect and may_redirect:
|
||||
location = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id,
|
||||
{
|
||||
"width": str(desired_width),
|
||||
"height": str(desired_height),
|
||||
"method": desired_method,
|
||||
"type": desired_type,
|
||||
},
|
||||
)
|
||||
|
||||
if for_federation:
|
||||
respond_with_multipart_location(request, location.encode("ascii"))
|
||||
|
||||
else:
|
||||
respond_with_redirect(
|
||||
request, location.encode("ascii"), TEMPORARY_REDIRECT
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
# Once we've checked auth we can return early if the media is cached on
|
||||
# the client
|
||||
if check_for_cached_entry_and_respond(request):
|
||||
|
||||
@@ -31,7 +31,6 @@ from typing import (
|
||||
Dict,
|
||||
Generic,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
@@ -74,6 +73,8 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
METRICS_PREFIX = "/_synapse/metrics"
|
||||
|
||||
all_gauges: Dict[str, Collector] = {}
|
||||
|
||||
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
||||
|
||||
SERVER_NAME_LABEL = "server_name"
|
||||
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
|
||||
name: str
|
||||
desc: str
|
||||
labelnames: Optional[StrSequence] = attr.ib(hash=False)
|
||||
# List of callbacks: each callback should either return a value (if there are no
|
||||
# labels for this metric), or dict mapping from a label tuple to a value
|
||||
_hooks: List[
|
||||
Callable[
|
||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||
]
|
||||
] = attr.ib(factory=list, hash=False)
|
||||
# callback: should either return a value (if there are no labels for this metric),
|
||||
# or dict mapping from a label tuple to a value
|
||||
caller: Callable[
|
||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||
]
|
||||
|
||||
def collect(self) -> Iterable[Metric]:
|
||||
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
|
||||
# (we don't enforce it here, one level up).
|
||||
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
|
||||
|
||||
for hook in self._hooks:
|
||||
try:
|
||||
hook_result = hook()
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception running callback for LaterGauge(%s)", self.name
|
||||
)
|
||||
yield g
|
||||
return
|
||||
|
||||
if isinstance(hook_result, (int, float)):
|
||||
g.add_metric([], hook_result)
|
||||
else:
|
||||
for k, v in hook_result.items():
|
||||
g.add_metric(k, v)
|
||||
|
||||
try:
|
||||
calls = self.caller()
|
||||
except Exception:
|
||||
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
|
||||
yield g
|
||||
return
|
||||
|
||||
def register_hook(
|
||||
self,
|
||||
hook: Callable[
|
||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||
],
|
||||
) -> None:
|
||||
self._hooks.append(hook)
|
||||
if isinstance(calls, (int, float)):
|
||||
g.add_metric([], calls)
|
||||
else:
|
||||
for k, v in calls.items():
|
||||
g.add_metric(k, v)
|
||||
|
||||
yield g
|
||||
|
||||
def __attrs_post_init__(self) -> None:
|
||||
self._register()
|
||||
|
||||
def _register(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering", self.name)
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
# `MetricsEntry` only makes sense when it is a `Protocol`,
|
||||
@@ -254,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
||||
# Protects access to _registrations
|
||||
self._lock = threading.Lock()
|
||||
|
||||
REGISTRY.register(self)
|
||||
self._register_with_collector()
|
||||
|
||||
def register(
|
||||
self,
|
||||
@@ -345,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
||||
gauge.add_metric(labels=key, value=getattr(metrics, name))
|
||||
yield gauge
|
||||
|
||||
def _register_with_collector(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering", self.name)
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
|
||||
"""
|
||||
|
||||
@@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter(
|
||||
labelnames=["stream", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
notifier_listeners_gauge = LaterGauge(
|
||||
name="synapse_notifier_listeners",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
notifier_rooms_gauge = LaterGauge(
|
||||
name="synapse_notifier_rooms",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
notifier_users_gauge = LaterGauge(
|
||||
name="synapse_notifier_users",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@@ -299,16 +281,28 @@ class Notifier:
|
||||
)
|
||||
}
|
||||
|
||||
notifier_listeners_gauge.register_hook(count_listeners)
|
||||
notifier_rooms_gauge.register_hook(
|
||||
lambda: {
|
||||
LaterGauge(
|
||||
name="synapse_notifier_listeners",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=count_listeners,
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_notifier_rooms",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
(self.server_name,): count(
|
||||
bool, list(self.room_to_user_streams.values())
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
notifier_users_gauge.register_hook(
|
||||
lambda: {(self.server_name,): len(self.user_to_user_stream)}
|
||||
LaterGauge(
|
||||
name="synapse_notifier_users",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
|
||||
)
|
||||
|
||||
def add_replication_callback(self, cb: Callable[[], None]) -> None:
|
||||
|
||||
@@ -25,6 +25,7 @@ from typing import (
|
||||
Any,
|
||||
Collection,
|
||||
Dict,
|
||||
FrozenSet,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
@@ -477,8 +478,18 @@ class BulkPushRuleEvaluator:
|
||||
event.room_version.msc3931_push_features,
|
||||
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
|
||||
self.hs.config.experimental.msc4210_enabled,
|
||||
self.hs.config.experimental.msc4306_enabled,
|
||||
)
|
||||
|
||||
msc4306_thread_subscribers: Optional[FrozenSet[str]] = None
|
||||
if self.hs.config.experimental.msc4306_enabled and thread_id != MAIN_TIMELINE:
|
||||
# pull out, in batch, all local subscribers to this thread
|
||||
# (in the common case, they will all be getting processed for push
|
||||
# rules right now)
|
||||
msc4306_thread_subscribers = await self.store.get_subscribers_to_thread(
|
||||
event.room_id, thread_id
|
||||
)
|
||||
|
||||
for uid, rules in rules_by_user.items():
|
||||
if event.sender == uid:
|
||||
continue
|
||||
@@ -503,7 +514,13 @@ class BulkPushRuleEvaluator:
|
||||
# current user, it'll be added to the dict later.
|
||||
actions_by_user[uid] = []
|
||||
|
||||
actions = evaluator.run(rules, uid, display_name)
|
||||
msc4306_thread_subscription_state: Optional[bool] = None
|
||||
if msc4306_thread_subscribers is not None:
|
||||
msc4306_thread_subscription_state = uid in msc4306_thread_subscribers
|
||||
|
||||
actions = evaluator.run(
|
||||
rules, uid, display_name, msc4306_thread_subscription_state
|
||||
)
|
||||
if "notify" in actions:
|
||||
# Push rules say we should notify the user of this event
|
||||
actions_by_user[uid] = actions
|
||||
|
||||
@@ -106,18 +106,6 @@ user_ip_cache_counter = Counter(
|
||||
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
|
||||
)
|
||||
|
||||
tcp_resource_total_connections_gauge = LaterGauge(
|
||||
name="synapse_replication_tcp_resource_total_connections",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
tcp_command_queue_gauge = LaterGauge(
|
||||
name="synapse_replication_tcp_command_queue",
|
||||
desc="Number of inbound RDATA/POSITION commands queued for processing",
|
||||
labelnames=["stream_name", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
# the type of the entries in _command_queues_by_stream
|
||||
_StreamCommandQueue = Deque[
|
||||
@@ -255,8 +243,11 @@ class ReplicationCommandHandler:
|
||||
# outgoing replication commands to.)
|
||||
self._connections: List[IReplicationConnection] = []
|
||||
|
||||
tcp_resource_total_connections_gauge.register_hook(
|
||||
lambda: {(self.server_name,): len(self._connections)}
|
||||
LaterGauge(
|
||||
name="synapse_replication_tcp_resource_total_connections",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self._connections)},
|
||||
)
|
||||
|
||||
# When POSITION or RDATA commands arrive, we stick them in a queue and process
|
||||
@@ -275,11 +266,14 @@ class ReplicationCommandHandler:
|
||||
# from that connection.
|
||||
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
|
||||
|
||||
tcp_command_queue_gauge.register_hook(
|
||||
lambda: {
|
||||
LaterGauge(
|
||||
name="synapse_replication_tcp_command_queue",
|
||||
desc="Number of inbound RDATA/POSITION commands queued for processing",
|
||||
labelnames=["stream_name", SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
(stream_name, self.server_name): len(queue)
|
||||
for stream_name, queue in self._command_queues_by_stream.items()
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
self._is_master = hs.config.worker.worker_app is None
|
||||
|
||||
@@ -527,11 +527,9 @@ pending_commands = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_pending_commands",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
)
|
||||
pending_commands.register_hook(
|
||||
lambda: {
|
||||
caller=lambda: {
|
||||
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -546,11 +544,9 @@ transport_send_buffer = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_transport_send_buffer",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
)
|
||||
transport_send_buffer.register_hook(
|
||||
lambda: {
|
||||
caller=lambda: {
|
||||
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -575,12 +571,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
)
|
||||
tcp_transport_kernel_send_buffer.register_hook(
|
||||
lambda: {
|
||||
caller=lambda: {
|
||||
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
|
||||
for p in connected_connections
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -588,10 +582,8 @@ tcp_transport_kernel_read_buffer = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
)
|
||||
tcp_transport_kernel_read_buffer.register_hook(
|
||||
lambda: {
|
||||
caller=lambda: {
|
||||
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
|
||||
for p in connected_connections
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
@@ -30,7 +30,6 @@ from synapse.http.server import (
|
||||
respond_with_json_bytes,
|
||||
set_corp_headers,
|
||||
set_cors_headers,
|
||||
set_headers_for_media_response,
|
||||
)
|
||||
from synapse.http.servlet import RestServlet, parse_integer, parse_string
|
||||
from synapse.http.site import SynapseRequest
|
||||
@@ -170,8 +169,7 @@ class ThumbnailResource(RestServlet):
|
||||
method,
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
for_federation=False,
|
||||
may_redirect=True,
|
||||
False,
|
||||
)
|
||||
else:
|
||||
await self.thumbnailer.respond_local_thumbnail(
|
||||
@@ -182,8 +180,7 @@ class ThumbnailResource(RestServlet):
|
||||
method,
|
||||
m_type,
|
||||
max_timeout_ms,
|
||||
for_federation=False,
|
||||
may_redirect=True,
|
||||
False,
|
||||
)
|
||||
self.media_repo.mark_recently_accessed(None, media_id)
|
||||
else:
|
||||
@@ -241,7 +238,21 @@ class DownloadResource(RestServlet):
|
||||
|
||||
await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
set_headers_for_media_response(request)
|
||||
set_cors_headers(request)
|
||||
set_corp_headers(request)
|
||||
request.setHeader(
|
||||
b"Content-Security-Policy",
|
||||
b"sandbox;"
|
||||
b" default-src 'none';"
|
||||
b" script-src 'none';"
|
||||
b" plugin-types application/pdf;"
|
||||
b" style-src 'unsafe-inline';"
|
||||
b" media-src 'self';"
|
||||
b" object-src 'self';",
|
||||
)
|
||||
# Limited non-standard form of CSP for IE11
|
||||
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
|
||||
request.setHeader(b"Referrer-Policy", b"no-referrer")
|
||||
max_timeout_ms = parse_integer(
|
||||
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
|
||||
)
|
||||
@@ -249,7 +260,7 @@ class DownloadResource(RestServlet):
|
||||
|
||||
if self._is_mine_server_name(server_name):
|
||||
await self.media_repo.get_local_media(
|
||||
request, media_id, file_name, max_timeout_ms, may_redirect=True
|
||||
request, media_id, file_name, max_timeout_ms
|
||||
)
|
||||
else:
|
||||
ip_address = request.getClientAddress().host
|
||||
|
||||
@@ -23,9 +23,7 @@ import logging
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from synapse.http.server import (
|
||||
set_headers_for_media_response,
|
||||
)
|
||||
from synapse.http.server import set_corp_headers, set_cors_headers
|
||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.media._base import (
|
||||
@@ -64,7 +62,21 @@ class DownloadResource(RestServlet):
|
||||
# Validate the server name, raising if invalid
|
||||
parse_and_validate_server_name(server_name)
|
||||
|
||||
set_headers_for_media_response(request)
|
||||
set_cors_headers(request)
|
||||
set_corp_headers(request)
|
||||
request.setHeader(
|
||||
b"Content-Security-Policy",
|
||||
b"sandbox;"
|
||||
b" default-src 'none';"
|
||||
b" script-src 'none';"
|
||||
b" plugin-types application/pdf;"
|
||||
b" style-src 'unsafe-inline';"
|
||||
b" media-src 'self';"
|
||||
b" object-src 'self';",
|
||||
)
|
||||
# Limited non-standard form of CSP for IE11
|
||||
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
|
||||
request.setHeader(b"Referrer-Policy", b"no-referrer")
|
||||
max_timeout_ms = parse_integer(
|
||||
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
|
||||
)
|
||||
|
||||
@@ -31,7 +31,6 @@ from synapse.rest.synapse.client.rendezvous import MSC4108RendezvousSessionResou
|
||||
from synapse.rest.synapse.client.sso_register import SsoRegisterResource
|
||||
from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource
|
||||
from synapse.rest.synapse.mas import MasResource
|
||||
from synapse.rest.synapse.media import MediaResource
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -65,9 +64,6 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc
|
||||
resources["/_synapse/jwks"] = JwksResource(hs)
|
||||
resources["/_synapse/mas"] = MasResource(hs)
|
||||
|
||||
if hs.config.media.can_load_media_repo:
|
||||
resources["/_synapse/media"] = MediaResource(hs)
|
||||
|
||||
# provider-specific SSO bits. Only load these if they are enabled, since they
|
||||
# rely on optional dependencies.
|
||||
if hs.config.oidc.oidc_enabled:
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl_3.0.html>.
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from .download import DownloadResource
|
||||
from .thumbnail import ThumbnailResource
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MediaResource(Resource):
|
||||
"""
|
||||
Provides endpoints for signed media downloads and thumbnails.
|
||||
|
||||
All endpoints are mounted under the path `/_synapse/media/` and only work
|
||||
on the media worker.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
assert hs.config.media.can_load_media_repo, (
|
||||
"This resource should only be mounted on workers that can load the media repo"
|
||||
)
|
||||
|
||||
Resource.__init__(self)
|
||||
self.putChild(b"download", DownloadResource(hs))
|
||||
self.putChild(b"thumbnail", ThumbnailResource(hs))
|
||||
@@ -1,111 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl_3.0.html>.
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.api.errors import NotFoundError
|
||||
from synapse.http.server import (
|
||||
DirectServeJsonResource,
|
||||
set_headers_for_media_response,
|
||||
)
|
||||
from synapse.http.servlet import parse_integer, parse_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloadResource(DirectServeJsonResource):
|
||||
"""
|
||||
Serves media from the media repository, with a temporary signed URL which
|
||||
expires after a set amount of time.
|
||||
|
||||
GET /_synapse/media/download/{media_id}?exp={exp}&sig={sig}
|
||||
GET /_synapse/media/download/{media_id}/{name}?exp={exp}&sig={sig}
|
||||
|
||||
The intent of this resource is to allow the federation and client media APIs
|
||||
to issue redirects to a signed URL that can then be cached by a CDN. This
|
||||
endpoint doesn't require any extra header, and is authenticated using the
|
||||
signature in the URL parameters.
|
||||
"""
|
||||
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
assert hs.config.media.can_load_media_repo, (
|
||||
"This resource should only be mounted on workers that can load the media repo"
|
||||
)
|
||||
|
||||
DirectServeJsonResource.__init__(
|
||||
self,
|
||||
# It is useful to have the tracing context on this endpoint as it
|
||||
# can help debug federation issues
|
||||
extract_context=True,
|
||||
)
|
||||
|
||||
self._clock = hs.get_clock()
|
||||
self._media_repository = hs.get_media_repository()
|
||||
|
||||
async def _async_render_GET(self, request: "SynapseRequest") -> None:
|
||||
set_headers_for_media_response(request)
|
||||
|
||||
# Extract the media ID (and optional name) from the path
|
||||
if request.postpath is None:
|
||||
raise NotFoundError()
|
||||
|
||||
if len(request.postpath) == 1:
|
||||
media_id = request.postpath[0].decode("utf-8")
|
||||
name = None
|
||||
elif len(request.postpath) == 2:
|
||||
media_id = request.postpath[0].decode("utf-8")
|
||||
name = request.postpath[1].decode("utf-8")
|
||||
else:
|
||||
raise NotFoundError()
|
||||
|
||||
# Get the `exp` and `sig` query parameters
|
||||
exp = parse_integer(request=request, name="exp", required=True, negative=False)
|
||||
sig = parse_string(request=request, name="sig", required=True)
|
||||
|
||||
# Check that the signature is valid
|
||||
key = self._media_repository.download_media_key(
|
||||
media_id=media_id, exp=exp, name=name
|
||||
)
|
||||
if not self._media_repository.verify_media_request_signature(key, sig):
|
||||
logger.warning(
|
||||
"Invalid URL signature serving media %s. key: %r, sig: %r",
|
||||
media_id,
|
||||
key,
|
||||
sig,
|
||||
)
|
||||
raise NotFoundError()
|
||||
|
||||
# Check the expiry time
|
||||
if exp < self._clock.time_msec():
|
||||
logger.info("Expired signed URL serving media %s", media_id)
|
||||
raise NotFoundError()
|
||||
|
||||
# Reply with the media
|
||||
await self._media_repository.get_local_media(
|
||||
request=request,
|
||||
media_id=media_id,
|
||||
name=name,
|
||||
max_timeout_ms=0, # If we got here, the media finished uploading
|
||||
federation=False, # This changes the response to be multipart; we explicitly don't want that
|
||||
may_redirect=False, # We're already on the redirected URL, we don't want to redirect again
|
||||
)
|
||||
@@ -1,146 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl_3.0.html>.
|
||||
#
|
||||
#
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
from urllib.parse import parse_qs
|
||||
|
||||
from synapse.api.errors import NotFoundError
|
||||
from synapse.http.server import (
|
||||
DirectServeJsonResource,
|
||||
set_headers_for_media_response,
|
||||
)
|
||||
from synapse.http.servlet import (
|
||||
parse_integer,
|
||||
parse_integer_from_args,
|
||||
parse_string,
|
||||
parse_string_from_args,
|
||||
)
|
||||
from synapse.media.thumbnailer import ThumbnailProvider
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ThumbnailResource(DirectServeJsonResource):
|
||||
"""
|
||||
Serves thumbnails from the media repository, with a temporary signed URL
|
||||
which expires after a set amount of time.
|
||||
|
||||
GET /_synapse/media/thumbnail/{media_id}/{parameters}?exp={exp}&sig={sig}
|
||||
|
||||
The intent of this resource is to allow the federation and client media APIs
|
||||
to issue redirects to a signed URL that can then be cached by a CDN. This
|
||||
endpoint doesn't require any extra header, and is authenticated using the
|
||||
signature in the URL parameters.
|
||||
|
||||
The parameters are encoded as a form-urlencoded then base64 encoded string.
|
||||
This avoids any automatic url decoding Twisted might do. The reason they are
|
||||
part of the URL and not the query string is to ignore the query string when
|
||||
caching the URL, which is possible with some CDNs.
|
||||
"""
|
||||
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
assert hs.config.media.can_load_media_repo, (
|
||||
"This resource should only be mounted on workers that can load the media repo"
|
||||
)
|
||||
|
||||
DirectServeJsonResource.__init__(
|
||||
self,
|
||||
# It is useful to have the tracing context on this endpoint as it
|
||||
# can help debug federation issues
|
||||
extract_context=True,
|
||||
)
|
||||
|
||||
self._clock = hs.get_clock()
|
||||
self._media_repository = hs.get_media_repository()
|
||||
self._dynamic_thumbnails = hs.config.media.dynamic_thumbnails
|
||||
self._thumbnailer = ThumbnailProvider(
|
||||
hs, self._media_repository, self._media_repository.media_storage
|
||||
)
|
||||
|
||||
async def _async_render_GET(self, request: "SynapseRequest") -> None:
|
||||
set_headers_for_media_response(request)
|
||||
|
||||
# Extract the media ID and parameters from the path
|
||||
if request.postpath is None or len(request.postpath) != 2:
|
||||
raise NotFoundError()
|
||||
|
||||
media_id = request.postpath[0].decode("utf-8")
|
||||
parameters = request.postpath[1]
|
||||
|
||||
# Get the `exp` and `sig` query parameters
|
||||
exp = parse_integer(request=request, name="exp", required=True, negative=False)
|
||||
sig = parse_string(request=request, name="sig", required=True)
|
||||
|
||||
# Check that the signature is valid
|
||||
key = self._media_repository.thumbnail_media_key(
|
||||
media_id=media_id,
|
||||
parameters=parameters.decode("utf-8"),
|
||||
exp=exp,
|
||||
)
|
||||
if not self._media_repository.verify_media_request_signature(key, sig):
|
||||
logger.warning(
|
||||
"Invalid URL signature serving media %s. key: %r, sig: %r",
|
||||
media_id,
|
||||
key,
|
||||
sig,
|
||||
)
|
||||
raise NotFoundError()
|
||||
|
||||
# Check the expiry time
|
||||
if exp < self._clock.time_msec():
|
||||
logger.info("Expired signed URL serving media %s", media_id)
|
||||
raise NotFoundError()
|
||||
|
||||
# Now parse and check the parameters
|
||||
args = parse_qs(base64.urlsafe_b64decode(parameters))
|
||||
width = parse_integer_from_args(args, "width", required=True)
|
||||
height = parse_integer_from_args(args, "height", required=True)
|
||||
method = parse_string_from_args(args, "method", "scale")
|
||||
m_type = parse_string_from_args(args, "type", "image/png")
|
||||
|
||||
# Reply with the thumbnail
|
||||
if self._dynamic_thumbnails:
|
||||
await self._thumbnailer.select_or_generate_local_thumbnail(
|
||||
request,
|
||||
media_id,
|
||||
width,
|
||||
height,
|
||||
method,
|
||||
m_type,
|
||||
max_timeout_ms=0, # If we got here, the media finished uploading
|
||||
for_federation=False, # This changes the response to be multipart; we explicitly don't want that
|
||||
may_redirect=False, # We're already on the redirected URL, we don't want to redirect again
|
||||
)
|
||||
else:
|
||||
await self._thumbnailer.respond_local_thumbnail(
|
||||
request,
|
||||
media_id,
|
||||
width,
|
||||
height,
|
||||
method,
|
||||
m_type,
|
||||
max_timeout_ms=0, # If we got here, the media finished uploading
|
||||
for_federation=False, # This changes the response to be multipart; we explicitly don't want that
|
||||
may_redirect=False, # We're already on the redirected URL, we don't want to redirect again
|
||||
)
|
||||
@@ -100,12 +100,6 @@ sql_txn_duration = Counter(
|
||||
labelnames=["desc", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
background_update_status = LaterGauge(
|
||||
name="synapse_background_update_status",
|
||||
desc="Background update status",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
# Unique indexes which have been added in background updates. Maps from table name
|
||||
# to the name of the background update which added the unique index to that table.
|
||||
@@ -617,8 +611,11 @@ class DatabasePool:
|
||||
)
|
||||
|
||||
self.updates = BackgroundUpdater(hs, self)
|
||||
background_update_status.register_hook(
|
||||
lambda: {(self.server_name,): self.updates.get_status()},
|
||||
LaterGauge(
|
||||
name="synapse_background_update_status",
|
||||
desc="Background update status",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): self.updates.get_status()},
|
||||
)
|
||||
|
||||
self._previous_txn_total_time = 0.0
|
||||
|
||||
@@ -110,6 +110,7 @@ def _load_rules(
|
||||
msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
|
||||
msc4028_push_encrypted_events=experimental_config.msc4028_push_encrypted_events,
|
||||
msc4210_enabled=experimental_config.msc4210_enabled,
|
||||
msc4306_enabled=experimental_config.msc4306_enabled,
|
||||
)
|
||||
|
||||
return filtered_rules
|
||||
|
||||
@@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
|
||||
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
|
||||
|
||||
|
||||
federation_known_servers_gauge = LaterGauge(
|
||||
name="synapse_federation_known_servers",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||
class EventIdMembership:
|
||||
"""Returned by `get_membership_from_event_ids`"""
|
||||
@@ -123,8 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
1,
|
||||
self._count_known_servers,
|
||||
)
|
||||
federation_known_servers_gauge.register_hook(
|
||||
lambda: {(self.server_name,): self._known_servers_count}
|
||||
LaterGauge(
|
||||
name="synapse_federation_known_servers",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): self._known_servers_count},
|
||||
)
|
||||
|
||||
@wrap_as_background_process("_count_known_servers")
|
||||
|
||||
@@ -14,6 +14,7 @@ import logging
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
FrozenSet,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
@@ -99,6 +100,7 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
self.get_subscription_for_thread.invalidate(
|
||||
(row.user_id, row.room_id, row.event_id)
|
||||
)
|
||||
self.get_subscribers_to_thread.invalidate((row.room_id, row.event_id))
|
||||
|
||||
super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
||||
@@ -194,6 +196,16 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
"""
|
||||
assert self._can_write_to_thread_subscriptions
|
||||
|
||||
def _invalidate_subscription_caches(txn: LoggingTransaction) -> None:
|
||||
txn.call_after(
|
||||
self.get_subscription_for_thread.invalidate,
|
||||
(user_id, room_id, thread_root_event_id),
|
||||
)
|
||||
txn.call_after(
|
||||
self.get_subscribers_to_thread.invalidate,
|
||||
(room_id, thread_root_event_id),
|
||||
)
|
||||
|
||||
def _subscribe_user_to_thread_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[Union[int, AutomaticSubscriptionConflicted]]:
|
||||
@@ -234,10 +246,7 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
"unsubscribed_at_topological_ordering": None,
|
||||
},
|
||||
)
|
||||
txn.call_after(
|
||||
self.get_subscription_for_thread.invalidate,
|
||||
(user_id, room_id, thread_root_event_id),
|
||||
)
|
||||
_invalidate_subscription_caches(txn)
|
||||
return stream_id
|
||||
|
||||
# we already have either a subscription or a prior unsubscription here
|
||||
@@ -291,10 +300,7 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
"unsubscribed_at_topological_ordering": None,
|
||||
},
|
||||
)
|
||||
txn.call_after(
|
||||
self.get_subscription_for_thread.invalidate,
|
||||
(user_id, room_id, thread_root_event_id),
|
||||
)
|
||||
_invalidate_subscription_caches(txn)
|
||||
|
||||
return stream_id
|
||||
|
||||
@@ -376,6 +382,10 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
self.get_subscription_for_thread.invalidate,
|
||||
(user_id, room_id, thread_root_event_id),
|
||||
)
|
||||
txn.call_after(
|
||||
self.get_subscribers_to_thread.invalidate,
|
||||
(room_id, thread_root_event_id),
|
||||
)
|
||||
|
||||
return stream_id
|
||||
|
||||
@@ -388,7 +398,9 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
Purge all subscriptions for the user.
|
||||
The fact that subscriptions have been purged will not be streamed;
|
||||
all stream rows for the user will in fact be removed.
|
||||
This is intended only for dealing with user deactivation.
|
||||
|
||||
This must only be used for user deactivation,
|
||||
because it does not invalidate the `subscribers_to_thread` cache.
|
||||
"""
|
||||
|
||||
def _purge_thread_subscription_settings_for_user_txn(
|
||||
@@ -449,6 +461,42 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
|
||||
|
||||
return ThreadSubscription(automatic=automatic)
|
||||
|
||||
# max_entries=100 rationale:
|
||||
# this returns a potentially large datastructure
|
||||
# (since each entry contains a set which contains a potentially large number of user IDs),
|
||||
# whereas the default of 10'000 entries for @cached feels more
|
||||
# suitable for very small cache entries.
|
||||
#
|
||||
# Overall, when bearing in mind the usual profile of a small community-server or company-server
|
||||
# (where cache tuning hasn't been done, so we're in out-of-box configuration), it is very
|
||||
# unlikely we would benefit from keeping hot the subscribers for as many as 100 threads,
|
||||
# since it's unlikely that so many threads will be active in a short span of time on a small homeserver.
|
||||
# It feels that medium servers will probably also not exhaust this limit.
|
||||
# Larger homeservers are more likely to be carefully tuned, either with a larger global cache factor
|
||||
# or carefully following the usage patterns & cache metrics.
|
||||
# Finally, the query is not so intensive that computing it every time is a huge deal, but given people
|
||||
# often send messages back-to-back in the same thread it seems like it would offer a mild benefit.
|
||||
@cached(max_entries=100)
|
||||
async def get_subscribers_to_thread(
|
||||
self, room_id: str, thread_root_event_id: str
|
||||
) -> FrozenSet[str]:
|
||||
"""
|
||||
Returns:
|
||||
the set of user_ids for local users who are subscribed to the given thread.
|
||||
"""
|
||||
return frozenset(
|
||||
await self.db_pool.simple_select_onecol(
|
||||
table="thread_subscriptions",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"event_id": thread_root_event_id,
|
||||
"subscribed": True,
|
||||
},
|
||||
retcol="user_id",
|
||||
desc="get_subscribers_to_thread",
|
||||
)
|
||||
)
|
||||
|
||||
def get_max_thread_subscriptions_stream_id(self) -> int:
|
||||
"""Get the current maximum stream_id for thread subscriptions.
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ class FilteredPushRules:
|
||||
msc3664_enabled: bool,
|
||||
msc4028_push_encrypted_events: bool,
|
||||
msc4210_enabled: bool,
|
||||
msc4306_enabled: bool,
|
||||
): ...
|
||||
def rules(self) -> Collection[Tuple[PushRule, bool]]: ...
|
||||
|
||||
@@ -67,13 +68,19 @@ class PushRuleEvaluator:
|
||||
room_version_feature_flags: Tuple[str, ...],
|
||||
msc3931_enabled: bool,
|
||||
msc4210_enabled: bool,
|
||||
msc4306_enabled: bool,
|
||||
): ...
|
||||
def run(
|
||||
self,
|
||||
push_rules: FilteredPushRules,
|
||||
user_id: Optional[str],
|
||||
display_name: Optional[str],
|
||||
msc4306_thread_subscription_state: Optional[bool],
|
||||
) -> Collection[Union[Mapping, str]]: ...
|
||||
def matches(
|
||||
self, condition: JsonDict, user_id: Optional[str], display_name: Optional[str]
|
||||
self,
|
||||
condition: JsonDict,
|
||||
user_id: Optional[str],
|
||||
display_name: Optional[str],
|
||||
msc4306_thread_subscription_state: Optional[bool] = None,
|
||||
) -> bool: ...
|
||||
|
||||
@@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance(
|
||||
# We track the number of affected hosts per time-period so we can
|
||||
# differentiate one really noisy homeserver from a general
|
||||
# ratelimit tuning problem across the federation.
|
||||
sleep_affected_hosts_gauge = LaterGauge(
|
||||
LaterGauge(
|
||||
name="synapse_rate_limit_sleep_affected_hosts",
|
||||
desc="Number of hosts that had requests put to sleep",
|
||||
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
||||
)
|
||||
sleep_affected_hosts_gauge.register_hook(
|
||||
lambda: _get_counts_from_rate_limiter_instance(
|
||||
caller=lambda: _get_counts_from_rate_limiter_instance(
|
||||
lambda rate_limiter_instance: sum(
|
||||
ratelimiter.should_sleep()
|
||||
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
reject_affected_hosts_gauge = LaterGauge(
|
||||
LaterGauge(
|
||||
name="synapse_rate_limit_reject_affected_hosts",
|
||||
desc="Number of hosts that had requests rejected",
|
||||
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
||||
)
|
||||
reject_affected_hosts_gauge.register_hook(
|
||||
lambda: _get_counts_from_rate_limiter_instance(
|
||||
caller=lambda: _get_counts_from_rate_limiter_instance(
|
||||
lambda rate_limiter_instance: sum(
|
||||
ratelimiter.should_reject()
|
||||
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||
)
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -44,13 +44,6 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
running_tasks_gauge = LaterGauge(
|
||||
name="synapse_scheduler_running_tasks",
|
||||
desc="The number of concurrent running tasks handled by the TaskScheduler",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
"""
|
||||
This is a simple task scheduler designed for resumable tasks. Normally,
|
||||
@@ -137,8 +130,11 @@ class TaskScheduler:
|
||||
TaskScheduler.SCHEDULE_INTERVAL_MS,
|
||||
)
|
||||
|
||||
running_tasks_gauge.register_hook(
|
||||
lambda: {(self.server_name,): len(self._running_tasks)}
|
||||
LaterGauge(
|
||||
name="synapse_scheduler_running_tasks",
|
||||
desc="The number of concurrent running tasks handled by the TaskScheduler",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self._running_tasks)},
|
||||
)
|
||||
|
||||
def register_action(
|
||||
|
||||
@@ -186,72 +186,6 @@ class FederationMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
|
||||
self.assertEqual(channel.is_finished(), True)
|
||||
self.assertNotIn("body", channel.result)
|
||||
|
||||
@unittest.override_config(
|
||||
{"media_redirect": {"enabled": True, "secret": "supersecret"}}
|
||||
)
|
||||
def test_signed_redirect(self) -> None:
|
||||
"""When media redirect are enabled, we should redirect to a signed URL"""
|
||||
content = io.BytesIO(b"file_to_stream")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_upload",
|
||||
content,
|
||||
46,
|
||||
UserID.from_string("@user_id:whatever.org"),
|
||||
)
|
||||
)
|
||||
# test with a text file
|
||||
channel = self.make_signed_federation_request(
|
||||
"GET",
|
||||
f"/_matrix/federation/v1/media/download/{content_uri.media_id}",
|
||||
)
|
||||
self.pump()
|
||||
self.assertEqual(200, channel.code)
|
||||
|
||||
content_type = channel.headers.getRawHeaders("content-type")
|
||||
assert content_type is not None
|
||||
assert "multipart/mixed" in content_type[0]
|
||||
assert "boundary" in content_type[0]
|
||||
|
||||
# extract boundary
|
||||
boundary = content_type[0].split("boundary=")[1]
|
||||
lines = channel.text_body.split("\r\n")
|
||||
|
||||
# Assert the structure of the multipart body line by line.
|
||||
# Expected structure:
|
||||
# --boundary_value
|
||||
# Content-Type: application/json
|
||||
#
|
||||
# {}
|
||||
# --boundary_value
|
||||
# Location: signed_url
|
||||
#
|
||||
#
|
||||
# --boundary_value--
|
||||
# (potentially a final empty line if the body ends with \r\n)
|
||||
self.assertEqual(len(lines), 10)
|
||||
|
||||
# Part 1: JSON metadata
|
||||
self.assertEqual(lines[0], f"--{boundary}")
|
||||
self.assertEqual(lines[1], "Content-Type: application/json")
|
||||
self.assertEqual(lines[2], "") # Empty line separating headers from body
|
||||
self.assertEqual(lines[3], "{}") # JSON body
|
||||
|
||||
# Part 2: Redirect URL
|
||||
self.assertEqual(lines[4], f"--{boundary}") # Boundary for the next part
|
||||
# The Location header contains dynamic parts (exp, sig), so use regex
|
||||
self.assertRegex(
|
||||
lines[5],
|
||||
rf"^Location: https://test/_synapse/media/download/{content_uri.media_id}\?exp=\d+&sig=\w+$",
|
||||
)
|
||||
self.assertEqual(lines[6], "") # First empty line after Location header
|
||||
self.assertEqual(lines[7], "") # Second empty line after Location header
|
||||
|
||||
# Final boundary
|
||||
self.assertEqual(lines[8], f"--{boundary}--")
|
||||
self.assertEqual(lines[9], "")
|
||||
|
||||
|
||||
class FederationThumbnailTest(unittest.FederatingHomeserverTestCase):
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
@@ -359,69 +293,3 @@ class FederationThumbnailTest(unittest.FederatingHomeserverTestCase):
|
||||
small_png.expected_cropped in field for field in stripped_bytes
|
||||
)
|
||||
self.assertTrue(found_file)
|
||||
|
||||
@unittest.override_config(
|
||||
{"media_redirect": {"enabled": True, "secret": "supersecret"}}
|
||||
)
|
||||
def test_thumbnail_signed_redirect(self) -> None:
|
||||
"""When media redirect are enabled, we should redirect to a signed URL for thumbnails"""
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_png_thumbnail",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user_id:whatever.org"),
|
||||
)
|
||||
)
|
||||
# test with a thumbnail request
|
||||
channel = self.make_signed_federation_request(
|
||||
"GET",
|
||||
f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=scale",
|
||||
)
|
||||
self.pump()
|
||||
self.assertEqual(200, channel.code)
|
||||
|
||||
content_type = channel.headers.getRawHeaders("content-type")
|
||||
assert content_type is not None
|
||||
assert "multipart/mixed" in content_type[0]
|
||||
assert "boundary" in content_type[0]
|
||||
|
||||
# extract boundary
|
||||
boundary = content_type[0].split("boundary=")[1]
|
||||
lines = channel.text_body.split("\r\n")
|
||||
|
||||
# Assert the structure of the multipart body line by line.
|
||||
# Expected structure:
|
||||
# --boundary_value
|
||||
# Content-Type: application/json
|
||||
#
|
||||
# {}
|
||||
# --boundary_value
|
||||
# Location: signed_url
|
||||
#
|
||||
#
|
||||
# --boundary_value--
|
||||
# (potentially a final empty line if the body ends with \r\n)
|
||||
self.assertEqual(len(lines), 10)
|
||||
|
||||
# Part 1: JSON metadata
|
||||
self.assertEqual(lines[0], f"--{boundary}")
|
||||
self.assertEqual(lines[1], "Content-Type: application/json")
|
||||
self.assertEqual(lines[2], "") # Empty line separating headers from body
|
||||
self.assertEqual(lines[3], "{}") # JSON body
|
||||
|
||||
# Part 2: Redirect URL
|
||||
self.assertEqual(lines[4], f"--{boundary}") # Boundary for the next part
|
||||
# The Location header contains dynamic parts (exp, sig), so use regex
|
||||
self.assertRegex(
|
||||
lines[5],
|
||||
rf"^Location: https://test/_synapse/media/thumbnail/{content_uri.media_id}/[^?]+\?exp=\d+&sig=\w+$",
|
||||
)
|
||||
self.assertEqual(lines[6], "") # First empty line after Location header
|
||||
self.assertEqual(lines[7], "") # Second empty line after Location header
|
||||
|
||||
# Final boundary
|
||||
self.assertEqual(lines[8], f"--{boundary}--")
|
||||
self.assertEqual(lines[9], "")
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
from synapse.api.constants import EduTypes
|
||||
|
||||
from tests import unittest
|
||||
from tests.unittest import DEBUG, override_config
|
||||
from tests.unittest import override_config
|
||||
|
||||
|
||||
class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
|
||||
@@ -48,7 +48,6 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
|
||||
)
|
||||
self.assertEqual(200, channel.code)
|
||||
|
||||
@DEBUG
|
||||
def test_edu_debugging_doesnt_explode(self) -> None:
|
||||
"""Sanity check incoming federation succeeds with `synapse.debug_8631` enabled.
|
||||
|
||||
|
||||
@@ -22,13 +22,7 @@ from typing import Dict, Protocol, Tuple
|
||||
|
||||
from prometheus_client.core import Sample
|
||||
|
||||
from synapse.metrics import (
|
||||
REGISTRY,
|
||||
SERVER_NAME_LABEL,
|
||||
InFlightGauge,
|
||||
LaterGauge,
|
||||
generate_latest,
|
||||
)
|
||||
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
|
||||
from synapse.util.caches.deferred_cache import DeferredCache
|
||||
|
||||
from tests import unittest
|
||||
@@ -291,42 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
|
||||
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
|
||||
|
||||
|
||||
class LaterGaugeTests(unittest.HomeserverTestCase):
|
||||
def test_later_gauge_multiple_servers(self) -> None:
|
||||
"""
|
||||
Test that LaterGauge metrics are reported correctly across multiple servers. We
|
||||
will have an metrics entry for each homeserver that is labeled with the
|
||||
`server_name` label.
|
||||
"""
|
||||
later_gauge = LaterGauge(
|
||||
name="foo",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
later_gauge.register_hook(lambda: {("hs1",): 1})
|
||||
later_gauge.register_hook(lambda: {("hs2",): 2})
|
||||
|
||||
metrics_map = get_latest_metrics()
|
||||
|
||||
# Find the metrics for the caches from both homeservers
|
||||
hs1_metric = 'foo{server_name="hs1"}'
|
||||
hs1_metric_value = metrics_map.get(hs1_metric)
|
||||
self.assertIsNotNone(
|
||||
hs1_metric_value,
|
||||
f"Missing metric {hs1_metric} in cache metrics {metrics_map}",
|
||||
)
|
||||
hs2_metric = 'foo{server_name="hs2"}'
|
||||
hs2_metric_value = metrics_map.get(hs2_metric)
|
||||
self.assertIsNotNone(
|
||||
hs2_metric_value,
|
||||
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
|
||||
)
|
||||
|
||||
# Sanity check the metric values
|
||||
self.assertEqual(hs1_metric_value, "1.0")
|
||||
self.assertEqual(hs2_metric_value, "2.0")
|
||||
|
||||
|
||||
def get_latest_metrics() -> Dict[str, str]:
|
||||
"""
|
||||
Collect the latest metrics from the registry and parse them into an easy to use map.
|
||||
|
||||
@@ -26,7 +26,7 @@ from parameterized import parameterized
|
||||
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventContentFields, RelationTypes
|
||||
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
|
||||
from synapse.rest import admin
|
||||
@@ -206,7 +206,10 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
bulk_evaluator._action_for_event_by_user.assert_not_called()
|
||||
|
||||
def _create_and_process(
|
||||
self, bulk_evaluator: BulkPushRuleEvaluator, content: Optional[JsonDict] = None
|
||||
self,
|
||||
bulk_evaluator: BulkPushRuleEvaluator,
|
||||
content: Optional[JsonDict] = None,
|
||||
type: str = "test",
|
||||
) -> bool:
|
||||
"""Returns true iff the `mentions` trigger an event push action."""
|
||||
# Create a new message event which should cause a notification.
|
||||
@@ -214,7 +217,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self.event_creation_handler.create_event(
|
||||
self.requester,
|
||||
{
|
||||
"type": "test",
|
||||
"type": type,
|
||||
"room_id": self.room_id,
|
||||
"content": content or {},
|
||||
"sender": f"@bob:{self.hs.hostname}",
|
||||
@@ -446,3 +449,73 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
@override_config({"experimental_features": {"msc4306_enabled": True}})
|
||||
def test_thread_subscriptions(self) -> None:
|
||||
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
|
||||
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
|
||||
|
||||
self.assertFalse(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{
|
||||
"msgtype": "m.text",
|
||||
"body": "test message before subscription",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_root_id,
|
||||
},
|
||||
},
|
||||
type=EventTypes.Message,
|
||||
)
|
||||
)
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.subscribe_user_to_thread(
|
||||
self.alice,
|
||||
self.room_id,
|
||||
thread_root_id,
|
||||
automatic_event_orderings=None,
|
||||
)
|
||||
)
|
||||
|
||||
self.assertTrue(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{
|
||||
"msgtype": "m.text",
|
||||
"body": "test message after subscription",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_root_id,
|
||||
},
|
||||
},
|
||||
type="m.room.message",
|
||||
)
|
||||
)
|
||||
|
||||
def test_with_disabled_thread_subscriptions(self) -> None:
|
||||
"""
|
||||
Test what happens with threaded events when MSC4306 is disabled.
|
||||
|
||||
FUTURE: If MSC4306 becomes enabled-by-default/accepted, this test is to be removed.
|
||||
"""
|
||||
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
|
||||
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
|
||||
|
||||
# When MSC4306 is not enabled, a threaded message generates a notification
|
||||
# by default.
|
||||
self.assertTrue(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{
|
||||
"msgtype": "m.text",
|
||||
"body": "test message before subscription",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_root_id,
|
||||
},
|
||||
},
|
||||
type="m.room.message",
|
||||
)
|
||||
)
|
||||
|
||||
@@ -150,6 +150,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
|
||||
*,
|
||||
related_events: Optional[JsonDict] = None,
|
||||
msc4210: bool = False,
|
||||
msc4306: bool = False,
|
||||
) -> PushRuleEvaluator:
|
||||
event = FrozenEvent(
|
||||
{
|
||||
@@ -176,6 +177,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
|
||||
room_version_feature_flags=event.room_version.msc3931_push_features,
|
||||
msc3931_enabled=True,
|
||||
msc4210_enabled=msc4210,
|
||||
msc4306_enabled=msc4306,
|
||||
)
|
||||
|
||||
def test_display_name(self) -> None:
|
||||
@@ -806,6 +808,112 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
|
||||
)
|
||||
)
|
||||
|
||||
def test_thread_subscription_subscribed(self) -> None:
|
||||
"""
|
||||
Test MSC4306 thread subscription push rules against an event in a subscribed thread.
|
||||
"""
|
||||
evaluator = self._get_evaluator(
|
||||
{
|
||||
"msgtype": "m.text",
|
||||
"body": "Squawk",
|
||||
"m.relates_to": {
|
||||
"event_id": "$threadroot",
|
||||
"rel_type": "m.thread",
|
||||
},
|
||||
},
|
||||
msc4306=True,
|
||||
)
|
||||
self.assertTrue(
|
||||
evaluator.matches(
|
||||
{
|
||||
"kind": "io.element.msc4306.thread_subscription",
|
||||
"subscribed": True,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
msc4306_thread_subscription_state=True,
|
||||
)
|
||||
)
|
||||
self.assertFalse(
|
||||
evaluator.matches(
|
||||
{
|
||||
"kind": "io.element.msc4306.thread_subscription",
|
||||
"subscribed": False,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
msc4306_thread_subscription_state=True,
|
||||
)
|
||||
)
|
||||
|
||||
def test_thread_subscription_unsubscribed(self) -> None:
|
||||
"""
|
||||
Test MSC4306 thread subscription push rules against an event in an unsubscribed thread.
|
||||
"""
|
||||
evaluator = self._get_evaluator(
|
||||
{
|
||||
"msgtype": "m.text",
|
||||
"body": "Squawk",
|
||||
"m.relates_to": {
|
||||
"event_id": "$threadroot",
|
||||
"rel_type": "m.thread",
|
||||
},
|
||||
},
|
||||
msc4306=True,
|
||||
)
|
||||
self.assertFalse(
|
||||
evaluator.matches(
|
||||
{
|
||||
"kind": "io.element.msc4306.thread_subscription",
|
||||
"subscribed": True,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
msc4306_thread_subscription_state=False,
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
evaluator.matches(
|
||||
{
|
||||
"kind": "io.element.msc4306.thread_subscription",
|
||||
"subscribed": False,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
msc4306_thread_subscription_state=False,
|
||||
)
|
||||
)
|
||||
|
||||
def test_thread_subscription_unthreaded(self) -> None:
|
||||
"""
|
||||
Test MSC4306 thread subscription push rules against an unthreaded event.
|
||||
"""
|
||||
evaluator = self._get_evaluator(
|
||||
{"msgtype": "m.text", "body": "Squawk"}, msc4306=True
|
||||
)
|
||||
self.assertFalse(
|
||||
evaluator.matches(
|
||||
{
|
||||
"kind": "io.element.msc4306.thread_subscription",
|
||||
"subscribed": True,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
msc4306_thread_subscription_state=None,
|
||||
)
|
||||
)
|
||||
self.assertFalse(
|
||||
evaluator.matches(
|
||||
{
|
||||
"kind": "io.element.msc4306.thread_subscription",
|
||||
"subscribed": False,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
msc4306_thread_subscription_state=None,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class TestBulkPushRuleEvaluator(unittest.HomeserverTestCase):
|
||||
"""Tests for the bulk push rule evaluator"""
|
||||
|
||||
@@ -2527,84 +2527,6 @@ class DownloadAndThumbnailTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
)
|
||||
|
||||
@override_config({"media_redirect": {"enabled": True, "secret": "supersecret"}})
|
||||
def test_download_signed_redirect(self) -> None:
|
||||
"""When media redirects are enabled, we should redirect to a signed URL for downloads"""
|
||||
# Create local media
|
||||
content = io.BytesIO(b"file_to_stream")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_upload",
|
||||
content,
|
||||
14,
|
||||
UserID.from_string(f"@user:{self.hs.hostname}"),
|
||||
)
|
||||
)
|
||||
|
||||
# Request the download
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_matrix/client/v1/media/download/{self.hs.hostname}/{content_uri.media_id}",
|
||||
shorthand=False,
|
||||
access_token=self.tok,
|
||||
)
|
||||
|
||||
# Should get a redirect response
|
||||
self.assertEqual(channel.code, 307)
|
||||
|
||||
# Check the Location header for the signed URL
|
||||
location_headers = channel.headers.getRawHeaders("Location")
|
||||
self.assertIsNotNone(location_headers)
|
||||
assert location_headers is not None
|
||||
self.assertEqual(len(location_headers), 1)
|
||||
location = location_headers[0]
|
||||
|
||||
# Verify the signed URL format
|
||||
self.assertRegex(
|
||||
location,
|
||||
rf"^https://test/_synapse/media/download/{content_uri.media_id}\?exp=\d+&sig=\w+$",
|
||||
)
|
||||
|
||||
@override_config({"media_redirect": {"enabled": True, "secret": "supersecret"}})
|
||||
def test_thumbnail_signed_redirect(self) -> None:
|
||||
"""When media redirects are enabled, we should redirect to a signed URL for thumbnails (scaled)"""
|
||||
# Create local media with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_png_thumbnail",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string(f"@user:{self.hs.hostname}"),
|
||||
)
|
||||
)
|
||||
|
||||
# Request a scaled thumbnail
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_matrix/client/v1/media/thumbnail/{self.hs.hostname}/{content_uri.media_id}?width=32&height=32",
|
||||
shorthand=False,
|
||||
access_token=self.tok,
|
||||
)
|
||||
|
||||
# Should get a redirect response
|
||||
self.assertEqual(channel.code, 307)
|
||||
|
||||
# Check the Location header for the signed URL
|
||||
location_headers = channel.headers.getRawHeaders("Location")
|
||||
self.assertIsNotNone(location_headers)
|
||||
assert location_headers is not None
|
||||
self.assertEqual(len(location_headers), 1)
|
||||
location = location_headers[0]
|
||||
|
||||
# Verify the signed URL format
|
||||
self.assertRegex(
|
||||
location,
|
||||
rf"^https://test/_synapse/media/thumbnail/{content_uri.media_id}/[^?]+\?exp=\d+&sig=\w+$",
|
||||
)
|
||||
|
||||
|
||||
configs = [
|
||||
{"extra_config": {"dynamic_thumbnails": True}},
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
@@ -1,282 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
#
|
||||
|
||||
import io
|
||||
from typing import Dict
|
||||
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from synapse.rest.synapse.client import build_synapse_client_resource_tree
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import UserID
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class SignedDownloadTestCase(unittest.HomeserverTestCase):
|
||||
def create_resource_dict(self) -> Dict[str, Resource]:
|
||||
d = super().create_resource_dict()
|
||||
d.update(build_synapse_client_resource_tree(self.hs))
|
||||
return d
|
||||
|
||||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||
config = self.default_config()
|
||||
config["media_redirect"] = {
|
||||
"enabled": True,
|
||||
"secret": "supersecret",
|
||||
}
|
||||
|
||||
return self.setup_test_homeserver(config=config)
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
super().prepare(reactor, clock, hs)
|
||||
self.media_repo = hs.get_media_repository()
|
||||
|
||||
def test_valid_signed_download(self) -> None:
|
||||
"""Test that a valid signed URL returns the media content"""
|
||||
# Create test content
|
||||
content = io.BytesIO(b"test file content")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"some_name.txt",
|
||||
content,
|
||||
17,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL
|
||||
exp = self.clock.time_msec() + 3600000 # 1 hour from now
|
||||
key = self.media_repo.download_media_key(
|
||||
media_id=content_uri.media_id, exp=exp, name="test_file.txt"
|
||||
)
|
||||
sig = self.media_repo.compute_media_request_signature(key)
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}/test_file.txt?exp={exp}&sig={sig}",
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(channel.result["body"], b"test file content")
|
||||
|
||||
# Check content type
|
||||
content_type = channel.headers.getRawHeaders("Content-Type")
|
||||
assert content_type is not None
|
||||
self.assertIn("text/plain", content_type[0])
|
||||
|
||||
# Check content disposition
|
||||
content_disposition = channel.headers.getRawHeaders("Content-Disposition")
|
||||
assert content_disposition is not None
|
||||
self.assertIn("test_file.txt", content_disposition[0])
|
||||
|
||||
def test_valid_signed_download_without_filename(self) -> None:
|
||||
"""Test that a valid signed URL works without a filename"""
|
||||
# Create test content
|
||||
content = io.BytesIO(b"test file content")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_file.txt",
|
||||
content,
|
||||
17,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL without filename
|
||||
exp = self.clock.time_msec() + 3600000 # 1 hour from now
|
||||
key = self.media_repo.download_media_key(
|
||||
media_id=content_uri.media_id, exp=exp, name=None
|
||||
)
|
||||
sig = self.media_repo.compute_media_request_signature(key)
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertEqual(channel.result["body"], b"test file content")
|
||||
|
||||
def test_invalid_signature(self) -> None:
|
||||
"""Test that an invalid signature returns 404"""
|
||||
# Create test content
|
||||
content = io.BytesIO(b"test file content")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_file.txt",
|
||||
content,
|
||||
17,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Use a properly formatted but invalid signature (64 hex chars like a real signature)
|
||||
exp = self.clock.time_msec() + 3600000 # 1 hour from now
|
||||
invalid_sig = "0" * 64 # Invalid but properly formatted signature
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={invalid_sig}",
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 404)
|
||||
|
||||
def test_expired_url(self) -> None:
|
||||
"""Test that an expired URL returns 404"""
|
||||
# Create test content
|
||||
content = io.BytesIO(b"test file content")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_file.txt",
|
||||
content,
|
||||
17,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL that will expire soon
|
||||
exp = self.clock.time_msec() + 1000 # 1 second from now
|
||||
key = self.media_repo.download_media_key(
|
||||
media_id=content_uri.media_id, exp=exp, name=None
|
||||
)
|
||||
sig = self.media_repo.compute_media_request_signature(key)
|
||||
|
||||
# Advance the clock to make the URL expired
|
||||
self.reactor.advance(2) # Advance 2 seconds
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 404)
|
||||
|
||||
def test_missing_parameters(self) -> None:
|
||||
"""Test that missing exp or sig parameters return 404"""
|
||||
# Create test content
|
||||
content = io.BytesIO(b"test file content")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_file.txt",
|
||||
content,
|
||||
17,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Test missing exp parameter
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?sig=somesig",
|
||||
shorthand=False,
|
||||
)
|
||||
self.assertEqual(channel.code, 400) # Bad request for missing required param
|
||||
|
||||
# Test missing sig parameter
|
||||
exp = self.clock.time_msec() + 3600000
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}",
|
||||
shorthand=False,
|
||||
)
|
||||
self.assertEqual(channel.code, 400) # Bad request for missing required param
|
||||
|
||||
def test_nonexistent_media(self) -> None:
|
||||
"""Test that requesting non-existent media returns 404"""
|
||||
# Generate a signed URL for non-existent media
|
||||
fake_media_id = "nonexistent"
|
||||
exp = self.clock.time_msec() + 3600000 # 1 hour from now
|
||||
key = self.media_repo.download_media_key(
|
||||
media_id=fake_media_id, exp=exp, name=None
|
||||
)
|
||||
sig = self.media_repo.compute_media_request_signature(key)
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{fake_media_id}?exp={exp}&sig={sig}",
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 404)
|
||||
|
||||
def test_etag_functionality(self) -> None:
|
||||
"""Test that ETag functionality works properly"""
|
||||
# Create test content
|
||||
content = io.BytesIO(b"test file content for etag")
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"text/plain",
|
||||
"test_file.txt",
|
||||
content,
|
||||
26,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL
|
||||
exp = self.clock.time_msec() + 3600000 # 1 hour from now
|
||||
key = self.media_repo.download_media_key(
|
||||
media_id=content_uri.media_id, exp=exp, name=None
|
||||
)
|
||||
sig = self.media_repo.compute_media_request_signature(key)
|
||||
|
||||
# Make the first request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Check the response has an ETag and a Cache-Control header
|
||||
self.assertEqual(channel.code, 200)
|
||||
etag_headers = channel.headers.getRawHeaders("ETag")
|
||||
assert etag_headers is not None
|
||||
etag = etag_headers[0]
|
||||
cache_control = channel.headers.getRawHeaders("Cache-Control")
|
||||
self.assertIsNotNone(cache_control)
|
||||
|
||||
# Make a second request with If-None-Match header
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
|
||||
shorthand=False,
|
||||
custom_headers=[("If-None-Match", etag)],
|
||||
)
|
||||
|
||||
# Should get 304 Not Modified
|
||||
self.assertEqual(channel.code, 304)
|
||||
self.assertNotIn("body", channel.result)
|
||||
@@ -1,356 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
#
|
||||
|
||||
import io
|
||||
import re
|
||||
from typing import Dict
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from synapse.rest.synapse.client import build_synapse_client_resource_tree
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import UserID
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
from tests.media.test_media_storage import small_png
|
||||
|
||||
|
||||
class SignedThumbnailTestCase(unittest.HomeserverTestCase):
|
||||
def create_resource_dict(self) -> Dict[str, Resource]:
|
||||
d = super().create_resource_dict()
|
||||
d.update(build_synapse_client_resource_tree(self.hs))
|
||||
return d
|
||||
|
||||
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
||||
config = self.default_config()
|
||||
config["media_redirect"] = {
|
||||
"enabled": True,
|
||||
"secret": "supersecret",
|
||||
}
|
||||
config["dynamic_thumbnails"] = True
|
||||
|
||||
return self.setup_test_homeserver(config=config)
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
super().prepare(reactor, clock, hs)
|
||||
self.media_repo = hs.get_media_repository()
|
||||
|
||||
def test_valid_signed_thumbnail_scaled(self) -> None:
|
||||
"""Test that a valid signed URL returns the thumbnail content (scaled)"""
|
||||
# Create test content with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_image.png",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL for scaled thumbnail
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
"method": "scale",
|
||||
"type": "image/png",
|
||||
}
|
||||
signed_url = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id=content_uri.media_id, parameters=params_dict
|
||||
)
|
||||
# Extract the path and query from the signed URL
|
||||
url_path = signed_url.split("https://test", 1)[1]
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 200)
|
||||
|
||||
# Check content type
|
||||
content_type = channel.headers.getRawHeaders("Content-Type")
|
||||
assert content_type is not None
|
||||
self.assertEqual(content_type[0], "image/png")
|
||||
|
||||
# Check that we got actual thumbnail data
|
||||
self.assertIsNotNone(channel.result.get("body"))
|
||||
self.assertGreater(len(channel.result["body"]), 0)
|
||||
|
||||
def test_valid_signed_thumbnail_cropped(self) -> None:
|
||||
"""Test that a valid signed URL returns the thumbnail content (cropped)"""
|
||||
# Create test content with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_image.png",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL for cropped thumbnail
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
"method": "crop",
|
||||
"type": "image/png",
|
||||
}
|
||||
signed_url = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id=content_uri.media_id, parameters=params_dict
|
||||
)
|
||||
# Extract the path and query from the signed URL
|
||||
url_path = signed_url.split("https://test", 1)[1]
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 200)
|
||||
|
||||
# Check content type
|
||||
content_type = channel.headers.getRawHeaders("Content-Type")
|
||||
assert content_type is not None
|
||||
self.assertEqual(content_type[0], "image/png")
|
||||
|
||||
# Check that we got actual thumbnail data
|
||||
self.assertIsNotNone(channel.result.get("body"))
|
||||
self.assertGreater(len(channel.result["body"]), 0)
|
||||
|
||||
def test_invalid_signature(self) -> None:
|
||||
"""Test that an invalid signature returns 404"""
|
||||
# Create test content with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_image.png",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
}
|
||||
signed_url = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id=content_uri.media_id, parameters=params_dict
|
||||
)
|
||||
|
||||
# Extract the path and query from the signed URL
|
||||
url_path = signed_url.split("https://test", 1)[1]
|
||||
invalid_sig = "0" * 64 # Invalid but properly formatted signature
|
||||
url_path = re.sub(r"sig=\w+", "sig=" + invalid_sig, url_path)
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 404)
|
||||
|
||||
def test_expired_url(self) -> None:
|
||||
"""Test that an expired URL returns 404"""
|
||||
# Create test content with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_image.png",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
}
|
||||
signed_url = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id=content_uri.media_id, parameters=params_dict
|
||||
)
|
||||
# Extract the path and query from the signed URL
|
||||
url_path = signed_url.split("https://test", 1)[1]
|
||||
|
||||
# Make a first request, it should work
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 200)
|
||||
|
||||
# Advance the clock to make the URL expired
|
||||
self.reactor.advance(
|
||||
10 * 60 + 1
|
||||
) # Advance 10 minutes + 1 second (TTL is 10 minutes by default)
|
||||
|
||||
# Make a second request, it should fail
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 404)
|
||||
|
||||
def test_missing_parameters(self) -> None:
|
||||
"""Test that missing exp or sig parameters return 400"""
|
||||
# Create test content with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_image.png",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
"method": "scale",
|
||||
"type": "image/png",
|
||||
}
|
||||
parameters = urlencode(params_dict)
|
||||
|
||||
# Test missing exp parameter
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/thumbnail/{content_uri.media_id}/{parameters}?sig=somesig",
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
self.assertEqual(channel.code, 400) # Bad request for missing required param
|
||||
|
||||
# Test missing sig parameter
|
||||
exp = self.clock.time_msec() + 3600000
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f"/_synapse/media/thumbnail/{content_uri.media_id}/{parameters}?exp={exp}",
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
self.assertEqual(channel.code, 400) # Bad request for missing required param
|
||||
|
||||
def test_nonexistent_media(self) -> None:
|
||||
"""Test that requesting non-existent media returns 404"""
|
||||
# Generate a signed URL for non-existent media
|
||||
fake_media_id = "nonexistent"
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
}
|
||||
signed_url = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id=fake_media_id, parameters=params_dict
|
||||
)
|
||||
# Extract the path and query from the signed URL
|
||||
url_path = signed_url.split("https://test", 1)[1]
|
||||
|
||||
# Make the request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response
|
||||
self.assertEqual(channel.code, 404)
|
||||
|
||||
def test_etag_functionality(self) -> None:
|
||||
"""Test that ETag functionality works properly"""
|
||||
# Create test content with an image
|
||||
content = io.BytesIO(small_png.data)
|
||||
content_uri = self.get_success(
|
||||
self.media_repo.create_or_update_content(
|
||||
"image/png",
|
||||
"test_image.png",
|
||||
content,
|
||||
67,
|
||||
UserID.from_string("@user:test"),
|
||||
)
|
||||
)
|
||||
|
||||
# Generate a signed URL
|
||||
params_dict = {
|
||||
"width": "32",
|
||||
"height": "32",
|
||||
}
|
||||
signed_url = self.media_repo.signed_location_for_thumbnail(
|
||||
media_id=content_uri.media_id, parameters=params_dict
|
||||
)
|
||||
# Extract the path and query from the signed URL
|
||||
url_path = signed_url.split("https://test", 1)[1]
|
||||
|
||||
# Make a first request
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Check the response has an ETag and a Cache-Control header
|
||||
self.assertEqual(channel.code, 200)
|
||||
etag_headers = channel.headers.getRawHeaders("ETag")
|
||||
assert etag_headers is not None
|
||||
etag = etag_headers[0]
|
||||
cache_control = channel.headers.getRawHeaders("Cache-Control")
|
||||
self.assertIsNotNone(cache_control)
|
||||
|
||||
# Make a second request with If-None-Match header
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
url_path,
|
||||
shorthand=False,
|
||||
custom_headers=[("If-None-Match", etag)],
|
||||
)
|
||||
self.pump()
|
||||
|
||||
# Should get 304 Not Modified
|
||||
self.assertEqual(channel.code, 304)
|
||||
self.assertNotIn("body", channel.result)
|
||||
@@ -327,3 +327,42 @@ class ThreadSubscriptionsTestCase(unittest.HomeserverTestCase):
|
||||
self.assertFalse(
|
||||
func(autosub=EventOrderings(-50, 2), unsubscribed_at=EventOrderings(2, 1))
|
||||
)
|
||||
|
||||
def test_get_subscribers_to_thread(self) -> None:
|
||||
"""
|
||||
Test getting all subscribers to a thread at once.
|
||||
|
||||
To check cache invalidations are correct, we do multiple
|
||||
step-by-step rounds of subscription changes and assertions.
|
||||
"""
|
||||
other_user_id = "@other_user:test"
|
||||
|
||||
subscribers = self.get_success(
|
||||
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
|
||||
)
|
||||
self.assertEqual(subscribers, frozenset())
|
||||
|
||||
self._subscribe(
|
||||
self.thread_root_id, automatic_event_orderings=None, user_id=self.user_id
|
||||
)
|
||||
|
||||
subscribers = self.get_success(
|
||||
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
|
||||
)
|
||||
self.assertEqual(subscribers, frozenset((self.user_id,)))
|
||||
|
||||
self._subscribe(
|
||||
self.thread_root_id, automatic_event_orderings=None, user_id=other_user_id
|
||||
)
|
||||
|
||||
subscribers = self.get_success(
|
||||
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
|
||||
)
|
||||
self.assertEqual(subscribers, frozenset((self.user_id, other_user_id)))
|
||||
|
||||
self._unsubscribe(self.thread_root_id, user_id=self.user_id)
|
||||
|
||||
subscribers = self.get_success(
|
||||
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
|
||||
)
|
||||
self.assertEqual(subscribers, frozenset((other_user_id,)))
|
||||
|
||||
Reference in New Issue
Block a user