Compare commits
8 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 0dd33f1803 | |
| | e2582a5dd7 | |
| | a195efa1fe | |
| | 802560211a | |
| | 10a08ab88a | |
| | fa6679e794 | |
| | 391bfe9a7b | |
| | dd2d32dcdb | |
18 CHANGES.md
@@ -1,21 +1,3 @@
-Synapse 1.33.0 (2021-05-05)
-===========================
-
-Features
---------
-
-- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909))
-
-
-Synapse 1.33.0rc2 (2021-04-29)
-==============================
-
-Bugfixes
---------
-
-- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))
-
-
 Synapse 1.33.0rc1 (2021-04-28)
 ==============================
 

1 changelog.d/9885.misc Normal file
@@ -0,0 +1 @@
+Add type hints to presence handler.

1 changelog.d/9886.misc Normal file
@@ -0,0 +1 @@
+Reduce memory usage of the LRU caches.

1 changelog.d/9895.bugfix Normal file
@@ -0,0 +1 @@
+Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements.

1 changelog.d/9900.bugfix Normal file
@@ -0,0 +1 @@
+Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.

6 debian/changelog vendored
@@ -1,9 +1,3 @@
-matrix-synapse-py3 (1.33.0) stable; urgency=medium
-
-  * New synapse release 1.33.0.
-
- -- Synapse Packaging team <packages@matrix.org>  Wed, 05 May 2021 14:15:27 +0100
-
 matrix-synapse-py3 (1.32.2) stable; urgency=medium
 
   * New synapse release 1.32.2.

scripts-dev/build_debian_packages
@@ -21,10 +21,9 @@ DISTS = (
     "debian:buster",
     "debian:bullseye",
     "debian:sid",
-    "ubuntu:bionic",  # 18.04 LTS (our EOL forced by Py36 on 2021-12-23)
-    "ubuntu:focal",  # 20.04 LTS (our EOL forced by Py38 on 2024-10-14)
-    "ubuntu:groovy",  # 20.10 (EOL 2021-07-07)
-    "ubuntu:hirsute",  # 21.04 (EOL 2022-01-05)
+    "ubuntu:bionic",
+    "ubuntu:focal",
+    "ubuntu:groovy",
 )
 
 DESC = '''\
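For orientation, a dist matrix like DISTS is consumed by iterating it to drive one containerised build per target. A minimal sketch; the `image_tag` helper and tag scheme are illustrative, not the script's actual code:

```python
# Minimal sketch of how a dist matrix like DISTS is consumed; the
# image_tag() helper and "dh-venv-builder" tag scheme are assumptions
# for illustration, not the build script's actual code.
DISTS = (
    "debian:buster",
    "debian:bullseye",
    "debian:sid",
    "ubuntu:bionic",
    "ubuntu:focal",
    "ubuntu:groovy",
)

def image_tag(dist: str) -> str:
    # "ubuntu:focal" -> "dh-venv-builder:focal"
    return "dh-venv-builder:" + dist.split(":", 1)[1]

for dist in DISTS:
    print(f"building {dist} as {image_tag(dist)}")
```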

synapse/__init__.py
@@ -47,7 +47,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.33.0"
+__version__ = "1.33.0rc1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when

synapse/handlers/presence.py
@@ -28,6 +28,7 @@ from bisect import bisect
 from contextlib import contextmanager
 from typing import (
     TYPE_CHECKING,
+    Callable,
     Collection,
     Dict,
     FrozenSet,

@@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC):
         """
 
     async def update_external_syncs_row(
-        self, process_id, user_id, is_syncing, sync_time_msec
-    ):
+        self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+    ) -> None:
         """Update the syncing users for an external process as a delta.
 
         This is a no-op when presence is handled by a different worker.
 
         Args:
-            process_id (str): An identifier for the process the users are
+            process_id: An identifier for the process the users are
                 syncing against. This allows synapse to process updates
                 as user start and stop syncing against a given process.
-            user_id (str): The user who has started or stopped syncing
-            is_syncing (bool): Whether or not the user is now syncing
-            sync_time_msec(int): Time in ms when the user was last syncing
+            user_id: The user who has started or stopped syncing
+            is_syncing: Whether or not the user is now syncing
+            sync_time_msec: Time in ms when the user was last syncing
         """
         pass
 
-    async def update_external_syncs_clear(self, process_id):
+    async def update_external_syncs_clear(self, process_id: str) -> None:
         """Marks all users that had been marked as syncing by a given process
         as offline.

@@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]):
 
 
 class WorkerPresenceHandler(BasePresenceHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs

@@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
         # user_id -> last_sync_ms. Lists the users that have stopped syncing but
         # we haven't notified the presence writer of that yet
-        self.users_going_offline = {}
+        self.users_going_offline = {}  # type: Dict[str, int]
 
         self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
         self._set_state_client = ReplicationPresenceSetState.make_client(hs)

@@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler):
             self._on_shutdown,
         )
 
-    def _on_shutdown(self):
+    def _on_shutdown(self) -> None:
         if self._presence_enabled:
             self.hs.get_tcp_replication().send_command(
                 ClearUserSyncsCommand(self.instance_id)
             )
 
-    def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+    def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
         if self._presence_enabled:
             self.hs.get_tcp_replication().send_user_sync(
                 self.instance_id, user_id, is_syncing, last_sync_ms
             )
 
-    def mark_as_coming_online(self, user_id):
+    def mark_as_coming_online(self, user_id: str) -> None:
         """A user has started syncing. Send a UserSync to the presence writer,
         unless they had recently stopped syncing.
-
-        Args:
-            user_id (str)
         """
         going_offline = self.users_going_offline.pop(user_id, None)
         if not going_offline:

@@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
         # were offline
         self.send_user_sync(user_id, True, self.clock.time_msec())
 
-    def mark_as_going_offline(self, user_id):
+    def mark_as_going_offline(self, user_id: str) -> None:
         """A user has stopped syncing. We wait before notifying the presence
         writer as its likely they'll come back soon. This allows us to avoid
         sending a stopped syncing immediately followed by a started syncing
         notification to the presence writer
-
-        Args:
-            user_id (str)
         """
         self.users_going_offline[user_id] = self.clock.time_msec()
 
-    def send_stop_syncing(self):
+    def send_stop_syncing(self) -> None:
         """Check if there are any users who have stopped syncing a while ago and
         haven't come back yet. If there are poke the presence writer about them.
         """
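`mark_as_going_offline` and `send_stop_syncing` together form a small debounce: a user who stops syncing is only reported offline if they do not come back within a grace period. A self-contained sketch of the pattern (the 10-second grace period and the `report` callback are illustrative, not Synapse's actual values):

```python
# Minimal sketch (not Synapse's code) of the debounce pattern above:
# stopping-sync events are parked in a dict and only reported if the
# user does not come back within a grace period.
import time
from typing import Callable, Dict

GRACE_PERIOD_MS = 10_000  # illustrative

class SyncDebouncer:
    def __init__(self, report: Callable[[str, bool, int], None]) -> None:
        self._report = report  # e.g. sends a USER_SYNC to the presence writer
        self._going_offline: Dict[str, int] = {}  # user_id -> last_sync_ms

    def mark_as_coming_online(self, user_id: str) -> None:
        # If the user was about to be reported offline, just cancel that.
        if self._going_offline.pop(user_id, None) is None:
            self._report(user_id, True, int(time.time() * 1000))

    def mark_as_going_offline(self, user_id: str) -> None:
        self._going_offline[user_id] = int(time.time() * 1000)

    def send_stop_syncing(self) -> None:
        # Called periodically: report users whose grace period has expired.
        now = int(time.time() * 1000)
        for user_id, last_sync_ms in list(self._going_offline.items()):
            if now - last_sync_ms > GRACE_PERIOD_MS:
                del self._going_offline[user_id]
                self._report(user_id, False, last_sync_ms)
```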

@@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
         return _user_syncing()
 
-    async def notify_from_replication(self, states, stream_id):
+    async def notify_from_replication(
+        self, states: List[UserPresenceState], stream_id: int
+    ) -> None:
         parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties

@@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
             if count > 0
         ]
 
-    async def set_state(self, target_user, state, ignore_status_msg=False):
+    async def set_state(
+        self,
+        target_user: UserID,
+        state: JsonDict,
+        ignore_status_msg: bool = False,
+    ) -> None:
         """Set the presence state of the user."""
         presence = state["presence"]

@@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
             ignore_status_msg=ignore_status_msg,
         )
 
-    async def bump_presence_active_time(self, user):
+    async def bump_presence_active_time(self, user: UserID) -> None:
         """We've seen the user do something that indicates they're interacting
         with the app.
         """

@@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler):
         # we assume that all the sync requests on that process have stopped.
         # Stored as a dict from process_id to set of user_id, and a dict of
         # process_id to millisecond timestamp last updated.
-        self.external_process_to_current_syncs = {}  # type: Dict[int, Set[str]]
-        self.external_process_last_updated_ms = {}  # type: Dict[int, int]
+        self.external_process_to_current_syncs = {}  # type: Dict[str, Set[str]]
+        self.external_process_last_updated_ms = {}  # type: Dict[str, int]
 
         self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
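This hunk is a real fix that the typing work surfaced: `process_id` is a string, so the old `Dict[int, ...]` type comments were wrong. Once the methods are annotated, a type checker can flag such mismatches; a contrived sketch:

```python
# Contrived sketch: mypy flags the mismatch the old type comments hid.
# The module layout and simplified body are illustrative, not Synapse's code.
from typing import Dict, Set

external_process_to_current_syncs = {}  # type: Dict[int, Set[str]]

def update_external_syncs_row(process_id: str, user_id: str) -> None:
    # mypy: Invalid index type "str" for "Dict[int, Set[str]]"; expected type "int"
    external_process_to_current_syncs[process_id] = {user_id}
```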

@@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler):
         self._event_pos = self.store.get_current_events_token()
         self._event_processing = False
 
-    async def _on_shutdown(self):
+    async def _on_shutdown(self) -> None:
         """Gets called when shutting down. This lets us persist any updates that
         we haven't yet persisted, e.g. updates that only changes some internal
         timers. This allows changes to persist across startup without having to

@@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler):
         )
         logger.info("Finished _on_shutdown")
 
-    async def _persist_unpersisted_changes(self):
+    async def _persist_unpersisted_changes(self) -> None:
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """

@@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler):
             states, destinations
         )
 
-    async def _handle_timeouts(self):
+    async def _handle_timeouts(self) -> None:
         """Checks the presence of users that have timed out and updates as
         appropriate.
         """

@@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler):
 
         return await self._update_states(changes)
 
-    async def bump_presence_active_time(self, user):
+    async def bump_presence_active_time(self, user: UserID) -> None:
         """We've seen the user do something that indicates they're interacting
         with the app.
         """

@@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler):
             return []
 
     async def update_external_syncs_row(
-        self, process_id, user_id, is_syncing, sync_time_msec
-    ):
+        self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+    ) -> None:
         """Update the syncing users for an external process as a delta.
 
         Args:
-            process_id (str): An identifier for the process the users are
+            process_id: An identifier for the process the users are
                 syncing against. This allows synapse to process updates
                 as user start and stop syncing against a given process.
-            user_id (str): The user who has started or stopped syncing
-            is_syncing (bool): Whether or not the user is now syncing
-            sync_time_msec(int): Time in ms when the user was last syncing
+            user_id: The user who has started or stopped syncing
+            is_syncing: Whether or not the user is now syncing
+            sync_time_msec: Time in ms when the user was last syncing
         """
         with (await self.external_sync_linearizer.queue(process_id)):
             prev_state = await self.current_state_for_user(user_id)

@@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler):
 
         self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
 
-    async def update_external_syncs_clear(self, process_id):
+    async def update_external_syncs_clear(self, process_id: str) -> None:
         """Marks all users that had been marked as syncing by a given process
         as offline.
 

@@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler):
             )
             self.external_process_last_updated_ms.pop(process_id, None)
 
-    async def current_state_for_user(self, user_id):
+    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
         """Get the current presence state for a user."""
         res = await self.current_state_for_users([user_id])
         return res[user_id]
 
-    async def _persist_and_notify(self, states):
+    async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
         """Persist states in the database, poke the notifier and send to
         interested remote servers
         """

@@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler):
         # stream (which is updated by `store.update_presence`).
         await self.maybe_send_presence_to_interested_destinations(states)
 
-    async def incoming_presence(self, origin, content):
+    async def incoming_presence(self, origin: str, content: JsonDict) -> None:
         """Called when we receive a `m.presence` EDU from a remote server."""
         if not self._presence_enabled:
             return

@@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler):
         federation_presence_counter.inc(len(updates))
         await self._update_states(updates)
 
-    async def set_state(self, target_user, state, ignore_status_msg=False):
+    async def set_state(
+        self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
+    ) -> None:
         """Set the presence state of the user."""
         status_msg = state.get("status_msg", None)
         presence = state["presence"]

@@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler):
 
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
 
-    async def is_visible(self, observed_user, observer_user):
+    async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
         """Returns whether a user can see another user's presence."""
         observer_room_ids = await self.store.get_rooms_for_user(
             observer_user.to_string()

@@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler):
         )
         return rows
 
-    def notify_new_event(self):
+    def notify_new_event(self) -> None:
         """Called when new events have happened. Handles users and servers
         joining rooms and require being sent presence.
         """

@@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler):
 
         run_as_background_process("presence.notify_new_event", _process_presence)
 
-    async def _unsafe_process(self):
+    async def _unsafe_process(self) -> None:
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "presence_delta"):

@@ -1188,7 +1192,7 @@ class PresenceHandler(BasePresenceHandler):
                 max_pos
             )
 
-    async def _handle_state_delta(self, deltas):
+    async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
         """Process current state deltas to find new joins that need to be
         handled.
         """

@@ -1311,7 +1315,7 @@ class PresenceHandler(BasePresenceHandler):
         return [remote_host], states
 
 
-def should_notify(old_state, new_state):
+def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
     """Decides if a presence state change should be sent to interested parties."""
     if old_state == new_state:
         return False

@@ -1347,7 +1351,9 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now, include_user_id=True):
+def format_user_presence_state(
+    state: UserPresenceState, now: int, include_user_id: bool = True
+) -> JsonDict:
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
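For reference, the returned JsonDict is what ends up as the content of an `m.presence` event. A representative result (values illustrative; optional fields appear only when applicable):

```python
# Representative return value of format_user_presence_state for an online
# user; the field values here are illustrative, and user_id, status_msg and
# currently_active are only included when applicable.
{
    "user_id": "@alice:example.com",   # omitted if include_user_id=False
    "presence": "online",
    "last_active_ago": 15000,          # ms, derived from `now` and last_active_ts
    "status_msg": "working",
    "currently_active": True,
}
```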

@@ -1385,11 +1391,11 @@ class PresenceEventSource:
     @log_function
     async def get_new_events(
         self,
-        user,
-        from_key,
-        room_ids=None,
-        include_offline=True,
-        explicit_room_id=None,
+        user: UserID,
+        from_key: Optional[int],
+        room_ids: Optional[List[str]] = None,
+        include_offline: bool = True,
+        explicit_room_id: Optional[str] = None,
         **kwargs,
     ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:

@@ -1594,7 +1600,7 @@ class PresenceEventSource:
             if update.state != PresenceState.OFFLINE
         ]
 
-    def get_current_key(self):
+    def get_current_key(self) -> int:
         return self.store.get_current_presence_token()
 
     @cached(num_args=2, cache_context=True)

@@ -1654,15 +1660,20 @@ class PresenceEventSource:
         return users_interested_in
 
 
-def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
+def handle_timeouts(
+    user_states: List[UserPresenceState],
+    is_mine_fn: Callable[[str], bool],
+    syncing_user_ids: Set[str],
+    now: int,
+) -> List[UserPresenceState]:
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
-        user_states(list): List of UserPresenceState's to check.
-        is_mine_fn (fn): Function that returns if a user_id is ours
-        syncing_user_ids (set): Set of user_ids with active syncs.
-        now (int): Current time in ms.
+        user_states: List of UserPresenceState's to check.
+        is_mine_fn: Function that returns if a user_id is ours
+        syncing_user_ids: Set of user_ids with active syncs.
+        now: Current time in ms.
 
     Returns:
         List of UserPresenceState updates

@@ -1679,14 +1690,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     return list(changes.values())
 
 
-def handle_timeout(state, is_mine, syncing_user_ids, now):
+def handle_timeout(
+    state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
+) -> Optional[UserPresenceState]:
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
-        state (UserPresenceState)
-        is_mine (bool): Whether the user is ours
-        syncing_user_ids (set): Set of user_ids with active syncs.
-        now (int): Current time in ms.
+        state
+        is_mine: Whether the user is ours
+        syncing_user_ids: Set of user_ids with active syncs.
+        now: Current time in ms.
 
     Returns:
         A UserPresenceState update or None if no update.

@@ -1738,23 +1751,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
     return state if changed else None
 
 
-def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
+def handle_update(
+    prev_state: UserPresenceState,
+    new_state: UserPresenceState,
+    is_mine: bool,
+    wheel_timer: WheelTimer,
+    now: int,
+) -> Tuple[UserPresenceState, bool, bool]:
     """Given a presence update:
     1. Add any appropriate timers.
     2. Check if we should notify anyone.
 
     Args:
-        prev_state (UserPresenceState)
-        new_state (UserPresenceState)
-        is_mine (bool): Whether the user is ours
-        wheel_timer (WheelTimer)
-        now (int): Time now in ms
+        prev_state
+        new_state
+        is_mine: Whether the user is ours
+        wheel_timer
+        now: Time now in ms
 
     Returns:
         3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
         - new_state: is the state to actually persist
-        - persist_and_notify (bool): whether to persist and notify people
-        - federation_ping (bool): whether we should send a ping over federation
+        - persist_and_notify: whether to persist and notify people
+        - federation_ping: whether we should send a ping over federation
     """
     user_id = new_state.user_id

synapse/rest/well_known.py
@@ -39,6 +39,13 @@ class WellKnownBuilder:
 
         result = {"m.homeserver": {"base_url": self._config.public_baseurl}}
 
+        # Point to BigBlueButton widget for calls
+        result.update({
+            "io.element.call_behaviour": {
+                "widget_build_url": "http://localhost:8184/api/v1/dimension/bigbluebutton/widget_state",
+            }
+        })
+
         if self._config.default_identity_server:
             result["m.identity_server"] = {
                 "base_url": self._config.default_identity_server
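With this change, a GET to `/.well-known/matrix/client` returns the extra `io.element.call_behaviour` key. An illustrative result, assuming a `public_baseurl` of `https://matrix.example.com` and no default identity server configured; note the widget URL is hardcoded to localhost, so this fork-specific behaviour only resolves for clients that can reach a Dimension instance at that address:

```python
# Illustrative response body, expressed as the Python dict the builder
# returns; the base_url value is an assumption for this example.
{
    "m.homeserver": {"base_url": "https://matrix.example.com"},
    "io.element.call_behaviour": {
        "widget_build_url": "http://localhost:8184/api/v1/dimension/bigbluebutton/widget_state",
    },
}
```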

synapse/storage/database.py
@@ -715,7 +715,9 @@ class DatabasePool:
             # pool).
             assert not self.engine.in_transaction(conn)
 
-            with LoggingContext("runWithConnection", parent_context) as context:
+            with LoggingContext(
+                str(curr_context), parent_context=parent_context
+            ) as context:
                 sched_duration_sec = monotonic_time() - start_time
                 sql_scheduling_timer.observe(sched_duration_sec)
                 context.add_database_scheduled(sched_duration_sec)
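This hunk corresponds to the changelog.d/9895.bugfix entry above: the connection's logging context is now named after the calling request context rather than the fixed string "runWithConnection", so SQL log lines can be attributed to the request that scheduled the query. A toy illustration of the idea (not Synapse's LoggingContext, which also tracks CPU and database usage):

```python
# Toy illustration: carrying the caller's context name into the
# DB-connection context makes log lines attributable to the request.
import logging
from contextlib import contextmanager

logging.basicConfig(format="%(context)s: %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

@contextmanager
def logging_context(name: str):
    # This toy version only carries a name into the log records.
    yield logging.LoggerAdapter(logger, {"context": name})

curr_context = "GET-/sync-42"  # name of the request context scheduling the query

# Before the fix, every connection logged under the fixed name
# "runWithConnection"; after it, the request's own name is used.
with logging_context(str(curr_context)) as log:
    log.info("executing SELECT ...")  # -> "GET-/sync-42: executing SELECT ..."
```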

synapse/util/caches/lrucache.py
@@ -17,8 +17,10 @@ from functools import wraps
 from typing import (
     Any,
     Callable,
+    Collection,
     Generic,
     Iterable,
+    List,
     Optional,
     Type,
     TypeVar,

@@ -57,13 +59,56 @@ class _Node:
     __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
 
     def __init__(
-        self, prev_node, next_node, key, value, callbacks: Optional[set] = None
+        self,
+        prev_node,
+        next_node,
+        key,
+        value,
+        callbacks: Collection[Callable[[], None]] = (),
     ):
         self.prev_node = prev_node
         self.next_node = next_node
         self.key = key
         self.value = value
-        self.callbacks = callbacks or set()
+
+        # Set of callbacks to run when the node gets deleted. We store as a list
+        # rather than a set to keep memory usage down (and since we expect few
+        # entries per node, the performance of checking for duplication in a
+        # list vs using a set is negligible).
+        #
+        # Note that we store this as an optional list to keep the memory
+        # footprint down. Storing `None` is free as its a singleton, while empty
+        # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive
+        # thing and used sets).
+        self.callbacks = None  # type: Optional[List[Callable[[], None]]]
+
+        self.add_callbacks(callbacks)
+
+    def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
+        """Add to stored list of callbacks, removing duplicates."""
+
+        if not callbacks:
+            return
+
+        if not self.callbacks:
+            self.callbacks = []
+
+        for callback in callbacks:
+            if callback not in self.callbacks:
+                self.callbacks.append(callback)
+
+    def run_and_clear_callbacks(self) -> None:
+        """Run all callbacks and clear the stored list of callbacks. Used when
+        the node is being deleted.
+        """
+
+        if not self.callbacks:
+            return
+
+        for callback in self.callbacks:
+            callback()
+
+        self.callbacks = None
+
 
 class LruCache(Generic[KT, VT]):
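The footprint figures quoted in the new comment are straightforward to verify on a 64-bit CPython (exact numbers vary slightly between versions):

```python
import sys

print(sys.getsizeof([]))     # 56  -> cost of an empty list per cache node
print(sys.getsizeof(set()))  # 216 -> cost of an empty set per cache node
# `None` is a shared singleton, so a node with no callbacks pays only for
# the attribute slot, which is why `callbacks` starts out as None.
```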

@@ -177,10 +222,10 @@ class LruCache(Generic[KT, VT]):
 
         self.len = synchronized(cache_len)
 
-        def add_node(key, value, callbacks: Optional[set] = None):
+        def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()):
             prev_node = list_root
             next_node = prev_node.next_node
-            node = _Node(prev_node, next_node, key, value, callbacks or set())
+            node = _Node(prev_node, next_node, key, value, callbacks)
             prev_node.next_node = node
             next_node.prev_node = node
             cache[key] = node

@@ -211,16 +256,15 @@ class LruCache(Generic[KT, VT]):
                 deleted_len = size_callback(node.value)
                 cached_cache_len[0] -= deleted_len
 
-            for cb in node.callbacks:
-                cb()
-            node.callbacks.clear()
+            node.run_and_clear_callbacks()
 
             return deleted_len
 
         @overload
         def cache_get(
             key: KT,
             default: Literal[None] = None,
-            callbacks: Iterable[Callable[[], None]] = ...,
+            callbacks: Collection[Callable[[], None]] = ...,
             update_metrics: bool = ...,
         ) -> Optional[VT]:
             ...

@@ -229,7 +273,7 @@ class LruCache(Generic[KT, VT]):
         def cache_get(
             key: KT,
             default: T,
-            callbacks: Iterable[Callable[[], None]] = ...,
+            callbacks: Collection[Callable[[], None]] = ...,
             update_metrics: bool = ...,
         ) -> Union[T, VT]:
             ...

@@ -238,13 +282,13 @@ class LruCache(Generic[KT, VT]):
         def cache_get(
             key: KT,
             default: Optional[T] = None,
-            callbacks: Iterable[Callable[[], None]] = (),
+            callbacks: Collection[Callable[[], None]] = (),
             update_metrics: bool = True,
         ):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                node.callbacks.update(callbacks)
+                node.add_callbacks(callbacks)
                 if update_metrics and metrics:
                     metrics.inc_hits()
                 return node.value

@@ -260,10 +304,8 @@ class LruCache(Generic[KT, VT]):
             # We sometimes store large objects, e.g. dicts, which cause
             # the inequality check to take a long time. So let's only do
             # the check if we have some callbacks to call.
-            if node.callbacks and value != node.value:
-                for cb in node.callbacks:
-                    cb()
-                node.callbacks.clear()
+            if value != node.value:
+                node.run_and_clear_callbacks()
 
             # We don't bother to protect this by value != node.value as
             # generally size_callback will be cheap compared with equality

@@ -273,7 +315,7 @@ class LruCache(Generic[KT, VT]):
                 cached_cache_len[0] -= size_callback(node.value)
                 cached_cache_len[0] += size_callback(value)
 
-            node.callbacks.update(callbacks)
+            node.add_callbacks(callbacks)
 
             move_node_to_front(node)
             node.value = value

@@ -326,8 +368,7 @@ class LruCache(Generic[KT, VT]):
             list_root.next_node = list_root
             list_root.prev_node = list_root
             for node in cache.values():
-                for cb in node.callbacks:
-                    cb()
+                node.run_and_clear_callbacks()
             cache.clear()
             if size_callback:
                 cached_cache_len[0] = 0
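Taken together, these LruCache changes preserve the external contract: entries still accept invalidation callbacks that run on eviction, deletion, or value replacement; only the per-node storage strategy changes. A hedged usage sketch (argument names are assumptions based on the signatures in this diff):

```python
# Hedged usage sketch of the per-entry invalidation callbacks that the
# _Node changes preserve; exact constructor arguments are assumptions.
cache = LruCache(max_size=2)

cache.set("k1", "v1", callbacks=[lambda: print("k1 invalidated")])
cache.set("k2", "v2")
cache.set("k3", "v3")  # evicts the LRU entry "k1" -> its callback runs
```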