Update delayed events to support no tokens
This commit is contained in:
@@ -399,7 +399,7 @@ class DelayedEventsHandler:
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
async def cancel(self, requester: Requester, delay_id: str) -> None:
|
||||
async def cancel(self, delay_id: str) -> None:
|
||||
"""
|
||||
Cancels the scheduled delivery of the matching delayed event.
|
||||
|
||||
@@ -412,20 +412,19 @@ class DelayedEventsHandler:
|
||||
"""
|
||||
assert self._is_master
|
||||
await self._delayed_event_mgmt_ratelimiter.ratelimit(
|
||||
requester,
|
||||
(requester.user.to_string(), requester.device_id),
|
||||
None,
|
||||
(delay_id),
|
||||
)
|
||||
await make_deferred_yieldable(self._initialized_from_db)
|
||||
|
||||
next_send_ts = await self._store.cancel_delayed_event(
|
||||
delay_id=delay_id,
|
||||
user_localpart=requester.user.localpart,
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at_or_none(next_send_ts)
|
||||
|
||||
async def restart(self, requester: Requester, delay_id: str) -> None:
|
||||
async def restart(self, delay_id: str) -> None:
|
||||
"""
|
||||
Restarts the scheduled delivery of the matching delayed event.
|
||||
|
||||
@@ -438,26 +437,24 @@ class DelayedEventsHandler:
|
||||
"""
|
||||
assert self._is_master
|
||||
await self._delayed_event_mgmt_ratelimiter.ratelimit(
|
||||
requester,
|
||||
(requester.user.to_string(), requester.device_id),
|
||||
None,
|
||||
(delay_id),
|
||||
)
|
||||
await make_deferred_yieldable(self._initialized_from_db)
|
||||
|
||||
next_send_ts = await self._store.restart_delayed_event(
|
||||
delay_id=delay_id,
|
||||
user_localpart=requester.user.localpart,
|
||||
current_ts=self._get_current_ts(),
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at(next_send_ts)
|
||||
|
||||
async def send(self, requester: Requester, delay_id: str) -> None:
|
||||
async def send(self, delay_id: str) -> None:
|
||||
"""
|
||||
Immediately sends the matching delayed event, instead of waiting for its scheduled delivery.
|
||||
|
||||
Args:
|
||||
requester: The owner of the delayed event to act on.
|
||||
delay_id: The ID of the delayed event to act on.
|
||||
|
||||
Raises:
|
||||
@@ -466,28 +463,21 @@ class DelayedEventsHandler:
|
||||
assert self._is_master
|
||||
# Use standard request limiter for sending delayed events on-demand,
|
||||
# as an on-demand send is similar to sending a regular event.
|
||||
await self._request_ratelimiter.ratelimit(requester)
|
||||
await make_deferred_yieldable(self._initialized_from_db)
|
||||
await self._delayed_event_mgmt_ratelimiter.ratelimit(
|
||||
None,
|
||||
(delay_id),
|
||||
)
|
||||
|
||||
event, next_send_ts = await self._store.process_target_delayed_event(
|
||||
delay_id=delay_id,
|
||||
user_localpart=requester.user.localpart,
|
||||
)
|
||||
|
||||
if self._next_send_ts_changed(next_send_ts):
|
||||
self._schedule_next_at_or_none(next_send_ts)
|
||||
|
||||
await self._send_event(
|
||||
DelayedEventDetails(
|
||||
delay_id=DelayID(delay_id),
|
||||
user_localpart=UserLocalpart(requester.user.localpart),
|
||||
room_id=event.room_id,
|
||||
type=event.type,
|
||||
state_key=event.state_key,
|
||||
origin_server_ts=event.origin_server_ts,
|
||||
content=event.content,
|
||||
device_id=event.device_id,
|
||||
)
|
||||
event
|
||||
)
|
||||
|
||||
async def _send_on_timeout(self) -> None:
|
||||
|
||||
@@ -53,8 +53,6 @@ class UpdateDelayedEventServlet(RestServlet):
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, delay_id: str
|
||||
) -> tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
try:
|
||||
action = str(body["action"])
|
||||
@@ -75,11 +73,11 @@ class UpdateDelayedEventServlet(RestServlet):
|
||||
)
|
||||
|
||||
if enum_action == _UpdateDelayedEventAction.CANCEL:
|
||||
await self.delayed_events_handler.cancel(requester, delay_id)
|
||||
await self.delayed_events_handler.cancel(delay_id)
|
||||
elif enum_action == _UpdateDelayedEventAction.RESTART:
|
||||
await self.delayed_events_handler.restart(requester, delay_id)
|
||||
await self.delayed_events_handler.restart(delay_id)
|
||||
elif enum_action == _UpdateDelayedEventAction.SEND:
|
||||
await self.delayed_events_handler.send(requester, delay_id)
|
||||
await self.delayed_events_handler.send(delay_id)
|
||||
return 200, {}
|
||||
|
||||
|
||||
|
||||
@@ -110,7 +110,6 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
table="delayed_events",
|
||||
values={
|
||||
"delay_id": delay_id,
|
||||
"user_localpart": user_localpart,
|
||||
"device_id": device_id,
|
||||
"delay": delay,
|
||||
"send_ts": send_ts,
|
||||
@@ -136,7 +135,6 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
self,
|
||||
*,
|
||||
delay_id: str,
|
||||
user_localpart: str,
|
||||
current_ts: Timestamp,
|
||||
) -> Timestamp:
|
||||
"""
|
||||
@@ -145,7 +143,6 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
|
||||
Args:
|
||||
delay_id: The ID of the delayed event to restart.
|
||||
user_localpart: The localpart of the delayed event's owner.
|
||||
current_ts: The current time, which will be used to calculate the new send time.
|
||||
|
||||
Returns: The send time of the next delayed event to be sent,
|
||||
@@ -163,13 +160,11 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
"""
|
||||
UPDATE delayed_events
|
||||
SET send_ts = ? + delay
|
||||
WHERE delay_id = ? AND user_localpart = ?
|
||||
AND NOT is_processed
|
||||
WHERE delay_id = ? AND NOT is_processed
|
||||
""",
|
||||
(
|
||||
current_ts,
|
||||
delay_id,
|
||||
user_localpart,
|
||||
),
|
||||
)
|
||||
if txn.rowcount == 0:
|
||||
@@ -321,9 +316,8 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
self,
|
||||
*,
|
||||
delay_id: str,
|
||||
user_localpart: str,
|
||||
) -> tuple[
|
||||
EventDetails,
|
||||
DelayedEventDetails,
|
||||
Optional[Timestamp],
|
||||
]:
|
||||
"""
|
||||
@@ -332,7 +326,6 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
|
||||
Args:
|
||||
delay_id: The ID of the delayed event to restart.
|
||||
user_localpart: The localpart of the delayed event's owner.
|
||||
|
||||
Returns: The details of the matching delayed event,
|
||||
and the send time of the next delayed event to be sent, if any.
|
||||
@@ -351,7 +344,7 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
"""
|
||||
UPDATE delayed_events
|
||||
SET is_processed = TRUE
|
||||
WHERE delay_id = ? AND user_localpart = ?
|
||||
WHERE delay_id = ?
|
||||
AND NOT is_processed
|
||||
RETURNING
|
||||
room_id,
|
||||
@@ -359,24 +352,26 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
state_key,
|
||||
origin_server_ts,
|
||||
content,
|
||||
device_id
|
||||
device_id,
|
||||
user_localpart
|
||||
""",
|
||||
(
|
||||
delay_id,
|
||||
user_localpart,
|
||||
),
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is None:
|
||||
raise NotFoundError("Delayed event not found")
|
||||
|
||||
event = EventDetails(
|
||||
event = DelayedEventDetails(
|
||||
RoomID.from_string(row[0]),
|
||||
EventType(row[1]),
|
||||
StateKey(row[2]) if row[2] is not None else None,
|
||||
Timestamp(row[3]) if row[3] is not None else None,
|
||||
db_to_json(row[4]),
|
||||
DeviceID(row[5]) if row[5] is not None else None,
|
||||
DelayID(delay_id),
|
||||
UserLocalpart(row[6]),
|
||||
)
|
||||
|
||||
return event, self._get_next_delayed_event_send_ts_txn(txn)
|
||||
@@ -388,8 +383,7 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
async def cancel_delayed_event(
|
||||
self,
|
||||
*,
|
||||
delay_id: str,
|
||||
user_localpart: str,
|
||||
delay_id: str
|
||||
) -> Optional[Timestamp]:
|
||||
"""
|
||||
Cancels the matching delayed event, i.e. remove it as long as it hasn't been processed.
|
||||
@@ -413,7 +407,6 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
table="delayed_events",
|
||||
keyvalues={
|
||||
"delay_id": delay_id,
|
||||
"user_localpart": user_localpart,
|
||||
"is_processed": False,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 Element Creations, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- Remove user_localpart from primary key.
|
||||
|
||||
ALTER TABLE delayed_events DROP CONSTRAINT delayed_events_pkey;
|
||||
ALTER TABLE delayed_events ADD PRIMARY KEY (delay_id);
|
||||
Reference in New Issue
Block a user