1
0

Merge EventStreamToken and RoomStreamToken

This commit is contained in:
Erik Johnston
2020-08-10 17:25:25 +01:00
parent 07d8afc56c
commit ba93cda363
9 changed files with 52 additions and 86 deletions

View File

@@ -18,7 +18,7 @@ from typing import List
from synapse.api.constants import Membership
from synapse.events import FrozenEvent
from synapse.types import RoomStreamToken, StateMap
from synapse.types import EventStreamToken, StateMap
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -125,8 +125,8 @@ class AdminHandler(BaseHandler):
else:
stream_ordering = room.stream_ordering
from_key = str(RoomStreamToken(0, 0))
to_key = str(RoomStreamToken(None, stream_ordering))
from_key = str(EventStreamToken(stream=0, topological=0))
to_key = str(EventStreamToken(stream_ordering))
written_events = set() # Events that we've processed in this room

View File

@@ -28,7 +28,6 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
RoomStreamToken,
StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -143,9 +142,7 @@ class DeviceWorkerHandler(BaseHandler):
)
rooms_changed.update(event.room_id for event in member_events)
stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key.token
).stream
stream_ordering = from_token.room_key.stream
possibly_changed = set(changed)
possibly_left = set()

View File

@@ -164,7 +164,7 @@ class InitialSyncHandler(BaseHandler):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
room_end_token = EventStreamToken("s%d" % (event.stream_ordering,))
room_end_token = EventStreamToken(event.stream_ordering)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)

View File

@@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import Requester, RoomStreamToken
from synapse.types import EventStreamToken, Requester
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -338,8 +338,6 @@ class PaginationHandler(object):
)
room_token = pagin_config.from_token.room_key
room_token = RoomStreamToken.parse(room_token.token)
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
@@ -371,7 +369,7 @@ class PaginationHandler(object):
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
leave_token = EventStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)

View File

