Try to update to support per-user enablement
This commit is contained in:
@@ -595,4 +595,4 @@ class ExperimentalConfig(Config):
|
|||||||
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)
|
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)
|
||||||
|
|
||||||
# MSC4354: Sticky Events
|
# MSC4354: Sticky Events
|
||||||
self.msc4354_enabled: bool = experimental.get("msc4354_enabled", False)
|
self.msc4354_enabled_value: bool = experimental.get("msc4354_enabled", False)
|
||||||
|
|||||||
@@ -511,8 +511,6 @@ class FederationSender(AbstractFederationSender):
|
|||||||
|
|
||||||
def notify_new_server_joined(self, server: str, room_id: str) -> None:
|
def notify_new_server_joined(self, server: str, room_id: str) -> None:
|
||||||
# We currently only use this notification for MSC4354: Sticky Events.
|
# We currently only use this notification for MSC4354: Sticky Events.
|
||||||
if not self.hs.config.experimental.msc4354_enabled:
|
|
||||||
return
|
|
||||||
# fire off a processing loop in the background
|
# fire off a processing loop in the background
|
||||||
self.hs.run_as_background_process(
|
self.hs.run_as_background_process(
|
||||||
"process_new_server_joined_over_federation",
|
"process_new_server_joined_over_federation",
|
||||||
|
|||||||
@@ -104,7 +104,6 @@ class PerDestinationQueue:
|
|||||||
self._instance_name = hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
self._federation_shard_config = hs.config.worker.federation_shard_config
|
self._federation_shard_config = hs.config.worker.federation_shard_config
|
||||||
self._state = hs.get_state_handler()
|
self._state = hs.get_state_handler()
|
||||||
self.msc4354_enabled = hs.config.experimental.msc4354_enabled
|
|
||||||
|
|
||||||
self._should_send_on_this_instance = True
|
self._should_send_on_this_instance = True
|
||||||
if not self._federation_shard_config.should_handle(
|
if not self._federation_shard_config.should_handle(
|
||||||
@@ -582,32 +581,31 @@ class PerDestinationQueue:
|
|||||||
# send.
|
# send.
|
||||||
extrem_events = await self._store.get_events_as_list(extrems)
|
extrem_events = await self._store.get_events_as_list(extrems)
|
||||||
|
|
||||||
if self.msc4354_enabled:
|
# we also want to send sticky events that are still active in this room
|
||||||
# we also want to send sticky events that are still active in this room
|
sticky_event_ids = (
|
||||||
sticky_event_ids = (
|
await self._store.get_sticky_event_ids_sent_by_self(
|
||||||
await self._store.get_sticky_event_ids_sent_by_self(
|
pdu.room_id,
|
||||||
pdu.room_id,
|
last_successful_stream_ordering,
|
||||||
last_successful_stream_ordering,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
# skip any that are actually the forward extremities we want to send anyway
|
)
|
||||||
sticky_events = await self._store.get_events_as_list(
|
# skip any that are actually the forward extremities we want to send anyway
|
||||||
[
|
sticky_events = await self._store.get_events_as_list(
|
||||||
event_id
|
[
|
||||||
for event_id in sticky_event_ids
|
event_id
|
||||||
if event_id not in extrems
|
for event_id in sticky_event_ids
|
||||||
]
|
if event_id not in extrems
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if sticky_events:
|
||||||
|
# *prepend* these to the extrem list, so they are processed first.
|
||||||
|
# This ensures they will show up before the forward extrem in stream order
|
||||||
|
extrem_events = sticky_events + extrem_events
|
||||||
|
logger.info(
|
||||||
|
"Sending %d missed sticky events to %s: %r",
|
||||||
|
len(sticky_events),
|
||||||
|
self._destination,
|
||||||
|
pdu.room_id,
|
||||||
)
|
)
|
||||||
if sticky_events:
|
|
||||||
# *prepend* these to the extrem list, so they are processed first.
|
|
||||||
# This ensures they will show up before the forward extrem in stream order
|
|
||||||
extrem_events = sticky_events + extrem_events
|
|
||||||
logger.info(
|
|
||||||
"Sending %d missed sticky events to %s: %r",
|
|
||||||
len(sticky_events),
|
|
||||||
self._destination,
|
|
||||||
pdu.room_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
new_pdus = []
|
new_pdus = []
|
||||||
for p in extrem_events:
|
for p in extrem_events:
|
||||||
|
|||||||
@@ -82,6 +82,7 @@ from synapse.util.caches.lrucache import LruCache
|
|||||||
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
|
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
from synapse.rest.admin.experimental_features import ExperimentalFeature
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
@@ -2178,6 +2179,7 @@ class SyncHandler:
|
|||||||
|
|
||||||
since_token = sync_result_builder.since_token
|
since_token = sync_result_builder.since_token
|
||||||
user_id = sync_result_builder.sync_config.user.to_string()
|
user_id = sync_result_builder.sync_config.user.to_string()
|
||||||
|
calculate_sticky_events = self.store.is_feature_enabled(user_id, ExperimentalFeature.MSC4354)
|
||||||
|
|
||||||
blocks_all_rooms = (
|
blocks_all_rooms = (
|
||||||
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
|
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
|
||||||
@@ -2216,7 +2218,7 @@ class SyncHandler:
|
|||||||
sync_result_builder.now_token = now_token
|
sync_result_builder.now_token = now_token
|
||||||
|
|
||||||
sticky_by_room: Dict[str, Set[str]] = {}
|
sticky_by_room: Dict[str, Set[str]] = {}
|
||||||
if self.hs_config.experimental.msc4354_enabled:
|
if await calculate_sticky_events:
|
||||||
now_token, sticky_by_room = await self.sticky_events_by_room(
|
now_token, sticky_by_room = await self.sticky_events_by_room(
|
||||||
sync_result_builder, now_token, since_token
|
sync_result_builder, now_token, since_token
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ class ExperimentalFeature(str, Enum):
|
|||||||
MSC3881 = "msc3881"
|
MSC3881 = "msc3881"
|
||||||
MSC3575 = "msc3575"
|
MSC3575 = "msc3575"
|
||||||
MSC4222 = "msc4222"
|
MSC4222 = "msc4222"
|
||||||
|
MSC4354 = "msc4354"
|
||||||
|
|
||||||
def is_globally_enabled(self, config: "HomeServerConfig") -> bool:
|
def is_globally_enabled(self, config: "HomeServerConfig") -> bool:
|
||||||
if self is ExperimentalFeature.MSC3881:
|
if self is ExperimentalFeature.MSC3881:
|
||||||
@@ -52,6 +53,8 @@ class ExperimentalFeature(str, Enum):
|
|||||||
return config.experimental.msc3575_enabled
|
return config.experimental.msc3575_enabled
|
||||||
if self is ExperimentalFeature.MSC4222:
|
if self is ExperimentalFeature.MSC4222:
|
||||||
return config.experimental.msc4222_enabled
|
return config.experimental.msc4222_enabled
|
||||||
|
if self is ExperimentalFeature.MSC4354:
|
||||||
|
return config.experimental.msc4354_enabled
|
||||||
|
|
||||||
assert_never(self)
|
assert_never(self)
|
||||||
|
|
||||||
|
|||||||
@@ -81,6 +81,9 @@ class VersionsRestServlet(RestServlet):
|
|||||||
msc3575_enabled = await self.store.is_feature_enabled(
|
msc3575_enabled = await self.store.is_feature_enabled(
|
||||||
user_id, ExperimentalFeature.MSC3575
|
user_id, ExperimentalFeature.MSC3575
|
||||||
)
|
)
|
||||||
|
msc4354_enabled = await self.store.is_feature_enabled(
|
||||||
|
user_id, ExperimentalFeature.MSC4354
|
||||||
|
)
|
||||||
|
|
||||||
return (
|
return (
|
||||||
200,
|
200,
|
||||||
@@ -183,7 +186,7 @@ class VersionsRestServlet(RestServlet):
|
|||||||
# MSC4169: Backwards-compatible redaction sending using `/send`
|
# MSC4169: Backwards-compatible redaction sending using `/send`
|
||||||
"com.beeper.msc4169": self.config.experimental.msc4169_enabled,
|
"com.beeper.msc4169": self.config.experimental.msc4169_enabled,
|
||||||
# MSC4354: Sticky events
|
# MSC4354: Sticky events
|
||||||
"org.matrix.msc4354": self.config.experimental.msc4354_enabled,
|
"org.matrix.msc4354": msc4354_enabled,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -114,7 +114,7 @@ class ExperimentalFeaturesStore(CacheInvalidationWorkerStore):
|
|||||||
|
|
||||||
if feature.is_globally_enabled(self.hs.config):
|
if feature.is_globally_enabled(self.hs.config):
|
||||||
return True
|
return True
|
||||||
|
msc4354_enabled
|
||||||
# if it's not enabled globally, check if it is enabled per-user
|
# if it's not enabled globally, check if it is enabled per-user
|
||||||
res = await self.db_pool.simple_select_one_onecol(
|
res = await self.db_pool.simple_select_one_onecol(
|
||||||
table="per_user_experimental_features",
|
table="per_user_experimental_features",
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ from synapse.types import (
|
|||||||
)
|
)
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
from synapse.util.clock import Clock
|
from synapse.util.clock import Clock
|
||||||
|
from synapse.rest.admin.experimental_features import ExperimentalFeature
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
filtered_event_logger = logging.getLogger("synapse.visibility.filtered_event_debug")
|
filtered_event_logger = logging.getLogger("synapse.visibility.filtered_event_debug")
|
||||||
@@ -110,6 +111,7 @@ async def filter_events_for_client(
|
|||||||
# We copy the events list to guarantee any modifications we make will only
|
# We copy the events list to guarantee any modifications we make will only
|
||||||
# happen within the function.
|
# happen within the function.
|
||||||
events_before_filtering = events.copy()
|
events_before_filtering = events.copy()
|
||||||
|
sticky_events_enabled = await storage.main.is_feature_enabled(user_id, ExperimentalFeature.MSC4354)
|
||||||
# Default case is to *exclude* soft-failed events
|
# Default case is to *exclude* soft-failed events
|
||||||
events = [e for e in events if not e.internal_metadata.is_soft_failed()]
|
events = [e for e in events if not e.internal_metadata.is_soft_failed()]
|
||||||
client_config = await storage.main.get_admin_client_config_for_user(user_id)
|
client_config = await storage.main.get_admin_client_config_for_user(user_id)
|
||||||
@@ -203,7 +205,7 @@ async def filter_events_for_client(
|
|||||||
# to the cache!
|
# to the cache!
|
||||||
cloned = clone_event(filtered)
|
cloned = clone_event(filtered)
|
||||||
cloned.unsigned[EventUnsignedContentFields.MEMBERSHIP] = user_membership
|
cloned.unsigned[EventUnsignedContentFields.MEMBERSHIP] = user_membership
|
||||||
if storage.main.config.experimental.msc4354_enabled:
|
if sticky_events_enabled:
|
||||||
sticky_duration = cloned.sticky_duration()
|
sticky_duration = cloned.sticky_duration()
|
||||||
if sticky_duration:
|
if sticky_duration:
|
||||||
now = storage.main.clock.time_msec()
|
now = storage.main.clock.time_msec()
|
||||||
|
|||||||
Reference in New Issue
Block a user