Delete from sticky_events periodically
This commit is contained in:
@@ -25,9 +25,12 @@ from typing import (
|
||||
cast,
|
||||
)
|
||||
|
||||
from twisted.internet.defer import Deferred
|
||||
|
||||
from synapse.api.constants import EventTypes, StickyEvent
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.snapshot import EventPersistencePair
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.tcp.streams._base import StickyEventsStream
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
@@ -44,6 +47,12 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Remove entries from the sticky_events table at this frequency.
|
||||
# Note: this does NOT mean we don't honour shorter expiration timeouts.
|
||||
# Consumers call 'get_sticky_events_in_rooms' which has `WHERE expires_at > ?`
|
||||
# to filter out expired sticky events that have yet to be deleted.
|
||||
DELETE_EXPIRED_STICKY_EVENTS_MS = 60 * 1000 * 60 # 1 hour
|
||||
|
||||
|
||||
class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
def __init__(
|
||||
@@ -58,6 +67,12 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
self._instance_name in hs.config.worker.writers.events
|
||||
)
|
||||
|
||||
# Technically this means we will cleanup N times, once per event persister, maybe put on master?
|
||||
if self._can_write_to_sticky_events:
|
||||
self._clock.looping_call(
|
||||
self._run_background_cleanup, DELETE_EXPIRED_STICKY_EVENTS_MS
|
||||
)
|
||||
|
||||
self._sticky_events_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator(
|
||||
db_conn=db_conn,
|
||||
db=database,
|
||||
@@ -426,5 +441,30 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore):
|
||||
# We know the events are otherwise authorised, so we only need to load the current state
|
||||
# and check if the events pass auth at the current state.
|
||||
|
||||
async def _delete_expired_sticky_events(self) -> None:
|
||||
logger.info("delete_expired_sticky_events")
|
||||
await self.db_pool.runInteraction(
|
||||
"_delete_expired_sticky_events",
|
||||
self._delete_expired_sticky_events_txn,
|
||||
self._now(),
|
||||
)
|
||||
|
||||
def _delete_expired_sticky_events_txn(
|
||||
self, txn: LoggingTransaction, now: int
|
||||
) -> None:
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM sticky_events WHERE expires_at < ?
|
||||
""",
|
||||
(now,),
|
||||
)
|
||||
|
||||
def _now(self) -> int:
|
||||
return round(time.time() * 1000)
|
||||
|
||||
def _run_background_cleanup(self) -> Deferred:
|
||||
return run_as_background_process(
|
||||
"delete_expired_sticky_events",
|
||||
self.server_name,
|
||||
self._delete_expired_sticky_events,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user