@@ -46,7 +46,6 @@ from synapse.types import (
Requester,
RoomAlias,
RoomID,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
@@ -1104,10 +1103,9 @@ class RoomEventSource(object):
to_key = self.get_current_key()
from_token = RoomStreamToken.parse(from_key.token)
if from_token.topological:
if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
from_key = EventStreamToken(from_token.stream)
from_key = EventStreamToken(from_key.stream)
app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
@@ -1136,14 +1134,14 @@ class RoomEventSource(object):
events[:] = events[:limit]
if events:
end_key = EventStreamToken(events[-1].internal_metadata.after)
end_key = EventStreamToken.parse(events[-1].internal_metadata.after)
else:
end_key = to_key
return (events, end_key)
def get_current_key(self) -> EventStreamToken:
return EventStreamToken("s%d" % (self.store.get_room_max_stream_ordering(),))
return EventStreamToken(self.store.get_room_max_stream_ordering(),)
class RoomShutdownHandler(object):

View File

@@ -33,7 +33,6 @@ from synapse.types import (
EventStreamToken,
JsonDict,
MutableStateMap,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
@@ -1483,9 +1482,7 @@ class SyncHandler(object):
if rooms_changed:
return True
stream_id = RoomStreamToken.parse_stream_token(
since_token.room_key.token
).stream
stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
return True
@@ -1751,7 +1748,7 @@ class SyncHandler(object):
continue
leave_token = now_token.copy_and_replace(
"room_key", EventStreamToken("s%d" % (event.stream_ordering,)),
"room_key", EventStreamToken(event.stream_ordering),
)
room_entries.append(
RoomSyncResultBuilder(

View File

@@ -19,7 +19,7 @@ from typing import Any, List, Set, Tuple
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken
from synapse.types import EventStreamToken
logger = logging.getLogger(__name__)
@@ -51,7 +51,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
)
def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
token = RoomStreamToken.parse(token_str)
token = EventStreamToken.parse(token_str)
# Tables that should be pruned:
# event_auth

View File

@@ -49,7 +49,7 @@ from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import EventStreamToken, RoomStreamToken
from synapse.types import EventStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
if TYPE_CHECKING:
@@ -328,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
- list of recent events in the room
- stream ordering key for the start of the chunk of events returned.
"""
from_id = RoomStreamToken.parse_stream_token(from_key.token).stream
from_id = from_key.stream
room_ids = self._events_stream_cache.get_entities_changed(room_ids, from_id)
@@ -368,7 +368,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_ids
from_key: The room_key portion of a StreamToken
"""
from_key = RoomStreamToken.parse_stream_token(from_key.token).stream
return {
room_id
for room_id in room_ids
@@ -404,8 +404,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if from_key == to_key:
return [], from_key
from_id = RoomStreamToken.parse_stream_token(from_key.token).stream
to_id = RoomStreamToken.parse_stream_token(to_key.token).stream
from_id = from_key.stream
to_id = to_key.stream
has_changed = self._events_stream_cache.has_entity_changed(room_id, from_id)
@@ -437,7 +437,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ret.reverse()
if rows:
key = EventStreamToken("s%d" % min(r.stream_ordering for r in rows))
key = EventStreamToken(min(r.stream_ordering for r in rows))
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -448,8 +448,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_membership_changes_for_user(
self, user_id: str, from_key: EventStreamToken, to_key: EventStreamToken
) -> List[EventBase]:
from_id = RoomStreamToken.parse_stream_token(from_key.token).stream
to_id = RoomStreamToken.parse_stream_token(to_key.token).stream
from_id = from_key.stream
to_id = to_key.stream
if from_key == to_key:
return []
@@ -581,7 +581,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
token.
"""
token = self.get_room_max_stream_ordering()
return EventStreamToken("s%d" % (token,))
return EventStreamToken(token)
async def get_stream_id_for_event(self, event_id: str) -> int:
"""The stream ID for an event
@@ -606,7 +606,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
A "s%d" stream token.
"""
stream_id = await self.get_stream_id_for_event(event_id)
return EventStreamToken("s%d" % (stream_id,))
return EventStreamToken(stream_id)
async def get_topological_token_for_event(self, event_id: str) -> str:
"""The stream token for an event
@@ -675,8 +675,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
else:
topo = None
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
internal.before = str(EventStreamToken(topological=topo, stream=stream - 1))
internal.after = str(EventStreamToken(topological=topo, stream=stream))
internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around(
@@ -748,12 +748,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1, results["stream_ordering"]
before_token = EventStreamToken(
topological=results["topological_ordering"] - 1,
stream=results["stream_ordering"],
)
after_token = RoomStreamToken(
results["topological_ordering"], results["stream_ordering"]
after_token = EventStreamToken(
topological=results["topological_ordering"],
stream=results["stream_ordering"],
)
rows, start_token = self._paginate_room_events_txn(
@@ -963,8 +965,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
bounds = generate_pagination_where_clause(
direction=direction,
column_names=("topological_ordering", "stream_ordering"),
from_token=from_token,
to_token=to_token,
from_token=(from_token.topological, from_token.stream),
to_token=(to_token.topological, to_token.stream) if to_token else None,
engine=self.database_engine,
)
@@ -1023,12 +1025,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
next_token = EventStreamToken(topological=topo, stream=toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, EventStreamToken(str(next_token))
return rows, next_token
async def paginate_room_events(
self,
@@ -1057,9 +1059,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and `to_key`).
"""
from_key = RoomStreamToken.parse(from_key)
from_key = EventStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
to_key = EventStreamToken.parse(to_key)
rows, token = await self.db_pool.runInteraction(
"paginate_room_events",

View File

@@ -18,7 +18,7 @@ import re
import string
import sys
from collections import namedtuple
from typing import Any, Dict, Mapping, MutableMapping, Tuple, Type, TypeVar
from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar
import attr
from signedjson.key import decode_verify_key_bytes
@@ -388,7 +388,7 @@ class StreamToken(
while len(keys) < len(cls._fields):
# i.e. old token from before receipt_key
keys.append("0")
return cls(EventStreamToken(keys[0]), *keys[1:])
return cls(EventStreamToken.parse(keys[0]), *keys[1:])
except Exception:
raise SynapseError(400, "Invalid Token")
@@ -399,10 +399,7 @@ class StreamToken(
def room_stream_id(self):
# TODO(markjh): Awful hack to work around hacks in the presence tests
# which assume that the keys are integers.
if type(self.room_key.token) is int:
return self.room_key.token
else:
return int(self.room_key.token[1:].split("-")[-1])
return self.room_key.stream
def is_after(self, other):
"""Does this token contain events that the other doesn't?"""
@@ -440,39 +437,13 @@ class StreamToken(
@attr.s(eq=True, order=True, frozen=True, slots=True)
class EventStreamToken:
token = attr.ib()
def __str__(self) -> str:
return str(self.token)
StreamToken.START = StreamToken.from_string("s0_0")
class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
"""Tokens are positions between events. The token "s1" comes after event 1.
s0 s1
| |
[0] V [1] V [2]
Tokens can either be a point in the live event stream or a cursor going
through historic events.
When traversing the live event stream events are ordered by when they
arrived at the homeserver.
When traversing historic events the events are ordered by their depth in
the event graph "topological_ordering" and then by when they arrived at the
homeserver "stream_ordering".
Live tokens start with an "s" followed by the "stream_ordering" id of the
event it comes after. Historic tokens start with a "t" followed by the
"topological_ordering" id of the event it comes after, followed by "-",
followed by the "stream_ordering" id of the event it comes after.
"""
__slots__ = [] # type: list
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
topological = attr.ib(
type=Optional[int],
kw_only=True,
default=None,
validator=attr.validators.optional(attr.validators.instance_of(int)),
)
@classmethod
def parse(cls, string):
@@ -502,6 +473,9 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
return "s%d" % (self.stream,)
StreamToken.START = StreamToken.from_string("s0_0")
class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
):