1
0

Compare commits

..

16 Commits

Author SHA1 Message Date
Devon Hudson
c9f212ab44 Revert "Fix LaterGauge metrics to collect from all servers (#18751)"
This reverts commit 076db0ab49.
2025-08-06 15:49:26 -06:00
Devon Hudson
85e3adba86 Revert "Temporarily disable problem test"
This reverts commit 4333eff1d5.
2025-08-06 15:49:03 -06:00
Devon Hudson
d3bdf8b091 Revert "Temporarily disable all tests that call generate_latest"
This reverts commit d8ab5434d5.
2025-08-06 15:48:36 -06:00
Devon Hudson
d8ab5434d5 Temporarily disable all tests that call generate_latest 2025-08-06 15:21:42 -06:00
Devon Hudson
4333eff1d5 Temporarily disable problem test 2025-08-06 14:39:35 -06:00
Devon Hudson
c9f04f3484 Re-enable parallel 2025-08-06 14:39:18 -06:00
Olivier 'reivilibre
a387d6ecf8 better monitor 2025-08-06 18:03:31 +01:00
Olivier 'reivilibre
9e473d9e38 fail slow 2025-08-06 18:02:30 +01:00
Olivier 'reivilibre
d2ea7e32f5 Revert "choose a test"
This reverts commit a256423553.
2025-08-06 18:02:11 +01:00
Olivier 'reivilibre
2db0f1e49b don't wait for lint 2025-08-06 17:58:14 +01:00
Olivier 'reivilibre
a256423553 choose a test 2025-08-06 17:56:54 +01:00
Olivier 'reivilibre
e91aa4fd2f debug diskspace 2025-08-06 17:12:49 +01:00
Olivier 'reivilibre
499c1631de no parallel? 2025-08-06 16:29:03 +01:00
Olivier 'reivilibre
4ecd9aba95 Newsfile
Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-08-06 16:07:32 +01:00
Olivier 'reivilibre
4e947d05ab Remove stray @DEBUG 2025-08-06 16:07:07 +01:00
reivilibre
6514381b02 Implement the push rules for experimental MSC4306: Thread Subscriptions. (#18762)
Follows: #18756

Implements: MSC4306

---------

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2025-08-06 15:33:52 +01:00
53 changed files with 615 additions and 1886 deletions

View File

@@ -373,7 +373,7 @@ jobs:
calculate-test-jobs:
if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
needs: linting-done
# needs: linting-done
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
@@ -393,6 +393,7 @@ jobs:
- changes
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
job: ${{ fromJson(needs.calculate-test-jobs.outputs.trial_test_matrix) }}
@@ -426,7 +427,24 @@ jobs:
if: ${{ matrix.job.postgres-version }}
timeout-minutes: 2
run: until pg_isready -h localhost; do sleep 1; done
- run: poetry run trial --jobs=6 tests
- run: |
(
while true; do
echo "......."
date
df -h | grep root
free -m
sleep 10
done
) &
MONITOR_PID=$!
poetry run trial --jobs=6 tests
STATUS=$?
kill $MONITOR_PID
exit $STATUS
env:
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
SYNAPSE_POSTGRES_HOST: /var/run/postgresql

View File

@@ -1 +0,0 @@
Fix `LaterGauge` metrics to collect from all servers.

View File

@@ -0,0 +1 @@
Implement the push rules for experimental [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-doc/issues/4306).

1
changelog.d/18787.misc Normal file
View File

@@ -0,0 +1 @@
CI debugging.

View File

@@ -1 +0,0 @@
Allow serving media with a redirect to an unauthenticated, short-lived, signed URL.

View File

@@ -127,7 +127,6 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_synapse/admin/v1/quarantine_media/.*$",
"^/_matrix/client/v1/media/.*$",
"^/_matrix/federation/v1/media/.*$",
"^/_synapse/media/",
],
# The first configured media worker will run the media background jobs
"shared_extra_conf": {

View File

@@ -2182,39 +2182,6 @@ media_upload_limits:
max_size: 500M
```
---
### `media_redirect`
*(object)* When enabled, Synapse will use HTTP redirect responses to serve media instead of directly serving the media from the media store. This can help with caching, but requires /_synapse/media/* to be routed to the media worker.
This setting has the following sub-options:
* `enabled` (boolean): Enables the media redirect feature. If enabled, you must specify a `media_redirect.secret` or `media_redirect.secret_path`. Defaults to `false`.
* `secret` (string|null): Secret used to sign media redirect URLs. This must be set if `media_redirect.enabled` is set. Defaults to `null`.
* `secret_path` (string|null): An alternative to `media_redirect.secret` that specifies a file containing the secret. Defaults to `null`.
* `ttl` (duration): How long the redirect URLs should be valid for. Defaults to `"10m"`.
Default configuration:
```yaml
media_redirect:
enabled: false
```
Example configurations:
```yaml
media_redirect:
enabled: true
secret: aiCh9gu4Zahvueveisooquu7chaiw9Ee
```
```yaml
media_redirect:
enabled: true
secret_path: /path/to/secrets/file
```
---
### `max_image_pixels`
*(byte size)* Maximum number of pixels that will be thumbnailed. Defaults to `"32M"`.

View File

@@ -765,7 +765,6 @@ Handles the media repository. It can handle all endpoints starting with:
/_matrix/media/
/_matrix/client/v1/media/
/_matrix/federation/v1/media/
/_synapse/media/
... and the following regular expressions matching media-specific administration APIs:

View File

@@ -61,6 +61,7 @@ fn bench_match_exact(b: &mut Bencher) {
vec![],
false,
false,
false,
)
.unwrap();
@@ -71,10 +72,10 @@ fn bench_match_exact(b: &mut Bencher) {
},
));
let matched = eval.match_condition(&condition, None, None).unwrap();
let matched = eval.match_condition(&condition, None, None, None).unwrap();
assert!(matched, "Didn't match");
b.iter(|| eval.match_condition(&condition, None, None).unwrap());
b.iter(|| eval.match_condition(&condition, None, None, None).unwrap());
}
#[bench]
@@ -107,6 +108,7 @@ fn bench_match_word(b: &mut Bencher) {
vec![],
false,
false,
false,
)
.unwrap();
@@ -117,10 +119,10 @@ fn bench_match_word(b: &mut Bencher) {
},
));
let matched = eval.match_condition(&condition, None, None).unwrap();
let matched = eval.match_condition(&condition, None, None, None).unwrap();
assert!(matched, "Didn't match");
b.iter(|| eval.match_condition(&condition, None, None).unwrap());
b.iter(|| eval.match_condition(&condition, None, None, None).unwrap());
}
#[bench]
@@ -153,6 +155,7 @@ fn bench_match_word_miss(b: &mut Bencher) {
vec![],
false,
false,
false,
)
.unwrap();
@@ -163,10 +166,10 @@ fn bench_match_word_miss(b: &mut Bencher) {
},
));
let matched = eval.match_condition(&condition, None, None).unwrap();
let matched = eval.match_condition(&condition, None, None, None).unwrap();
assert!(!matched, "Didn't match");
b.iter(|| eval.match_condition(&condition, None, None).unwrap());
b.iter(|| eval.match_condition(&condition, None, None, None).unwrap());
}
#[bench]
@@ -199,6 +202,7 @@ fn bench_eval_message(b: &mut Bencher) {
vec![],
false,
false,
false,
)
.unwrap();
@@ -210,7 +214,8 @@ fn bench_eval_message(b: &mut Bencher) {
false,
false,
false,
false,
);
b.iter(|| eval.run(&rules, Some("bob"), Some("person")));
b.iter(|| eval.run(&rules, Some("bob"), Some("person"), None));
}

View File

@@ -290,6 +290,26 @@ pub const BASE_APPEND_CONTENT_RULES: &[PushRule] = &[PushRule {
}];
pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
PushRule {
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.unsubscribed_thread"),
priority_class: 1,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::Msc4306ThreadSubscription { subscribed: false },
)]),
actions: Cow::Borrowed(&[]),
default: true,
default_enabled: true,
},
PushRule {
rule_id: Cow::Borrowed("global/content/.io.element.msc4306.rule.subscribed_thread"),
priority_class: 1,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::Msc4306ThreadSubscription { subscribed: true },
)]),
actions: Cow::Borrowed(&[Action::Notify, SOUND_ACTION]),
default: true,
default_enabled: true,
},
PushRule {
rule_id: Cow::Borrowed("global/underride/.m.rule.call"),
priority_class: 1,

View File

@@ -106,8 +106,11 @@ pub struct PushRuleEvaluator {
/// flag as MSC1767 (extensible events core).
msc3931_enabled: bool,
// If MSC4210 (remove legacy mentions) is enabled.
/// If MSC4210 (remove legacy mentions) is enabled.
msc4210_enabled: bool,
/// If MSC4306 (thread subscriptions) is enabled.
msc4306_enabled: bool,
}
#[pymethods]
@@ -126,6 +129,7 @@ impl PushRuleEvaluator {
room_version_feature_flags,
msc3931_enabled,
msc4210_enabled,
msc4306_enabled,
))]
pub fn py_new(
flattened_keys: BTreeMap<String, JsonValue>,
@@ -138,6 +142,7 @@ impl PushRuleEvaluator {
room_version_feature_flags: Vec<String>,
msc3931_enabled: bool,
msc4210_enabled: bool,
msc4306_enabled: bool,
) -> Result<Self, Error> {
let body = match flattened_keys.get("content.body") {
Some(JsonValue::Value(SimpleJsonValue::Str(s))) => s.clone().into_owned(),
@@ -156,6 +161,7 @@ impl PushRuleEvaluator {
room_version_feature_flags,
msc3931_enabled,
msc4210_enabled,
msc4306_enabled,
})
}
@@ -167,12 +173,19 @@ impl PushRuleEvaluator {
///
/// Returns the set of actions, if any, that match (filtering out any
/// `dont_notify` and `coalesce` actions).
#[pyo3(signature = (push_rules, user_id=None, display_name=None))]
///
/// msc4306_thread_subscription_state: (Only populated if MSC4306 is enabled)
/// The thread subscription state corresponding to the thread containing this event.
/// - `None` if the event is not in a thread, or if MSC4306 is disabled.
/// - `Some(true)` if the event is in a thread and the user has a subscription for that thread
/// - `Some(false)` if the event is in a thread and the user does NOT have a subscription for that thread
#[pyo3(signature = (push_rules, user_id=None, display_name=None, msc4306_thread_subscription_state=None))]
pub fn run(
&self,
push_rules: &FilteredPushRules,
user_id: Option<&str>,
display_name: Option<&str>,
msc4306_thread_subscription_state: Option<bool>,
) -> Vec<Action> {
'outer: for (push_rule, enabled) in push_rules.iter() {
if !enabled {
@@ -204,7 +217,12 @@ impl PushRuleEvaluator {
Condition::Known(KnownCondition::RoomVersionSupports { feature: _ }),
);
match self.match_condition(condition, user_id, display_name) {
match self.match_condition(
condition,
user_id,
display_name,
msc4306_thread_subscription_state,
) {
Ok(true) => {}
Ok(false) => continue 'outer,
Err(err) => {
@@ -237,14 +255,20 @@ impl PushRuleEvaluator {
}
/// Check if the given condition matches.
#[pyo3(signature = (condition, user_id=None, display_name=None))]
#[pyo3(signature = (condition, user_id=None, display_name=None, msc4306_thread_subscription_state=None))]
fn matches(
&self,
condition: Condition,
user_id: Option<&str>,
display_name: Option<&str>,
msc4306_thread_subscription_state: Option<bool>,
) -> bool {
match self.match_condition(&condition, user_id, display_name) {
match self.match_condition(
&condition,
user_id,
display_name,
msc4306_thread_subscription_state,
) {
Ok(true) => true,
Ok(false) => false,
Err(err) => {
@@ -262,6 +286,7 @@ impl PushRuleEvaluator {
condition: &Condition,
user_id: Option<&str>,
display_name: Option<&str>,
msc4306_thread_subscription_state: Option<bool>,
) -> Result<bool, Error> {
let known_condition = match condition {
Condition::Known(known) => known,
@@ -393,6 +418,13 @@ impl PushRuleEvaluator {
&& self.room_version_feature_flags.contains(&flag)
}
}
KnownCondition::Msc4306ThreadSubscription { subscribed } => {
if !self.msc4306_enabled {
false
} else {
msc4306_thread_subscription_state == Some(*subscribed)
}
}
};
Ok(result)
@@ -536,10 +568,11 @@ fn push_rule_evaluator() {
vec![],
true,
false,
false,
)
.unwrap();
let result = evaluator.run(&FilteredPushRules::default(), None, Some("bob"));
let result = evaluator.run(&FilteredPushRules::default(), None, Some("bob"), None);
assert_eq!(result.len(), 3);
}
@@ -566,6 +599,7 @@ fn test_requires_room_version_supports_condition() {
flags,
true,
false,
false,
)
.unwrap();
@@ -575,6 +609,7 @@ fn test_requires_room_version_supports_condition() {
&FilteredPushRules::default(),
Some("@bob:example.org"),
None,
None,
);
assert_eq!(result.len(), 3);
@@ -593,7 +628,17 @@ fn test_requires_room_version_supports_condition() {
};
let rules = PushRules::new(vec![custom_rule]);
result = evaluator.run(
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false, false),
&FilteredPushRules::py_new(
rules,
BTreeMap::new(),
true,
false,
true,
false,
false,
false,
),
None,
None,
None,
);

View File

@@ -369,6 +369,10 @@ pub enum KnownCondition {
RoomVersionSupports {
feature: Cow<'static, str>,
},
#[serde(rename = "io.element.msc4306.thread_subscription")]
Msc4306ThreadSubscription {
subscribed: bool,
},
}
impl<'source> IntoPyObject<'source> for Condition {
@@ -547,11 +551,13 @@ pub struct FilteredPushRules {
msc3664_enabled: bool,
msc4028_push_encrypted_events: bool,
msc4210_enabled: bool,
msc4306_enabled: bool,
}
#[pymethods]
impl FilteredPushRules {
#[new]
#[allow(clippy::too_many_arguments)]
pub fn py_new(
push_rules: PushRules,
enabled_map: BTreeMap<String, bool>,
@@ -560,6 +566,7 @@ impl FilteredPushRules {
msc3664_enabled: bool,
msc4028_push_encrypted_events: bool,
msc4210_enabled: bool,
msc4306_enabled: bool,
) -> Self {
Self {
push_rules,
@@ -569,6 +576,7 @@ impl FilteredPushRules {
msc3664_enabled,
msc4028_push_encrypted_events,
msc4210_enabled,
msc4306_enabled,
}
}
@@ -619,6 +627,10 @@ impl FilteredPushRules {
return false;
}
if !self.msc4306_enabled && rule.rule_id.contains("/.io.element.msc4306.rule.") {
return false;
}
true
})
.map(|r| {

View File

@@ -2433,44 +2433,6 @@ properties:
max_size: 100M
- time_period: 1w
max_size: 500M
media_redirect:
type: object
description: >-
When enabled, Synapse will use HTTP redirect responses to serve media
instead of directly serving the media from the media store. This can help
with caching, but requires /_synapse/media/* to be routed to the media
worker.
properties:
enabled:
type: boolean
description: >-
Enables the media redirect feature. If enabled, you must specify a
`media_redirect.secret` or `media_redirect.secret_path`.
default: false
secret:
type: ["string", "null"]
description: >-
Secret used to sign media redirect URLs. This must be set if
`media_redirect.enabled` is set.
default: null
secret_path:
type: ["string", "null"]
description: >-
An alternative to `media_redirect.secret` that specifies a file
containing the secret.
default: null
ttl:
$ref: "#/$defs/duration"
description: >-
How long the redirect URLs should be valid for.
default: 10m
default:
enabled: false
examples:
- enabled: true
secret: aiCh9gu4Zahvueveisooquu7chaiw9Ee
- enabled: true
secret_path: /path/to/secrets/file
max_image_pixels:
$ref: "#/$defs/bytes"
description: Maximum number of pixels that will be thumbnailed.

View File

@@ -21,7 +21,7 @@
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Tuple
import attr
@@ -30,7 +30,7 @@ from synapse.types import JsonDict
from synapse.util.check_dependencies import check_requirements
from synapse.util.module_loader import load_module
from ._base import Config, ConfigError, read_file
from ._base import Config, ConfigError
logger = logging.getLogger(__name__)
@@ -130,9 +130,7 @@ class MediaUploadLimit:
class ContentRepositoryConfig(Config):
section = "media"
def read_config(
self, config: JsonDict, allow_secrets_in_config: bool, **kwargs: Any
) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# Only enable the media repo if either the media repo is enabled or the
# current worker app is the media repo.
if (
@@ -292,55 +290,6 @@ class ContentRepositoryConfig(Config):
self.enable_authenticated_media = config.get("enable_authenticated_media", True)
redirect_config = config.get("media_redirect", {})
if redirect_config is None:
redirect_config = {}
if not isinstance(redirect_config, dict):
raise ConfigError(
"`media_redirect` must be a dictionary",
("media_redirect",),
)
# Whether we should use a redirect to /_synapse/media/* when serving
# media for better caching. This requires this endpoint to be routed to
# the media worker.
self.use_redirect = redirect_config.get("enabled", False)
redirect_secret = redirect_config.get("secret")
if redirect_secret and not allow_secrets_in_config:
raise ConfigError(
"Config options that expect an in-line secret as value are disabled",
("media_redirect", "secret"),
)
if redirect_secret is not None and not isinstance(redirect_secret, str):
raise ConfigError(
"`media_redirect.secret` must be a string.",
("media_redirect", "secret"),
)
redirect_secret_path = redirect_config.get("secret_path")
if redirect_secret_path:
if redirect_secret:
raise ConfigError(
"You have configured both `media_redirect.secret` and `media_redirect.secret_path`.\n"
"These are mutually incompatible.",
("media_redirect", "secret_path"),
)
redirect_secret = read_file(
redirect_secret_path, ("media_redirect", "secret_path")
).strip()
self.redirect_secret: Optional[bytes] = (
redirect_secret.encode("utf-8") if redirect_secret else None
)
if self.use_redirect and self.redirect_secret is None:
raise ConfigError(
"You have configured `media_redirect.enabled` but not set `media_redirect.secret` or `media_redirect.secret_path`."
)
self.redirect_ttl_ms = self.parse_duration(redirect_config.get("ttl", "10m"))
self.media_upload_limits: List[MediaUploadLimit] = []
for limit_config in config.get("media_upload_limits", []):
time_period_ms = self.parse_duration(limit_config["time_period"])

View File

@@ -37,7 +37,6 @@ Events are replicated via a separate events stream.
"""
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
@@ -68,25 +67,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
@@ -131,15 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
lambda: {(self.server_name,): len(queue)}
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
)
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000)

View File

@@ -199,24 +199,6 @@ sent_pdus_destination_dist_total = Counter(
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
@@ -416,28 +398,38 @@ class FederationSender(AbstractFederationSender):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
transaction_queue_pending_destinations_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
}
},
)
transaction_queue_pending_pdus_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
}
},
)
transaction_queue_pending_edus_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
}
},
)
self._is_processing = False

View File

@@ -826,12 +826,7 @@ class FederationMediaDownloadServlet(BaseFederationServerServlet):
)
max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
await self.media_repo.get_local_media(
request,
media_id,
None,
max_timeout_ms,
federation=True,
may_redirect=True,
request, media_id, None, max_timeout_ms, federation=True
)
@@ -878,27 +873,11 @@ class FederationMediaThumbnailServlet(BaseFederationServerServlet):
if self.dynamic_thumbnails:
await self.thumbnail_provider.select_or_generate_local_thumbnail(
request,
media_id,
width,
height,
method,
m_type,
max_timeout_ms,
for_federation=True,
may_redirect=True,
request, media_id, width, height, method, m_type, max_timeout_ms, True
)
else:
await self.thumbnail_provider.respond_local_thumbnail(
request,
media_id,
width,
height,
method,
m_type,
max_timeout_ms,
for_federation=True,
may_redirect=True,
request, media_id, width, height, method, m_type, max_timeout_ms, True
)
self.media_repo.mark_recently_accessed(None, media_id)

View File

@@ -173,18 +173,6 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -791,8 +779,11 @@ class PresenceHandler(BasePresenceHandler):
EduTypes.PRESENCE, self.incoming_presence
)
presence_user_to_current_state_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_current_state)}
LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -891,8 +882,11 @@ class PresenceHandler(BasePresenceHandler):
60 * 1000,
)
presence_wheel_timer_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.wheel_timer)}
LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
)
# Used to handle sending of presence to newly joined users/servers

View File

@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts
in_flight_requests = LaterGauge(
LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(_get_in_flight_counts)
class RequestMetrics:

View File

@@ -972,25 +972,6 @@ def set_corp_headers(request: Request) -> None:
request.setHeader(b"Cross-Origin-Resource-Policy", b"cross-origin")
def set_headers_for_media_response(request: "SynapseRequest") -> None:
"""Set the appropriate headers when serving media responses to clients"""
set_cors_headers(request)
set_corp_headers(request)
request.setHeader(
b"Content-Security-Policy",
b"sandbox;"
b" default-src 'none';"
b" script-src 'none';"
b" plugin-types application/pdf;"
b" style-src 'unsafe-inline';"
b" media-src 'self';"
b" object-src 'self';",
)
# Limited non-standard form of CSP for IE11
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
request.setHeader(b"Referrer-Policy", b"no-referrer")
def respond_with_html(request: Request, code: int, html: str) -> None:
"""
Wraps `respond_with_html_bytes` by first encoding HTML from a str to UTF-8 bytes.

View File

@@ -714,17 +714,6 @@ def parse_strings_from_args(
return default
@overload
def parse_string_from_args(
args: Mapping[bytes, Sequence[bytes]],
name: str,
default: str,
*,
allowed_values: Optional[StrCollection] = None,
encoding: str = "ascii",
) -> str: ...
@overload
def parse_string_from_args(
args: Mapping[bytes, Sequence[bytes]],

View File

@@ -36,7 +36,6 @@ from typing import (
Tuple,
Type,
)
from uuid import uuid4
import attr
from zope.interface import implementer
@@ -64,8 +63,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
CRLF = b"\r\n"
# list all text content types that will have the charset default to UTF-8 when
# none is given
TEXT_CONTENT_TYPES = [
@@ -444,35 +441,6 @@ async def respond_with_responder(
finish_request(request)
def respond_with_multipart_location(
request: SynapseRequest,
location: bytes,
) -> None:
"""Responds to a media request with a multipart/mixed response, with the
(empty) media metadata as a JSON object in the first part, and a `Location`
header as the second part
Args:
request: The incoming request.
location: The URL to give in the `Location` header.
"""
boundary = uuid4().hex.encode("ascii") # Pick a random boundary
request.setResponseCode(200)
request.setHeader(
b"Content-Type",
b"multipart/mixed; boundary=" + boundary,
)
request.write(b"--" + boundary + CRLF)
request.write(b"Content-Type: application/json" + CRLF + CRLF)
# This is an empty JSON object for now, we don't have any
# metadata associated with media yet
request.write(b"{}")
request.write(CRLF + b"--" + boundary + CRLF)
request.write(b"Location: " + location + CRLF + CRLF)
request.write(CRLF + b"--" + boundary + b"--" + CRLF)
finish_request(request)
def respond_with_304(request: SynapseRequest) -> None:
request.setResponseCode(304)

View File

@@ -19,17 +19,12 @@
# [This file includes modifications made by New Vector Limited]
#
#
import base64
import errno
import hashlib
import hmac
import logging
import os
import shutil
from http.client import TEMPORARY_REDIRECT
from io import BytesIO
from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple
from urllib.parse import urlencode
import attr
from matrix_common.types.mxc_uri import MXCUri
@@ -49,7 +44,7 @@ from synapse.api.errors import (
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.repository import ThumbnailRequirement
from synapse.http.server import respond_with_json, respond_with_redirect
from synapse.http.server import respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.logging.opentracing import trace
@@ -60,7 +55,6 @@ from synapse.media._base import (
check_for_cached_entry_and_respond,
get_filename_from_headers,
respond_404,
respond_with_multipart_location,
respond_with_multipart_responder,
respond_with_responder,
)
@@ -75,7 +69,7 @@ from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
from synapse.media.url_previewer import UrlPreviewer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
from synapse.types import StrSequence, UserID
from synapse.types import UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import random_string
@@ -130,8 +124,6 @@ class MediaRepository:
cfg=hs.config.ratelimiting.remote_media_downloads,
)
self._media_request_signature_secret = hs.config.media.redirect_secret
# List of StorageProviders where we should search for media and
# potentially upload to.
storage_providers = []
@@ -474,7 +466,6 @@ class MediaRepository:
max_timeout_ms: int,
allow_authenticated: bool = True,
federation: bool = False,
may_redirect: bool = False,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.
@@ -486,12 +477,8 @@ class MediaRepository:
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
allow_authenticated: whether media marked as authenticated may be
served to this request
federation: whether the local media being fetched is for a
federation request
may_redirect: whether the request may issue a redirect instead of
serving the media directly
allow_authenticated: whether media marked as authenticated may be served to this request
federation: whether the local media being fetched is for a federation request
Returns:
Resolves once a response has successfully been written to request
@@ -506,19 +493,6 @@ class MediaRepository:
self.mark_recently_accessed(None, media_id)
if self.hs.config.media.use_redirect and may_redirect:
location = self.signed_location_for_media(media_id, name)
if federation:
respond_with_multipart_location(request, location.encode("ascii"))
else:
respond_with_redirect(
request, location.encode("ascii"), TEMPORARY_REDIRECT
)
return
# Once we've checked auth we can return early if the media is cached on
# the client
if check_for_cached_entry_and_respond(request):
@@ -1571,145 +1545,3 @@ class MediaRepository:
removed_media.append(media_id)
return removed_media, len(removed_media)
def download_media_key(
self,
media_id: str,
exp: int,
name: Optional[str] = None,
) -> StrSequence:
"""Get the key used for the download media signature
Args:
media_id: The media ID of the content. (This is the same as
the file_id for local content.)
exp: The expiration time of the signature, as a unix timestamp in ms.
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
"""
if name is not None:
return ("download", media_id, str(exp), name)
return ("download", media_id, str(exp))
def signed_location_for_media(
self,
media_id: str,
name: Optional[str] = None,
) -> str:
"""Get the signed location for a media download
That URL will serve the media with no extra authentication for a limited
time, allowing the media to be cached by a CDN more easily.
"""
# XXX: One potential improvement here would be to round the `exp` to
# the nearest 5 minutes, so that a CDN/cache can always cache the
# media for a little bit
exp = self.clock.time_msec() + self.hs.config.media.redirect_ttl_ms
key = self.download_media_key(
media_id=media_id,
exp=exp,
name=name,
)
signature = self.compute_media_request_signature(key)
# This *could* in theory be a relative redirect, but Synapse has a
# bug where it always treats it as absolute. Because this is used
# for federation request, we can't just fix the bug in Synapse and
# use a relative redirect, we have to wait for the fix to be rolled
# out across the federation.
name_path = f"/{name}" if name else ""
return f"{self.hs.config.server.public_baseurl}_synapse/media/download/{media_id}{name_path}?exp={exp}&sig={signature}"
def thumbnail_media_key(
self, media_id: str, parameters: str, exp: int
) -> StrSequence:
"""Get the key used for the thumbnail media signature
Args:
media_id: The media ID of the content. (This is the same as
the file_id for local content.)
parameters: The parameters of the thumbnail request as a string,
e.g. "width=100&height=100"
exp: The expiration time of the signature, as a unix timestamp in ms.
"""
return ("thumbnail", media_id, parameters, str(exp))
def signed_location_for_thumbnail(
    self,
    media_id: str,
    parameters: dict[str, str],
) -> str:
    """Build a signed URL from which a thumbnail can be fetched.

    The returned URL serves the thumbnail with no extra authentication for
    a limited time, which allows it to be cached by a CDN more easily.
    """
    # XXX: One potential improvement here would be to round the `exp` to
    # the nearest 5 minutes, so that a CDN/cache can always cache the
    # media for a little bit
    expiry_ts = self.clock.time_msec() + self.hs.config.media.redirect_ttl_ms
    # Sort for a canonical parameter order, then form-urlencode and
    # base64-encode so the parameters survive as one opaque path segment.
    encoded_params = base64.urlsafe_b64encode(
        urlencode(sorted(parameters.items())).encode("utf-8")
    ).decode("ascii")
    signature = self.compute_media_request_signature(
        self.thumbnail_media_key(
            media_id=media_id, parameters=encoded_params, exp=expiry_ts
        )
    )
    # This *could* in theory be a relative redirect, but Synapse has a
    # bug where it always treats it as absolute. Because this is used
    # for federation request, we can't just fix the bug in Synapse and
    # use a relative redirect, we have to wait for the fix to be rolled
    # out across the federation.
    base = self.hs.config.server.public_baseurl
    return f"{base}_synapse/media/thumbnail/{media_id}/{encoded_params}?exp={expiry_ts}&sig={signature}"
def compute_media_request_signature(self, payload: "StrSequence") -> str:
    """Sign a media-request payload.

    The signature is currently an HMAC-SHA256 over the payload segments
    joined with "|", returned as lowercase hex, but this could be swapped
    to an asymmetric signature.
    """
    secret = self._media_request_signature_secret
    assert secret is not None, "media request signature secret not set"
    # XXX: alternatively, we could do multiple rounds of HMAC with the
    # different segments, like AWS SigV4 does
    joined = "|".join(payload).encode("utf-8")
    return hmac.new(secret, joined, hashlib.sha256).hexdigest()
def verify_media_request_signature(
    self, payload: "StrSequence", signature: str
) -> bool:
    """Verify the signature for a signed media request.

    Args:
        payload: The payload segments that were signed.
        signature: The hex-encoded signature supplied by the requester.

    Returns:
        True if the signature is valid, False otherwise (including when no
        signing secret is configured or the signature is not valid hex).
    """
    # In case there is no secret, we can't verify the signature
    if self._media_request_signature_secret is None:
        return False
    # The signature comes from an untrusted URL parameter and may not be
    # valid hex at all; treat malformed input as an invalid signature
    # rather than letting bytes.fromhex raise ValueError to the caller.
    try:
        decoded_signature = bytes.fromhex(signature)
    except ValueError:
        return False
    bytes_payload = "|".join(payload).encode("utf-8")
    computed_signature = hmac.digest(
        key=self._media_request_signature_secret,
        msg=bytes_payload,
        digest=hashlib.sha256,
    )
    # compare_digest already rejects mismatched lengths, but the explicit
    # check makes the (non-secret) early exit obvious.
    if len(computed_signature) != len(decoded_signature):
        return False
    return hmac.compare_digest(computed_signature, decoded_signature)

View File

@@ -59,7 +59,7 @@ from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer
from ..types import JsonDict
from ._base import CRLF, FileInfo, Responder
from ._base import FileInfo, Responder
from .filepath import MediaFilePaths
if TYPE_CHECKING:
@@ -68,6 +68,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
CRLF = b"\r\n"
class SHA256TransparentIOWriter:
"""Will generate a SHA256 hash from a source stream transparently.

View File

@@ -20,7 +20,6 @@
#
#
import logging
from http.client import TEMPORARY_REDIRECT
from io import BytesIO
from types import TracebackType
from typing import TYPE_CHECKING, List, Optional, Tuple, Type
@@ -29,7 +28,7 @@ from PIL import Image
from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error
from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
from synapse.http.server import respond_with_json, respond_with_redirect
from synapse.http.server import respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace
from synapse.media._base import (
@@ -38,7 +37,6 @@ from synapse.media._base import (
check_for_cached_entry_and_respond,
respond_404,
respond_with_file,
respond_with_multipart_location,
respond_with_multipart_responder,
respond_with_responder,
)
@@ -271,7 +269,6 @@ class ThumbnailProvider:
self.media_repo = media_repo
self.media_storage = media_storage
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails
async def respond_local_thumbnail(
@@ -285,7 +282,6 @@ class ThumbnailProvider:
max_timeout_ms: int,
for_federation: bool,
allow_authenticated: bool = True,
may_redirect: bool = False,
) -> None:
media_info = await self.media_repo.get_local_media_info(
request, media_id, max_timeout_ms
@@ -299,26 +295,6 @@ class ThumbnailProvider:
if media_info.authenticated:
raise NotFoundError()
if self.hs.config.media.use_redirect and may_redirect:
location = self.media_repo.signed_location_for_thumbnail(
media_id,
{
"width": str(width),
"height": str(height),
"method": method,
"type": m_type,
},
)
if for_federation:
respond_with_multipart_location(request, location.encode("ascii"))
else:
respond_with_redirect(
request, location.encode("ascii"), TEMPORARY_REDIRECT
)
return
# Once we've checked auth we can return early if the media is cached on
# the client
if check_for_cached_entry_and_respond(request):
@@ -351,7 +327,6 @@ class ThumbnailProvider:
max_timeout_ms: int,
for_federation: bool,
allow_authenticated: bool = True,
may_redirect: bool = False,
) -> None:
media_info = await self.media_repo.get_local_media_info(
request, media_id, max_timeout_ms
@@ -365,27 +340,6 @@ class ThumbnailProvider:
if media_info.authenticated:
raise NotFoundError()
if self.hs.config.media.use_redirect and may_redirect:
location = self.media_repo.signed_location_for_thumbnail(
media_id,
{
"width": str(desired_width),
"height": str(desired_height),
"method": desired_method,
"type": desired_type,
},
)
if for_federation:
respond_with_multipart_location(request, location.encode("ascii"))
else:
respond_with_redirect(
request, location.encode("ascii"), TEMPORARY_REDIRECT
)
return
# Once we've checked auth we can return early if the media is cached on
# the client
if check_for_cached_entry_and_respond(request):

View File

@@ -31,7 +31,6 @@ from typing import (
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
@@ -74,6 +73,8 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name"
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# List of callbacks: each callback should either return a value (if there are no
# labels for this metric), or dict mapping from a label tuple to a value
_hooks: List[
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
] = attr.ib(factory=list, hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
for hook in self._hooks:
try:
hook_result = hook()
except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s)", self.name
)
yield g
return
if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)
try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
def register_hook(
self,
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
self._hooks.append(hook)
if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)
yield g
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -254,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
# Protects access to _registrations
self._lock = threading.Lock()
REGISTRY.register(self)
self._register_with_collector()
def register(
self,
@@ -345,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""

View File

@@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL],
)
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -299,16 +281,28 @@ class Notifier:
)
}
notifier_listeners_gauge.register_hook(count_listeners)
notifier_rooms_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
}
},
)
notifier_users_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_user_stream)}
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -25,6 +25,7 @@ from typing import (
Any,
Collection,
Dict,
FrozenSet,
List,
Mapping,
Optional,
@@ -477,8 +478,18 @@ class BulkPushRuleEvaluator:
event.room_version.msc3931_push_features,
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
self.hs.config.experimental.msc4210_enabled,
self.hs.config.experimental.msc4306_enabled,
)
msc4306_thread_subscribers: Optional[FrozenSet[str]] = None
if self.hs.config.experimental.msc4306_enabled and thread_id != MAIN_TIMELINE:
# pull out, in batch, all local subscribers to this thread
# (in the common case, they will all be getting processed for push
# rules right now)
msc4306_thread_subscribers = await self.store.get_subscribers_to_thread(
event.room_id, thread_id
)
for uid, rules in rules_by_user.items():
if event.sender == uid:
continue
@@ -503,7 +514,13 @@ class BulkPushRuleEvaluator:
# current user, it'll be added to the dict later.
actions_by_user[uid] = []
actions = evaluator.run(rules, uid, display_name)
msc4306_thread_subscription_state: Optional[bool] = None
if msc4306_thread_subscribers is not None:
msc4306_thread_subscription_state = uid in msc4306_thread_subscribers
actions = evaluator.run(
rules, uid, display_name, msc4306_thread_subscription_state
)
if "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions

View File

@@ -106,18 +106,6 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[
@@ -255,8 +243,11 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections: List[IReplicationConnection] = []
tcp_resource_total_connections_gauge.register_hook(
lambda: {(self.server_name,): len(self._connections)}
LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -275,11 +266,14 @@ class ReplicationCommandHandler:
# from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
tcp_command_queue_gauge.register_hook(
lambda: {
LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
}
},
)
self._is_master = hs.config.worker.worker_app is None

View File

@@ -527,11 +527,9 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
pending_commands.register_hook(
lambda: {
caller=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
}
},
)
@@ -546,11 +544,9 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
transport_send_buffer.register_hook(
lambda: {
caller=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
}
},
)
@@ -575,12 +571,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
tcp_transport_kernel_send_buffer.register_hook(
lambda: {
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
}
},
)
@@ -588,10 +582,8 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
)
tcp_transport_kernel_read_buffer.register_hook(
lambda: {
caller=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
}
},
)

View File

@@ -30,7 +30,6 @@ from synapse.http.server import (
respond_with_json_bytes,
set_corp_headers,
set_cors_headers,
set_headers_for_media_response,
)
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
@@ -170,8 +169,7 @@ class ThumbnailResource(RestServlet):
method,
m_type,
max_timeout_ms,
for_federation=False,
may_redirect=True,
False,
)
else:
await self.thumbnailer.respond_local_thumbnail(
@@ -182,8 +180,7 @@ class ThumbnailResource(RestServlet):
method,
m_type,
max_timeout_ms,
for_federation=False,
may_redirect=True,
False,
)
self.media_repo.mark_recently_accessed(None, media_id)
else:
@@ -241,7 +238,21 @@ class DownloadResource(RestServlet):
await self.auth.get_user_by_req(request, allow_guest=True)
set_headers_for_media_response(request)
set_cors_headers(request)
set_corp_headers(request)
request.setHeader(
b"Content-Security-Policy",
b"sandbox;"
b" default-src 'none';"
b" script-src 'none';"
b" plugin-types application/pdf;"
b" style-src 'unsafe-inline';"
b" media-src 'self';"
b" object-src 'self';",
)
# Limited non-standard form of CSP for IE11
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
request.setHeader(b"Referrer-Policy", b"no-referrer")
max_timeout_ms = parse_integer(
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
)
@@ -249,7 +260,7 @@ class DownloadResource(RestServlet):
if self._is_mine_server_name(server_name):
await self.media_repo.get_local_media(
request, media_id, file_name, max_timeout_ms, may_redirect=True
request, media_id, file_name, max_timeout_ms
)
else:
ip_address = request.getClientAddress().host

View File

@@ -23,9 +23,7 @@ import logging
import re
from typing import TYPE_CHECKING, Optional
from synapse.http.server import (
set_headers_for_media_response,
)
from synapse.http.server import set_corp_headers, set_cors_headers
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer
from synapse.http.site import SynapseRequest
from synapse.media._base import (
@@ -64,7 +62,21 @@ class DownloadResource(RestServlet):
# Validate the server name, raising if invalid
parse_and_validate_server_name(server_name)
set_headers_for_media_response(request)
set_cors_headers(request)
set_corp_headers(request)
request.setHeader(
b"Content-Security-Policy",
b"sandbox;"
b" default-src 'none';"
b" script-src 'none';"
b" plugin-types application/pdf;"
b" style-src 'unsafe-inline';"
b" media-src 'self';"
b" object-src 'self';",
)
# Limited non-standard form of CSP for IE11
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
request.setHeader(b"Referrer-Policy", b"no-referrer")
max_timeout_ms = parse_integer(
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
)

View File

@@ -31,7 +31,6 @@ from synapse.rest.synapse.client.rendezvous import MSC4108RendezvousSessionResou
from synapse.rest.synapse.client.sso_register import SsoRegisterResource
from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource
from synapse.rest.synapse.mas import MasResource
from synapse.rest.synapse.media import MediaResource
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -65,9 +64,6 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc
resources["/_synapse/jwks"] = JwksResource(hs)
resources["/_synapse/mas"] = MasResource(hs)
if hs.config.media.can_load_media_repo:
resources["/_synapse/media"] = MediaResource(hs)
# provider-specific SSO bits. Only load these if they are enabled, since they
# rely on optional dependencies.
if hs.config.oidc.oidc_enabled:

View File

@@ -1,46 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import logging
from typing import TYPE_CHECKING
from twisted.web.resource import Resource
from .download import DownloadResource
from .thumbnail import ThumbnailResource
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class MediaResource(Resource):
"""
Provides endpoints for signed media downloads and thumbnails.
All endpoints are mounted under the path `/_synapse/media/` and only work
on the media worker.
"""
def __init__(self, hs: "HomeServer"):
assert hs.config.media.can_load_media_repo, (
"This resource should only be mounted on workers that can load the media repo"
)
Resource.__init__(self)
self.putChild(b"download", DownloadResource(hs))
self.putChild(b"thumbnail", ThumbnailResource(hs))

View File

@@ -1,111 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import logging
from typing import TYPE_CHECKING
from synapse.api.errors import NotFoundError
from synapse.http.server import (
DirectServeJsonResource,
set_headers_for_media_response,
)
from synapse.http.servlet import parse_integer, parse_string
if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class DownloadResource(DirectServeJsonResource):
"""
Serves media from the media repository, with a temporary signed URL which
expires after a set amount of time.
GET /_synapse/media/download/{media_id}?exp={exp}&sig={sig}
GET /_synapse/media/download/{media_id}/{name}?exp={exp}&sig={sig}
The intent of this resource is to allow the federation and client media APIs
to issue redirects to a signed URL that can then be cached by a CDN. This
endpoint doesn't require any extra header, and is authenticated using the
signature in the URL parameters.
"""
isLeaf = True
def __init__(self, hs: "HomeServer"):
assert hs.config.media.can_load_media_repo, (
"This resource should only be mounted on workers that can load the media repo"
)
DirectServeJsonResource.__init__(
self,
# It is useful to have the tracing context on this endpoint as it
# can help debug federation issues
extract_context=True,
)
self._clock = hs.get_clock()
self._media_repository = hs.get_media_repository()
async def _async_render_GET(self, request: "SynapseRequest") -> None:
set_headers_for_media_response(request)
# Extract the media ID (and optional name) from the path
if request.postpath is None:
raise NotFoundError()
if len(request.postpath) == 1:
media_id = request.postpath[0].decode("utf-8")
name = None
elif len(request.postpath) == 2:
media_id = request.postpath[0].decode("utf-8")
name = request.postpath[1].decode("utf-8")
else:
raise NotFoundError()
# Get the `exp` and `sig` query parameters
exp = parse_integer(request=request, name="exp", required=True, negative=False)
sig = parse_string(request=request, name="sig", required=True)
# Check that the signature is valid
key = self._media_repository.download_media_key(
media_id=media_id, exp=exp, name=name
)
if not self._media_repository.verify_media_request_signature(key, sig):
logger.warning(
"Invalid URL signature serving media %s. key: %r, sig: %r",
media_id,
key,
sig,
)
raise NotFoundError()
# Check the expiry time
if exp < self._clock.time_msec():
logger.info("Expired signed URL serving media %s", media_id)
raise NotFoundError()
# Reply with the media
await self._media_repository.get_local_media(
request=request,
media_id=media_id,
name=name,
max_timeout_ms=0, # If we got here, the media finished uploading
federation=False, # This changes the response to be multipart; we explicitly don't want that
may_redirect=False, # We're already on the redirected URL, we don't want to redirect again
)

View File

@@ -1,146 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl_3.0.html>.
#
#
import base64
import logging
from typing import TYPE_CHECKING
from urllib.parse import parse_qs
from synapse.api.errors import NotFoundError
from synapse.http.server import (
DirectServeJsonResource,
set_headers_for_media_response,
)
from synapse.http.servlet import (
parse_integer,
parse_integer_from_args,
parse_string,
parse_string_from_args,
)
from synapse.media.thumbnailer import ThumbnailProvider
if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ThumbnailResource(DirectServeJsonResource):
"""
Serves thumbnails from the media repository, with a temporary signed URL
which expires after a set amount of time.
GET /_synapse/media/thumbnail/{media_id}/{parameters}?exp={exp}&sig={sig}
The intent of this resource is to allow the federation and client media APIs
to issue redirects to a signed URL that can then be cached by a CDN. This
endpoint doesn't require any extra header, and is authenticated using the
signature in the URL parameters.
The parameters are encoded as a form-urlencoded then base64 encoded string.
This avoids any automatic url decoding Twisted might do. The reason they are
part of the URL and not the query string is to ignore the query string when
caching the URL, which is possible with some CDNs.
"""
isLeaf = True
def __init__(self, hs: "HomeServer"):
assert hs.config.media.can_load_media_repo, (
"This resource should only be mounted on workers that can load the media repo"
)
DirectServeJsonResource.__init__(
self,
# It is useful to have the tracing context on this endpoint as it
# can help debug federation issues
extract_context=True,
)
self._clock = hs.get_clock()
self._media_repository = hs.get_media_repository()
self._dynamic_thumbnails = hs.config.media.dynamic_thumbnails
self._thumbnailer = ThumbnailProvider(
hs, self._media_repository, self._media_repository.media_storage
)
async def _async_render_GET(self, request: "SynapseRequest") -> None:
set_headers_for_media_response(request)
# Extract the media ID and parameters from the path
if request.postpath is None or len(request.postpath) != 2:
raise NotFoundError()
media_id = request.postpath[0].decode("utf-8")
parameters = request.postpath[1]
# Get the `exp` and `sig` query parameters
exp = parse_integer(request=request, name="exp", required=True, negative=False)
sig = parse_string(request=request, name="sig", required=True)
# Check that the signature is valid
key = self._media_repository.thumbnail_media_key(
media_id=media_id,
parameters=parameters.decode("utf-8"),
exp=exp,
)
if not self._media_repository.verify_media_request_signature(key, sig):
logger.warning(
"Invalid URL signature serving media %s. key: %r, sig: %r",
media_id,
key,
sig,
)
raise NotFoundError()
# Check the expiry time
if exp < self._clock.time_msec():
logger.info("Expired signed URL serving media %s", media_id)
raise NotFoundError()
# Now parse and check the parameters
args = parse_qs(base64.urlsafe_b64decode(parameters))
width = parse_integer_from_args(args, "width", required=True)
height = parse_integer_from_args(args, "height", required=True)
method = parse_string_from_args(args, "method", "scale")
m_type = parse_string_from_args(args, "type", "image/png")
# Reply with the thumbnail
if self._dynamic_thumbnails:
await self._thumbnailer.select_or_generate_local_thumbnail(
request,
media_id,
width,
height,
method,
m_type,
max_timeout_ms=0, # If we got here, the media finished uploading
for_federation=False, # This changes the response to be multipart; we explicitly don't want that
may_redirect=False, # We're already on the redirected URL, we don't want to redirect again
)
else:
await self._thumbnailer.respond_local_thumbnail(
request,
media_id,
width,
height,
method,
m_type,
max_timeout_ms=0, # If we got here, the media finished uploading
for_federation=False, # This changes the response to be multipart; we explicitly don't want that
may_redirect=False, # We're already on the redirected URL, we don't want to redirect again
)

View File

@@ -100,12 +100,6 @@ sql_txn_duration = Counter(
labelnames=["desc", SERVER_NAME_LABEL],
)
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
)
# Unique indexes which have been added in background updates. Maps from table name
# to the name of the background update which added the unique index to that table.
@@ -617,8 +611,11 @@ class DatabasePool:
)
self.updates = BackgroundUpdater(hs, self)
background_update_status.register_hook(
lambda: {(self.server_name,): self.updates.get_status()},
LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)
self._previous_txn_total_time = 0.0

View File

@@ -110,6 +110,7 @@ def _load_rules(
msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
msc4028_push_encrypted_events=experimental_config.msc4028_push_encrypted_events,
msc4210_enabled=experimental_config.msc4210_enabled,
msc4306_enabled=experimental_config.msc4306_enabled,
)
return filtered_rules

View File

@@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
federation_known_servers_gauge = LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class EventIdMembership:
"""Returned by `get_membership_from_event_ids`"""
@@ -123,8 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
1,
self._count_known_servers,
)
federation_known_servers_gauge.register_hook(
lambda: {(self.server_name,): self._known_servers_count}
LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
)
@wrap_as_background_process("_count_known_servers")

View File

@@ -14,6 +14,7 @@ import logging
from typing import (
TYPE_CHECKING,
Any,
FrozenSet,
Iterable,
List,
Optional,
@@ -99,6 +100,7 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
self.get_subscription_for_thread.invalidate(
(row.user_id, row.room_id, row.event_id)
)
self.get_subscribers_to_thread.invalidate((row.room_id, row.event_id))
super().process_replication_rows(stream_name, instance_name, token, rows)
@@ -194,6 +196,16 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
"""
assert self._can_write_to_thread_subscriptions
def _invalidate_subscription_caches(txn: LoggingTransaction) -> None:
txn.call_after(
self.get_subscription_for_thread.invalidate,
(user_id, room_id, thread_root_event_id),
)
txn.call_after(
self.get_subscribers_to_thread.invalidate,
(room_id, thread_root_event_id),
)
def _subscribe_user_to_thread_txn(
txn: LoggingTransaction,
) -> Optional[Union[int, AutomaticSubscriptionConflicted]]:
@@ -234,10 +246,7 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
"unsubscribed_at_topological_ordering": None,
},
)
txn.call_after(
self.get_subscription_for_thread.invalidate,
(user_id, room_id, thread_root_event_id),
)
_invalidate_subscription_caches(txn)
return stream_id
# we already have either a subscription or a prior unsubscription here
@@ -291,10 +300,7 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
"unsubscribed_at_topological_ordering": None,
},
)
txn.call_after(
self.get_subscription_for_thread.invalidate,
(user_id, room_id, thread_root_event_id),
)
_invalidate_subscription_caches(txn)
return stream_id
@@ -376,6 +382,10 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
self.get_subscription_for_thread.invalidate,
(user_id, room_id, thread_root_event_id),
)
txn.call_after(
self.get_subscribers_to_thread.invalidate,
(room_id, thread_root_event_id),
)
return stream_id
@@ -388,7 +398,9 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
Purge all subscriptions for the user.
The fact that subscriptions have been purged will not be streamed;
all stream rows for the user will in fact be removed.
This is intended only for dealing with user deactivation.
This must only be used for user deactivation,
because it does not invalidate the `subscribers_to_thread` cache.
"""
def _purge_thread_subscription_settings_for_user_txn(
@@ -449,6 +461,42 @@ class ThreadSubscriptionsWorkerStore(CacheInvalidationWorkerStore):
return ThreadSubscription(automatic=automatic)
# max_entries=100 rationale:
# this returns a potentially large datastructure
# (since each entry contains a set which contains a potentially large number of user IDs),
# whereas the default of 10'000 entries for @cached feels more
# suitable for very small cache entries.
#
# Overall, when bearing in mind the usual profile of a small community-server or company-server
# (where cache tuning hasn't been done, so we're in out-of-box configuration), it is very
# unlikely we would benefit from keeping hot the subscribers for as many as 100 threads,
# since it's unlikely that so many threads will be active in a short span of time on a small homeserver.
# It feels that medium servers will probably also not exhaust this limit.
# Larger homeservers are more likely to be carefully tuned, either with a larger global cache factor
# or carefully following the usage patterns & cache metrics.
# Finally, the query is not so intensive that computing it every time is a huge deal, but given people
# often send messages back-to-back in the same thread it seems like it would offer a mild benefit.
@cached(max_entries=100)
async def get_subscribers_to_thread(
self, room_id: str, thread_root_event_id: str
) -> FrozenSet[str]:
"""
Returns:
the set of user_ids for local users who are subscribed to the given thread.
"""
return frozenset(
await self.db_pool.simple_select_onecol(
table="thread_subscriptions",
keyvalues={
"room_id": room_id,
"event_id": thread_root_event_id,
"subscribed": True,
},
retcol="user_id",
desc="get_subscribers_to_thread",
)
)
def get_max_thread_subscriptions_stream_id(self) -> int:
"""Get the current maximum stream_id for thread subscriptions.

View File

@@ -49,6 +49,7 @@ class FilteredPushRules:
msc3664_enabled: bool,
msc4028_push_encrypted_events: bool,
msc4210_enabled: bool,
msc4306_enabled: bool,
): ...
def rules(self) -> Collection[Tuple[PushRule, bool]]: ...
@@ -67,13 +68,19 @@ class PushRuleEvaluator:
room_version_feature_flags: Tuple[str, ...],
msc3931_enabled: bool,
msc4210_enabled: bool,
msc4306_enabled: bool,
): ...
def run(
self,
push_rules: FilteredPushRules,
user_id: Optional[str],
display_name: Optional[str],
msc4306_thread_subscription_state: Optional[bool],
) -> Collection[Union[Mapping, str]]: ...
def matches(
self, condition: JsonDict, user_id: Optional[str], display_name: Optional[str]
self,
condition: JsonDict,
user_id: Optional[str],
display_name: Optional[str],
msc4306_thread_subscription_state: Optional[bool] = None,
) -> bool: ...

View File

@@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance(
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
sleep_affected_hosts_gauge = LaterGauge(
LaterGauge(
name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
sleep_affected_hosts_gauge.register_hook(
lambda: _get_counts_from_rate_limiter_instance(
caller=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
)
),
)
reject_affected_hosts_gauge = LaterGauge(
LaterGauge(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
reject_affected_hosts_gauge.register_hook(
lambda: _get_counts_from_rate_limiter_instance(
caller=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
)
),
)

View File

@@ -44,13 +44,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
running_tasks_gauge = LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
)
class TaskScheduler:
"""
This is a simple task scheduler designed for resumable tasks. Normally,
@@ -137,8 +130,11 @@ class TaskScheduler:
TaskScheduler.SCHEDULE_INTERVAL_MS,
)
running_tasks_gauge.register_hook(
lambda: {(self.server_name,): len(self._running_tasks)}
LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
)
def register_action(

View File

@@ -186,72 +186,6 @@ class FederationMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
self.assertEqual(channel.is_finished(), True)
self.assertNotIn("body", channel.result)
@unittest.override_config(
{"media_redirect": {"enabled": True, "secret": "supersecret"}}
)
def test_signed_redirect(self) -> None:
"""When media redirect are enabled, we should redirect to a signed URL"""
content = io.BytesIO(b"file_to_stream")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_upload",
content,
46,
UserID.from_string("@user_id:whatever.org"),
)
)
# test with a text file
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/v1/media/download/{content_uri.media_id}",
)
self.pump()
self.assertEqual(200, channel.code)
content_type = channel.headers.getRawHeaders("content-type")
assert content_type is not None
assert "multipart/mixed" in content_type[0]
assert "boundary" in content_type[0]
# extract boundary
boundary = content_type[0].split("boundary=")[1]
lines = channel.text_body.split("\r\n")
# Assert the structure of the multipart body line by line.
# Expected structure:
# --boundary_value
# Content-Type: application/json
#
# {}
# --boundary_value
# Location: signed_url
#
#
# --boundary_value--
# (potentially a final empty line if the body ends with \r\n)
self.assertEqual(len(lines), 10)
# Part 1: JSON metadata
self.assertEqual(lines[0], f"--{boundary}")
self.assertEqual(lines[1], "Content-Type: application/json")
self.assertEqual(lines[2], "") # Empty line separating headers from body
self.assertEqual(lines[3], "{}") # JSON body
# Part 2: Redirect URL
self.assertEqual(lines[4], f"--{boundary}") # Boundary for the next part
# The Location header contains dynamic parts (exp, sig), so use regex
self.assertRegex(
lines[5],
rf"^Location: https://test/_synapse/media/download/{content_uri.media_id}\?exp=\d+&sig=\w+$",
)
self.assertEqual(lines[6], "") # First empty line after Location header
self.assertEqual(lines[7], "") # Second empty line after Location header
# Final boundary
self.assertEqual(lines[8], f"--{boundary}--")
self.assertEqual(lines[9], "")
class FederationThumbnailTest(unittest.FederatingHomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -359,69 +293,3 @@ class FederationThumbnailTest(unittest.FederatingHomeserverTestCase):
small_png.expected_cropped in field for field in stripped_bytes
)
self.assertTrue(found_file)
@unittest.override_config(
{"media_redirect": {"enabled": True, "secret": "supersecret"}}
)
def test_thumbnail_signed_redirect(self) -> None:
"""When media redirect are enabled, we should redirect to a signed URL for thumbnails"""
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_png_thumbnail",
content,
67,
UserID.from_string("@user_id:whatever.org"),
)
)
# test with a thumbnail request
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=scale",
)
self.pump()
self.assertEqual(200, channel.code)
content_type = channel.headers.getRawHeaders("content-type")
assert content_type is not None
assert "multipart/mixed" in content_type[0]
assert "boundary" in content_type[0]
# extract boundary
boundary = content_type[0].split("boundary=")[1]
lines = channel.text_body.split("\r\n")
# Assert the structure of the multipart body line by line.
# Expected structure:
# --boundary_value
# Content-Type: application/json
#
# {}
# --boundary_value
# Location: signed_url
#
#
# --boundary_value--
# (potentially a final empty line if the body ends with \r\n)
self.assertEqual(len(lines), 10)
# Part 1: JSON metadata
self.assertEqual(lines[0], f"--{boundary}")
self.assertEqual(lines[1], "Content-Type: application/json")
self.assertEqual(lines[2], "") # Empty line separating headers from body
self.assertEqual(lines[3], "{}") # JSON body
# Part 2: Redirect URL
self.assertEqual(lines[4], f"--{boundary}") # Boundary for the next part
# The Location header contains dynamic parts (exp, sig), so use regex
self.assertRegex(
lines[5],
rf"^Location: https://test/_synapse/media/thumbnail/{content_uri.media_id}/[^?]+\?exp=\d+&sig=\w+$",
)
self.assertEqual(lines[6], "") # First empty line after Location header
self.assertEqual(lines[7], "") # Second empty line after Location header
# Final boundary
self.assertEqual(lines[8], f"--{boundary}--")
self.assertEqual(lines[9], "")

View File

@@ -22,7 +22,7 @@
from synapse.api.constants import EduTypes
from tests import unittest
from tests.unittest import DEBUG, override_config
from tests.unittest import override_config
class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
@@ -48,7 +48,6 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
)
self.assertEqual(200, channel.code)
@DEBUG
def test_edu_debugging_doesnt_explode(self) -> None:
"""Sanity check incoming federation succeeds with `synapse.debug_8631` enabled.

View File

@@ -22,13 +22,7 @@ from typing import Dict, Protocol, Tuple
from prometheus_client.core import Sample
from synapse.metrics import (
REGISTRY,
SERVER_NAME_LABEL,
InFlightGauge,
LaterGauge,
generate_latest,
)
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from synapse.util.caches.deferred_cache import DeferredCache
from tests import unittest
@@ -291,42 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
class LaterGaugeTests(unittest.HomeserverTestCase):
def test_later_gauge_multiple_servers(self) -> None:
"""
Test that LaterGauge metrics are reported correctly across multiple servers. We
will have an metrics entry for each homeserver that is labeled with the
`server_name` label.
"""
later_gauge = LaterGauge(
name="foo",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
later_gauge.register_hook(lambda: {("hs1",): 1})
later_gauge.register_hook(lambda: {("hs2",): 2})
metrics_map = get_latest_metrics()
# Find the metrics for the caches from both homeservers
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNotNone(
hs1_metric_value,
f"Missing metric {hs1_metric} in cache metrics {metrics_map}",
)
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
)
# Sanity check the metric values
self.assertEqual(hs1_metric_value, "1.0")
self.assertEqual(hs2_metric_value, "2.0")
def get_latest_metrics() -> Dict[str, str]:
"""
Collect the latest metrics from the registry and parse them into an easy to use map.

View File

@@ -26,7 +26,7 @@ from parameterized import parameterized
from twisted.internet.testing import MemoryReactor
from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import RoomVersions
from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
from synapse.rest import admin
@@ -206,7 +206,10 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
bulk_evaluator._action_for_event_by_user.assert_not_called()
def _create_and_process(
self, bulk_evaluator: BulkPushRuleEvaluator, content: Optional[JsonDict] = None
self,
bulk_evaluator: BulkPushRuleEvaluator,
content: Optional[JsonDict] = None,
type: str = "test",
) -> bool:
"""Returns true iff the `mentions` trigger an event push action."""
# Create a new message event which should cause a notification.
@@ -214,7 +217,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self.event_creation_handler.create_event(
self.requester,
{
"type": "test",
"type": type,
"room_id": self.room_id,
"content": content or {},
"sender": f"@bob:{self.hs.hostname}",
@@ -446,3 +449,73 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
},
)
)
@override_config({"experimental_features": {"msc4306_enabled": True}})
def test_thread_subscriptions(self) -> None:
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "test message before subscription",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type=EventTypes.Message,
)
)
self.get_success(
self.hs.get_datastores().main.subscribe_user_to_thread(
self.alice,
self.room_id,
thread_root_id,
automatic_event_orderings=None,
)
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "test message after subscription",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
)
)
def test_with_disabled_thread_subscriptions(self) -> None:
"""
Test what happens with threaded events when MSC4306 is disabled.
FUTURE: If MSC4306 becomes enabled-by-default/accepted, this test is to be removed.
"""
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
# When MSC4306 is not enabled, a threaded message generates a notification
# by default.
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
"msgtype": "m.text",
"body": "test message before subscription",
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root_id,
},
},
type="m.room.message",
)
)

View File

@@ -150,6 +150,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
*,
related_events: Optional[JsonDict] = None,
msc4210: bool = False,
msc4306: bool = False,
) -> PushRuleEvaluator:
event = FrozenEvent(
{
@@ -176,6 +177,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
room_version_feature_flags=event.room_version.msc3931_push_features,
msc3931_enabled=True,
msc4210_enabled=msc4210,
msc4306_enabled=msc4306,
)
def test_display_name(self) -> None:
@@ -806,6 +808,112 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
)
)
def test_thread_subscription_subscribed(self) -> None:
"""
Test MSC4306 thread subscription push rules against an event in a subscribed thread.
"""
evaluator = self._get_evaluator(
{
"msgtype": "m.text",
"body": "Squawk",
"m.relates_to": {
"event_id": "$threadroot",
"rel_type": "m.thread",
},
},
msc4306=True,
)
self.assertTrue(
evaluator.matches(
{
"kind": "io.element.msc4306.thread_subscription",
"subscribed": True,
},
None,
None,
msc4306_thread_subscription_state=True,
)
)
self.assertFalse(
evaluator.matches(
{
"kind": "io.element.msc4306.thread_subscription",
"subscribed": False,
},
None,
None,
msc4306_thread_subscription_state=True,
)
)
def test_thread_subscription_unsubscribed(self) -> None:
"""
Test MSC4306 thread subscription push rules against an event in an unsubscribed thread.
"""
evaluator = self._get_evaluator(
{
"msgtype": "m.text",
"body": "Squawk",
"m.relates_to": {
"event_id": "$threadroot",
"rel_type": "m.thread",
},
},
msc4306=True,
)
self.assertFalse(
evaluator.matches(
{
"kind": "io.element.msc4306.thread_subscription",
"subscribed": True,
},
None,
None,
msc4306_thread_subscription_state=False,
)
)
self.assertTrue(
evaluator.matches(
{
"kind": "io.element.msc4306.thread_subscription",
"subscribed": False,
},
None,
None,
msc4306_thread_subscription_state=False,
)
)
def test_thread_subscription_unthreaded(self) -> None:
"""
Test MSC4306 thread subscription push rules against an unthreaded event.
"""
evaluator = self._get_evaluator(
{"msgtype": "m.text", "body": "Squawk"}, msc4306=True
)
self.assertFalse(
evaluator.matches(
{
"kind": "io.element.msc4306.thread_subscription",
"subscribed": True,
},
None,
None,
msc4306_thread_subscription_state=None,
)
)
self.assertFalse(
evaluator.matches(
{
"kind": "io.element.msc4306.thread_subscription",
"subscribed": False,
},
None,
None,
msc4306_thread_subscription_state=None,
)
)
class TestBulkPushRuleEvaluator(unittest.HomeserverTestCase):
"""Tests for the bulk push rule evaluator"""

View File

@@ -2527,84 +2527,6 @@ class DownloadAndThumbnailTestCase(unittest.HomeserverTestCase):
)
)
@override_config({"media_redirect": {"enabled": True, "secret": "supersecret"}})
def test_download_signed_redirect(self) -> None:
"""When media redirects are enabled, we should redirect to a signed URL for downloads"""
# Create local media
content = io.BytesIO(b"file_to_stream")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_upload",
content,
14,
UserID.from_string(f"@user:{self.hs.hostname}"),
)
)
# Request the download
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{self.hs.hostname}/{content_uri.media_id}",
shorthand=False,
access_token=self.tok,
)
# Should get a redirect response
self.assertEqual(channel.code, 307)
# Check the Location header for the signed URL
location_headers = channel.headers.getRawHeaders("Location")
self.assertIsNotNone(location_headers)
assert location_headers is not None
self.assertEqual(len(location_headers), 1)
location = location_headers[0]
# Verify the signed URL format
self.assertRegex(
location,
rf"^https://test/_synapse/media/download/{content_uri.media_id}\?exp=\d+&sig=\w+$",
)
@override_config({"media_redirect": {"enabled": True, "secret": "supersecret"}})
def test_thumbnail_signed_redirect(self) -> None:
"""When media redirects are enabled, we should redirect to a signed URL for thumbnails (scaled)"""
# Create local media with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_png_thumbnail",
content,
67,
UserID.from_string(f"@user:{self.hs.hostname}"),
)
)
# Request a scaled thumbnail
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/thumbnail/{self.hs.hostname}/{content_uri.media_id}?width=32&height=32",
shorthand=False,
access_token=self.tok,
)
# Should get a redirect response
self.assertEqual(channel.code, 307)
# Check the Location header for the signed URL
location_headers = channel.headers.getRawHeaders("Location")
self.assertIsNotNone(location_headers)
assert location_headers is not None
self.assertEqual(len(location_headers), 1)
location = location_headers[0]
# Verify the signed URL format
self.assertRegex(
location,
rf"^https://test/_synapse/media/thumbnail/{content_uri.media_id}/[^?]+\?exp=\d+&sig=\w+$",
)
configs = [
{"extra_config": {"dynamic_thumbnails": True}},

View File

@@ -1,13 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#

View File

@@ -1,282 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
import io
from typing import Dict
from twisted.internet.testing import MemoryReactor
from twisted.web.resource import Resource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from tests import unittest
class SignedDownloadTestCase(unittest.HomeserverTestCase):
def create_resource_dict(self) -> Dict[str, Resource]:
d = super().create_resource_dict()
d.update(build_synapse_client_resource_tree(self.hs))
return d
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
config["media_redirect"] = {
"enabled": True,
"secret": "supersecret",
}
return self.setup_test_homeserver(config=config)
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
super().prepare(reactor, clock, hs)
self.media_repo = hs.get_media_repository()
def test_valid_signed_download(self) -> None:
"""Test that a valid signed URL returns the media content"""
# Create test content
content = io.BytesIO(b"test file content")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"some_name.txt",
content,
17,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL
exp = self.clock.time_msec() + 3600000 # 1 hour from now
key = self.media_repo.download_media_key(
media_id=content_uri.media_id, exp=exp, name="test_file.txt"
)
sig = self.media_repo.compute_media_request_signature(key)
# Make the request
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}/test_file.txt?exp={exp}&sig={sig}",
shorthand=False,
)
# Check the response
self.assertEqual(channel.code, 200)
self.assertEqual(channel.result["body"], b"test file content")
# Check content type
content_type = channel.headers.getRawHeaders("Content-Type")
assert content_type is not None
self.assertIn("text/plain", content_type[0])
# Check content disposition
content_disposition = channel.headers.getRawHeaders("Content-Disposition")
assert content_disposition is not None
self.assertIn("test_file.txt", content_disposition[0])
def test_valid_signed_download_without_filename(self) -> None:
"""Test that a valid signed URL works without a filename"""
# Create test content
content = io.BytesIO(b"test file content")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_file.txt",
content,
17,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL without filename
exp = self.clock.time_msec() + 3600000 # 1 hour from now
key = self.media_repo.download_media_key(
media_id=content_uri.media_id, exp=exp, name=None
)
sig = self.media_repo.compute_media_request_signature(key)
# Make the request
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
shorthand=False,
)
# Check the response
self.assertEqual(channel.code, 200)
self.assertEqual(channel.result["body"], b"test file content")
def test_invalid_signature(self) -> None:
"""Test that an invalid signature returns 404"""
# Create test content
content = io.BytesIO(b"test file content")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_file.txt",
content,
17,
UserID.from_string("@user:test"),
)
)
# Use a properly formatted but invalid signature (64 hex chars like a real signature)
exp = self.clock.time_msec() + 3600000 # 1 hour from now
invalid_sig = "0" * 64 # Invalid but properly formatted signature
# Make the request
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={invalid_sig}",
shorthand=False,
)
# Check the response
self.assertEqual(channel.code, 404)
def test_expired_url(self) -> None:
"""Test that an expired URL returns 404"""
# Create test content
content = io.BytesIO(b"test file content")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_file.txt",
content,
17,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL that will expire soon
exp = self.clock.time_msec() + 1000 # 1 second from now
key = self.media_repo.download_media_key(
media_id=content_uri.media_id, exp=exp, name=None
)
sig = self.media_repo.compute_media_request_signature(key)
# Advance the clock to make the URL expired
self.reactor.advance(2) # Advance 2 seconds
# Make the request
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
shorthand=False,
)
# Check the response
self.assertEqual(channel.code, 404)
def test_missing_parameters(self) -> None:
"""Test that missing exp or sig parameters return 404"""
# Create test content
content = io.BytesIO(b"test file content")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_file.txt",
content,
17,
UserID.from_string("@user:test"),
)
)
# Test missing exp parameter
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?sig=somesig",
shorthand=False,
)
self.assertEqual(channel.code, 400) # Bad request for missing required param
# Test missing sig parameter
exp = self.clock.time_msec() + 3600000
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}",
shorthand=False,
)
self.assertEqual(channel.code, 400) # Bad request for missing required param
def test_nonexistent_media(self) -> None:
"""Test that requesting non-existent media returns 404"""
# Generate a signed URL for non-existent media
fake_media_id = "nonexistent"
exp = self.clock.time_msec() + 3600000 # 1 hour from now
key = self.media_repo.download_media_key(
media_id=fake_media_id, exp=exp, name=None
)
sig = self.media_repo.compute_media_request_signature(key)
# Make the request
channel = self.make_request(
"GET",
f"/_synapse/media/download/{fake_media_id}?exp={exp}&sig={sig}",
shorthand=False,
)
# Check the response
self.assertEqual(channel.code, 404)
def test_etag_functionality(self) -> None:
"""Test that ETag functionality works properly"""
# Create test content
content = io.BytesIO(b"test file content for etag")
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"text/plain",
"test_file.txt",
content,
26,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL
exp = self.clock.time_msec() + 3600000 # 1 hour from now
key = self.media_repo.download_media_key(
media_id=content_uri.media_id, exp=exp, name=None
)
sig = self.media_repo.compute_media_request_signature(key)
# Make the first request
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
shorthand=False,
)
# Check the response has an ETag and a Cache-Control header
self.assertEqual(channel.code, 200)
etag_headers = channel.headers.getRawHeaders("ETag")
assert etag_headers is not None
etag = etag_headers[0]
cache_control = channel.headers.getRawHeaders("Cache-Control")
self.assertIsNotNone(cache_control)
# Make a second request with If-None-Match header
channel = self.make_request(
"GET",
f"/_synapse/media/download/{content_uri.media_id}?exp={exp}&sig={sig}",
shorthand=False,
custom_headers=[("If-None-Match", etag)],
)
# Should get 304 Not Modified
self.assertEqual(channel.code, 304)
self.assertNotIn("body", channel.result)

View File

@@ -1,356 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
import io
import re
from typing import Dict
from urllib.parse import urlencode
from twisted.internet.testing import MemoryReactor
from twisted.web.resource import Resource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from tests import unittest
from tests.media.test_media_storage import small_png
class SignedThumbnailTestCase(unittest.HomeserverTestCase):
def create_resource_dict(self) -> Dict[str, Resource]:
d = super().create_resource_dict()
d.update(build_synapse_client_resource_tree(self.hs))
return d
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
config["media_redirect"] = {
"enabled": True,
"secret": "supersecret",
}
config["dynamic_thumbnails"] = True
return self.setup_test_homeserver(config=config)
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
super().prepare(reactor, clock, hs)
self.media_repo = hs.get_media_repository()
def test_valid_signed_thumbnail_scaled(self) -> None:
"""Test that a valid signed URL returns the thumbnail content (scaled)"""
# Create test content with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_image.png",
content,
67,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL for scaled thumbnail
params_dict = {
"width": "32",
"height": "32",
"method": "scale",
"type": "image/png",
}
signed_url = self.media_repo.signed_location_for_thumbnail(
media_id=content_uri.media_id, parameters=params_dict
)
# Extract the path and query from the signed URL
url_path = signed_url.split("https://test", 1)[1]
# Make the request
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response
self.assertEqual(channel.code, 200)
# Check content type
content_type = channel.headers.getRawHeaders("Content-Type")
assert content_type is not None
self.assertEqual(content_type[0], "image/png")
# Check that we got actual thumbnail data
self.assertIsNotNone(channel.result.get("body"))
self.assertGreater(len(channel.result["body"]), 0)
def test_valid_signed_thumbnail_cropped(self) -> None:
"""Test that a valid signed URL returns the thumbnail content (cropped)"""
# Create test content with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_image.png",
content,
67,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL for cropped thumbnail
params_dict = {
"width": "32",
"height": "32",
"method": "crop",
"type": "image/png",
}
signed_url = self.media_repo.signed_location_for_thumbnail(
media_id=content_uri.media_id, parameters=params_dict
)
# Extract the path and query from the signed URL
url_path = signed_url.split("https://test", 1)[1]
# Make the request
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response
self.assertEqual(channel.code, 200)
# Check content type
content_type = channel.headers.getRawHeaders("Content-Type")
assert content_type is not None
self.assertEqual(content_type[0], "image/png")
# Check that we got actual thumbnail data
self.assertIsNotNone(channel.result.get("body"))
self.assertGreater(len(channel.result["body"]), 0)
def test_invalid_signature(self) -> None:
"""Test that an invalid signature returns 404"""
# Create test content with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_image.png",
content,
67,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL
params_dict = {
"width": "32",
"height": "32",
}
signed_url = self.media_repo.signed_location_for_thumbnail(
media_id=content_uri.media_id, parameters=params_dict
)
# Extract the path and query from the signed URL
url_path = signed_url.split("https://test", 1)[1]
invalid_sig = "0" * 64 # Invalid but properly formatted signature
url_path = re.sub(r"sig=\w+", "sig=" + invalid_sig, url_path)
# Make the request
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response
self.assertEqual(channel.code, 404)
def test_expired_url(self) -> None:
"""Test that an expired URL returns 404"""
# Create test content with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_image.png",
content,
67,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL
params_dict = {
"width": "32",
"height": "32",
}
signed_url = self.media_repo.signed_location_for_thumbnail(
media_id=content_uri.media_id, parameters=params_dict
)
# Extract the path and query from the signed URL
url_path = signed_url.split("https://test", 1)[1]
# Make a first request, it should work
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response
self.assertEqual(channel.code, 200)
# Advance the clock to make the URL expired
self.reactor.advance(
10 * 60 + 1
) # Advance 10 minutes + 1 second (TTL is 10 minutes by default)
# Make a second request, it should fail
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response
self.assertEqual(channel.code, 404)
def test_missing_parameters(self) -> None:
"""Test that missing exp or sig parameters return 400"""
# Create test content with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_image.png",
content,
67,
UserID.from_string("@user:test"),
)
)
params_dict = {
"width": "32",
"height": "32",
"method": "scale",
"type": "image/png",
}
parameters = urlencode(params_dict)
# Test missing exp parameter
channel = self.make_request(
"GET",
f"/_synapse/media/thumbnail/{content_uri.media_id}/{parameters}?sig=somesig",
shorthand=False,
)
self.pump()
self.assertEqual(channel.code, 400) # Bad request for missing required param
# Test missing sig parameter
exp = self.clock.time_msec() + 3600000
channel = self.make_request(
"GET",
f"/_synapse/media/thumbnail/{content_uri.media_id}/{parameters}?exp={exp}",
shorthand=False,
)
self.pump()
self.assertEqual(channel.code, 400) # Bad request for missing required param
def test_nonexistent_media(self) -> None:
"""Test that requesting non-existent media returns 404"""
# Generate a signed URL for non-existent media
fake_media_id = "nonexistent"
params_dict = {
"width": "32",
"height": "32",
}
signed_url = self.media_repo.signed_location_for_thumbnail(
media_id=fake_media_id, parameters=params_dict
)
# Extract the path and query from the signed URL
url_path = signed_url.split("https://test", 1)[1]
# Make the request
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response
self.assertEqual(channel.code, 404)
def test_etag_functionality(self) -> None:
"""Test that ETag functionality works properly"""
# Create test content with an image
content = io.BytesIO(small_png.data)
content_uri = self.get_success(
self.media_repo.create_or_update_content(
"image/png",
"test_image.png",
content,
67,
UserID.from_string("@user:test"),
)
)
# Generate a signed URL
params_dict = {
"width": "32",
"height": "32",
}
signed_url = self.media_repo.signed_location_for_thumbnail(
media_id=content_uri.media_id, parameters=params_dict
)
# Extract the path and query from the signed URL
url_path = signed_url.split("https://test", 1)[1]
# Make a first request
channel = self.make_request(
"GET",
url_path,
shorthand=False,
)
self.pump()
# Check the response has an ETag and a Cache-Control header
self.assertEqual(channel.code, 200)
etag_headers = channel.headers.getRawHeaders("ETag")
assert etag_headers is not None
etag = etag_headers[0]
cache_control = channel.headers.getRawHeaders("Cache-Control")
self.assertIsNotNone(cache_control)
# Make a second request with If-None-Match header
channel = self.make_request(
"GET",
url_path,
shorthand=False,
custom_headers=[("If-None-Match", etag)],
)
self.pump()
# Should get 304 Not Modified
self.assertEqual(channel.code, 304)
self.assertNotIn("body", channel.result)

View File

@@ -327,3 +327,42 @@ class ThreadSubscriptionsTestCase(unittest.HomeserverTestCase):
self.assertFalse(
func(autosub=EventOrderings(-50, 2), unsubscribed_at=EventOrderings(2, 1))
)
def test_get_subscribers_to_thread(self) -> None:
"""
Test getting all subscribers to a thread at once.
To check cache invalidations are correct, we do multiple
step-by-step rounds of subscription changes and assertions.
"""
other_user_id = "@other_user:test"
subscribers = self.get_success(
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
)
self.assertEqual(subscribers, frozenset())
self._subscribe(
self.thread_root_id, automatic_event_orderings=None, user_id=self.user_id
)
subscribers = self.get_success(
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
)
self.assertEqual(subscribers, frozenset((self.user_id,)))
self._subscribe(
self.thread_root_id, automatic_event_orderings=None, user_id=other_user_id
)
subscribers = self.get_success(
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
)
self.assertEqual(subscribers, frozenset((self.user_id, other_user_id)))
self._unsubscribe(self.thread_root_id, user_id=self.user_id)
subscribers = self.get_success(
self.store.get_subscribers_to_thread(self.room_id, self.thread_root_id)
)
self.assertEqual(subscribers, frozenset((other_user_id,)))