Use clock as a time source
This commit is contained in:
@@ -20,7 +20,6 @@
|
||||
#
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AbstractSet,
|
||||
@@ -619,7 +618,7 @@ class SyncHandler:
|
||||
events are included, and a dict mapping from room_id to a list of
|
||||
sticky event IDs for that room.
|
||||
"""
|
||||
now = round(time.time() * 1000)
|
||||
now = self.clock.time_msec()
|
||||
with Measure(
|
||||
self.clock, name="sticky_events_by_room", server_name=self.server_name
|
||||
):
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
import logging
|
||||
import time
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
@@ -234,7 +233,7 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
def _get_sticky_event_ids_sent_by_self_txn(
|
||||
self, txn: LoggingTransaction, room_id: str, from_stream_pos: int
|
||||
) -> list[str]:
|
||||
now_ms = self._now()
|
||||
now_ms = self.clock.time_msec()
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT sticky_events.event_id, sticky_events.sender, events.stream_ordering FROM sticky_events
|
||||
@@ -288,7 +287,7 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
txn: LoggingTransaction,
|
||||
events: list[EventBase],
|
||||
) -> None:
|
||||
now_ms = self._now()
|
||||
now_ms = self.clock.time_msec()
|
||||
# event, expires_at, stream_id
|
||||
sticky_events: list[tuple[EventBase, int, int]] = []
|
||||
for ev in events:
|
||||
@@ -607,7 +606,7 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
await self.db_pool.runInteraction(
|
||||
"_delete_expired_sticky_events",
|
||||
self._delete_expired_sticky_events_txn,
|
||||
self._now(),
|
||||
self.clock.time_msec(),
|
||||
)
|
||||
|
||||
def _delete_expired_sticky_events_txn(
|
||||
@@ -620,9 +619,6 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
(now,),
|
||||
)
|
||||
|
||||
def _now(self) -> int:
|
||||
return round(time.time() * 1000)
|
||||
|
||||
def _run_background_cleanup(self) -> Deferred:
|
||||
return self.hs.run_as_background_process(
|
||||
"delete_expired_sticky_events",
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import time
|
||||
from typing import Callable, Collection
|
||||
from unittest import mock
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
@@ -457,8 +456,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
@unittest.override_config({"experimental_features": {"msc4354_enabled": True}})
|
||||
def test_sends_sticky_events(self) -> None:
|
||||
"""Test that we send sticky events in addition to the latest event in the room when catching up."""
|
||||
# make the clock used when generating origin_server_ts the same as the clock used to check expiry
|
||||
self.reactor.advance(time.time())
|
||||
per_dest_queue, sent_pdus = self.make_fake_destination_queue()
|
||||
|
||||
# Make a room with a local user, and two servers. One will go offline
|
||||
|
||||
Reference in New Issue
Block a user