Add EventStreamPosition type
This commit is contained in:
@@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
MutableStateMap,
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
StateMap,
|
||||
UserID,
|
||||
get_domain_from_id,
|
||||
@@ -2891,7 +2893,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
return result["max_stream_id"]
|
||||
else:
|
||||
max_stream_id = await self.storage.persistence.persist_events(
|
||||
max_stream_token = await self.storage.persistence.persist_events(
|
||||
event_and_contexts, backfilled=backfilled
|
||||
)
|
||||
|
||||
@@ -2902,12 +2904,12 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
for event, _ in event_and_contexts:
|
||||
await self._notify_persisted_event(event, max_stream_id)
|
||||
await self._notify_persisted_event(event, max_stream_token)
|
||||
|
||||
return max_stream_id
|
||||
return max_stream_token.stream
|
||||
|
||||
async def _notify_persisted_event(
|
||||
self, event: EventBase, max_stream_id: int
|
||||
self, event: EventBase, max_stream_token: RoomStreamToken
|
||||
) -> None:
|
||||
"""Checks to see if notifier/pushers should be notified about the
|
||||
event or not.
|
||||
@@ -2933,9 +2935,11 @@ class FederationHandler(BaseHandler):
|
||||
elif event.internal_metadata.is_outlier():
|
||||
return
|
||||
|
||||
event_stream_id = event.internal_metadata.stream_ordering
|
||||
event_pos = PersistedEventPosition(
|
||||
self._instance_name, event.internal_metadata.stream_ordering
|
||||
)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
|
||||
async def _clean_room_for_join(self, room_id: str) -> None:
|
||||
|
||||
@@ -1136,7 +1136,7 @@ class EventCreationHandler:
|
||||
if prev_state_ids:
|
||||
raise AuthError(403, "Changing the room create event is forbidden")
|
||||
|
||||
event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
|
||||
event_pos, max_stream_token = await self.storage.persistence.persist_event(
|
||||
event, context=context
|
||||
)
|
||||
|
||||
@@ -1147,7 +1147,7 @@ class EventCreationHandler:
|
||||
def _notify():
|
||||
try:
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
event, event_pos, max_stream_token, extra_users=extra_users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying about new room event")
|
||||
@@ -1159,7 +1159,7 @@ class EventCreationHandler:
|
||||
# matters as sometimes presence code can take a while.
|
||||
run_in_background(self._bump_active_time, requester.user)
|
||||
|
||||
return event_stream_id
|
||||
return event_pos.stream
|
||||
|
||||
async def _bump_active_time(self, user: UserID) -> None:
|
||||
try:
|
||||
|
||||
+29
-26
@@ -42,7 +42,13 @@ from synapse.logging.utils import log_function
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
|
||||
from synapse.types import (
|
||||
Collection,
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -189,7 +195,7 @@ class Notifier:
|
||||
self.store = hs.get_datastore()
|
||||
self.pending_new_room_events = (
|
||||
[]
|
||||
) # type: List[Tuple[int, EventBase, Collection[UserID]]]
|
||||
) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
|
||||
|
||||
# Called when there are new things to stream over replication
|
||||
self.replication_callbacks = [] # type: List[Callable[[], None]]
|
||||
@@ -248,8 +254,8 @@ class Notifier:
|
||||
def on_new_room_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
room_stream_id: int,
|
||||
max_room_stream_id: int,
|
||||
event_pos: PersistedEventPosition,
|
||||
max_room_stream_token: RoomStreamToken,
|
||||
extra_users: Collection[UserID] = [],
|
||||
):
|
||||
""" Used by handlers to inform the notifier something has happened
|
||||
@@ -263,16 +269,16 @@ class Notifier:
|
||||
until all previous events have been persisted before notifying
|
||||
the client streams.
|
||||
"""
|
||||
self.pending_new_room_events.append((room_stream_id, event, extra_users))
|
||||
self._notify_pending_new_room_events(max_room_stream_id)
|
||||
self.pending_new_room_events.append((event_pos, event, extra_users))
|
||||
self._notify_pending_new_room_events(max_room_stream_token)
|
||||
|
||||
self.notify_replication()
|
||||
|
||||
def _notify_pending_new_room_events(self, max_room_stream_id: int):
|
||||
def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
|
||||
"""Notify for the room events that were queued waiting for a previous
|
||||
event to be persisted.
|
||||
Args:
|
||||
max_room_stream_id: The highest stream_id below which all
|
||||
max_room_stream_token: The highest stream_id below which all
|
||||
events have been persisted.
|
||||
"""
|
||||
pending = self.pending_new_room_events
|
||||
@@ -281,11 +287,9 @@ class Notifier:
|
||||
users = set() # type: Set[UserID]
|
||||
rooms = set() # type: Set[str]
|
||||
|
||||
for room_stream_id, event, extra_users in pending:
|
||||
if room_stream_id > max_room_stream_id:
|
||||
self.pending_new_room_events.append(
|
||||
(room_stream_id, event, extra_users)
|
||||
)
|
||||
for event_pos, event, extra_users in pending:
|
||||
if event_pos.persisted_after(max_room_stream_token):
|
||||
self.pending_new_room_events.append((event_pos, event, extra_users))
|
||||
else:
|
||||
if (
|
||||
event.type == EventTypes.Member
|
||||
@@ -298,39 +302,38 @@ class Notifier:
|
||||
|
||||
if users or rooms:
|
||||
self.on_new_event(
|
||||
"room_key",
|
||||
RoomStreamToken(None, max_room_stream_id),
|
||||
users=users,
|
||||
rooms=rooms,
|
||||
"room_key", max_room_stream_token, users=users, rooms=rooms,
|
||||
)
|
||||
self._on_updated_room_token(max_room_stream_id)
|
||||
self._on_updated_room_token(max_room_stream_token)
|
||||
|
||||
def _on_updated_room_token(self, max_room_stream_id: int):
|
||||
def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
|
||||
"""Poke services that might care that the room position has been
|
||||
updated.
|
||||
"""
|
||||
|
||||
# poke any interested application service.
|
||||
run_as_background_process(
|
||||
"_notify_app_services", self._notify_app_services, max_room_stream_id
|
||||
"_notify_app_services", self._notify_app_services, max_room_stream_token
|
||||
)
|
||||
|
||||
run_as_background_process(
|
||||
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
|
||||
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
|
||||
)
|
||||
|
||||
if self.federation_sender:
|
||||
self.federation_sender.notify_new_events(max_room_stream_id)
|
||||
self.federation_sender.notify_new_events(max_room_stream_token.stream)
|
||||
|
||||
async def _notify_app_services(self, max_room_stream_id: int):
|
||||
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
|
||||
try:
|
||||
await self.appservice_handler.notify_interested_services(max_room_stream_id)
|
||||
await self.appservice_handler.notify_interested_services(
|
||||
max_room_stream_token.stream
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
|
||||
async def _notify_pusher_pool(self, max_room_stream_id: int):
|
||||
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
|
||||
try:
|
||||
await self._pusher_pool.on_new_notifications(max_room_stream_id)
|
||||
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
|
||||
except Exception:
|
||||
logger.exception("Error pusher pool of event")
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
|
||||
EventsStreamEventRow,
|
||||
EventsStreamRow,
|
||||
)
|
||||
from synapse.types import UserID
|
||||
from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -151,8 +151,14 @@ class ReplicationDataHandler:
|
||||
extra_users = () # type: Tuple[UserID, ...]
|
||||
if event.type == EventTypes.Member:
|
||||
extra_users = (UserID.from_string(event.state_key),)
|
||||
max_token = self.store.get_room_max_stream_ordering()
|
||||
self.notifier.on_new_room_event(event, token, max_token, extra_users)
|
||||
|
||||
max_token = RoomStreamToken(
|
||||
None, self.store.get_room_max_stream_ordering()
|
||||
)
|
||||
event_pos = PersistedEventPosition(instance_name, token)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_pos, max_token, extra_users
|
||||
)
|
||||
|
||||
# Notify any waiting deferreds. The list is ordered by position so we
|
||||
# just iterate through the list until we reach a position that is
|
||||
|
||||
@@ -31,7 +31,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.databases import Databases
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.types import Collection, StateMap
|
||||
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -190,6 +190,7 @@ class EventsPersistenceStorage:
|
||||
self.persist_events_store = stores.persist_events
|
||||
|
||||
self._clock = hs.get_clock()
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self._event_persist_queue = _EventPeristenceQueue()
|
||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||
@@ -198,7 +199,7 @@ class EventsPersistenceStorage:
|
||||
self,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
) -> int:
|
||||
) -> RoomStreamToken:
|
||||
"""
|
||||
Write events to the database
|
||||
Args:
|
||||
@@ -228,11 +229,11 @@ class EventsPersistenceStorage:
|
||||
defer.gatherResults(deferreds, consumeErrors=True)
|
||||
)
|
||||
|
||||
return self.main_store.get_current_events_token()
|
||||
return RoomStreamToken(None, self.main_store.get_current_events_token())
|
||||
|
||||
async def persist_event(
|
||||
self, event: EventBase, context: EventContext, backfilled: bool = False
|
||||
) -> Tuple[int, int]:
|
||||
) -> Tuple[PersistedEventPosition, RoomStreamToken]:
|
||||
"""
|
||||
Returns:
|
||||
The stream ordering of `event`, and the stream ordering of the
|
||||
@@ -247,7 +248,10 @@ class EventsPersistenceStorage:
|
||||
await make_deferred_yieldable(deferred)
|
||||
|
||||
max_persisted_id = self.main_store.get_current_events_token()
|
||||
return (event.internal_metadata.stream_ordering, max_persisted_id)
|
||||
event_stream_id = event.internal_metadata.stream_ordering
|
||||
|
||||
pos = PersistedEventPosition(self._instance_name, event_stream_id)
|
||||
return pos, RoomStreamToken(None, max_persisted_id)
|
||||
|
||||
def _maybe_start_persisting(self, room_id: str):
|
||||
async def persisting_queue(item):
|
||||
|
||||
@@ -496,6 +496,21 @@ class StreamToken:
|
||||
StreamToken.START = StreamToken.from_string("s0_0")
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class PersistedEventPosition:
|
||||
"""Position of a newly persisted event with instance that persisted it.
|
||||
|
||||
This can be used to test whether the event is persisted before or after a
|
||||
RoomStreamToken.
|
||||
"""
|
||||
|
||||
instance_name = attr.ib(type=str)
|
||||
stream = attr.ib(type=int)
|
||||
|
||||
def persisted_after(self, token: RoomStreamToken) -> bool:
|
||||
return token.stream < self.stream
|
||||
|
||||
|
||||
class ThirdPartyInstanceID(
|
||||
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user