From ba93cda3639aebcedef879b2d1a4c2cbe04b938f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Aug 2020 17:25:25 +0100 Subject: [PATCH] Merge EventStreamToken and RoomStreamToken --- synapse/handlers/admin.py | 6 +-- synapse/handlers/device.py | 5 +- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/pagination.py | 6 +-- synapse/handlers/room.py | 10 ++-- synapse/handlers/sync.py | 7 +-- .../storage/databases/main/purge_events.py | 4 +- synapse/storage/databases/main/stream.py | 46 ++++++++-------- synapse/types.py | 52 +++++-------------- 9 files changed, 52 insertions(+), 86 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 506bb2b275..9d8cfb4d9c 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -18,7 +18,7 @@ from typing import List from synapse.api.constants import Membership from synapse.events import FrozenEvent -from synapse.types import RoomStreamToken, StateMap +from synapse.types import EventStreamToken, StateMap from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -125,8 +125,8 @@ class AdminHandler(BaseHandler): else: stream_ordering = room.stream_ordering - from_key = str(RoomStreamToken(0, 0)) - to_key = str(RoomStreamToken(None, stream_ordering)) + from_key = str(EventStreamToken(stream=0, topological=0)) + to_key = str(EventStreamToken(stream_ordering)) written_events = set() # Events that we've processed in this room diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1238fa29a1..270c0b9882 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -28,7 +28,6 @@ from synapse.api.errors import ( from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ( - RoomStreamToken, StreamToken, get_domain_from_id, get_verify_key_from_cross_signing_key, @@ -143,9 +142,7 @@ class DeviceWorkerHandler(BaseHandler): ) rooms_changed.update(event.room_id for event in member_events) - stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key.token - ).stream + stream_ordering = from_token.room_key.stream possibly_changed = set(changed) possibly_left = set() diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 5da7dc2792..51524aa8b0 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -164,7 +164,7 @@ class InitialSyncHandler(BaseHandler): self.state_handler.get_current_state, event.room_id ) elif event.membership == Membership.LEAVE: - room_end_token = EventStreamToken("s%d" % (event.stream_ordering,)) + room_end_token = EventStreamToken(event.stream_ordering) deferred_room_state = run_in_background( self.state_store.get_state_for_events, [event.event_id] ) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 2b48d486d1..9b9d0c564e 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import Requester, RoomStreamToken +from synapse.types import EventStreamToken, Requester from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -338,8 +338,6 @@ class PaginationHandler(object): ) room_token = pagin_config.from_token.room_key - room_token = RoomStreamToken.parse(room_token.token) - pagin_config.from_token = pagin_config.from_token.copy_and_replace( "room_key", str(room_token) ) @@ -371,7 +369,7 @@ class PaginationHandler(object): leave_token = await self.store.get_topological_token_for_event( member_event_id ) - leave_token = RoomStreamToken.parse(leave_token) + leave_token = EventStreamToken.parse(leave_token) if leave_token.topological < max_topo: source_config.from_key = str(leave_token) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae7b90e968..23ebd6a2ea 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -46,7 +46,6 @@ from synapse.types import ( Requester, RoomAlias, RoomID, - RoomStreamToken, StateMap, StreamToken, UserID, @@ -1104,10 +1103,9 @@ class RoomEventSource(object): to_key = self.get_current_key() - from_token = RoomStreamToken.parse(from_key.token) - if from_token.topological: + if from_key.topological: logger.warning("Stream has topological part!!!! %r", from_key) - from_key = EventStreamToken(from_token.stream) + from_key = EventStreamToken(from_key.stream) app_service = self.store.get_app_service_by_user_id(user.to_string()) if app_service: @@ -1136,14 +1134,14 @@ class RoomEventSource(object): events[:] = events[:limit] if events: - end_key = EventStreamToken(events[-1].internal_metadata.after) + end_key = EventStreamToken.parse(events[-1].internal_metadata.after) else: end_key = to_key return (events, end_key) def get_current_key(self) -> EventStreamToken: - return EventStreamToken("s%d" % (self.store.get_room_max_stream_ordering(),)) + return EventStreamToken(self.store.get_room_max_stream_ordering(),) class RoomShutdownHandler(object): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d8efe39645..195ab3c3a6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -33,7 +33,6 @@ from synapse.types import ( EventStreamToken, JsonDict, MutableStateMap, - RoomStreamToken, StateMap, StreamToken, UserID, @@ -1483,9 +1482,7 @@ class SyncHandler(object): if rooms_changed: return True - stream_id = RoomStreamToken.parse_stream_token( - since_token.room_key.token - ).stream + stream_id = since_token.room_key.stream for room_id in sync_result_builder.joined_room_ids: if self.store.has_room_changed_since(room_id, stream_id): return True @@ -1751,7 +1748,7 @@ class SyncHandler(object): continue leave_token = now_token.copy_and_replace( - "room_key", EventStreamToken("s%d" % (event.stream_ordering,)), + "room_key", EventStreamToken(event.stream_ordering), ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ea833829ae..3f0c7c9e1e 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -19,7 +19,7 @@ from typing import Any, List, Set, Tuple from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore from synapse.storage.databases.main.state import StateGroupWorkerStore -from synapse.types import RoomStreamToken +from synapse.types import EventStreamToken logger = logging.getLogger(__name__) @@ -51,7 +51,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): ) def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): - token = RoomStreamToken.parse(token_str) + token = EventStreamToken.parse(token_str) # Tables that should be pruned: # event_auth diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 660ebd51e2..e769b152c3 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -49,7 +49,7 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_in_list_sql_clause from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -from synapse.types import EventStreamToken, RoomStreamToken +from synapse.types import EventStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache if TYPE_CHECKING: @@ -328,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): - list of recent events in the room - stream ordering key for the start of the chunk of events returned. """ - from_id = RoomStreamToken.parse_stream_token(from_key.token).stream + from_id = from_key.stream room_ids = self._events_stream_cache.get_entities_changed(room_ids, from_id) @@ -368,7 +368,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_ids from_key: The room_key portion of a StreamToken """ - from_key = RoomStreamToken.parse_stream_token(from_key.token).stream + return { room_id for room_id in room_ids @@ -404,8 +404,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if from_key == to_key: return [], from_key - from_id = RoomStreamToken.parse_stream_token(from_key.token).stream - to_id = RoomStreamToken.parse_stream_token(to_key.token).stream + from_id = from_key.stream + to_id = to_key.stream has_changed = self._events_stream_cache.has_entity_changed(room_id, from_id) @@ -437,7 +437,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = EventStreamToken("s%d" % min(r.stream_ordering for r in rows)) + key = EventStreamToken(min(r.stream_ordering for r in rows)) else: # Assume we didn't get anything because there was nothing to # get. @@ -448,8 +448,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): async def get_membership_changes_for_user( self, user_id: str, from_key: EventStreamToken, to_key: EventStreamToken ) -> List[EventBase]: - from_id = RoomStreamToken.parse_stream_token(from_key.token).stream - to_id = RoomStreamToken.parse_stream_token(to_key.token).stream + from_id = from_key.stream + to_id = to_key.stream if from_key == to_key: return [] @@ -581,7 +581,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): token. """ token = self.get_room_max_stream_ordering() - return EventStreamToken("s%d" % (token,)) + return EventStreamToken(token) async def get_stream_id_for_event(self, event_id: str) -> int: """The stream ID for an event @@ -606,7 +606,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): A "s%d" stream token. """ stream_id = await self.get_stream_id_for_event(event_id) - return EventStreamToken("s%d" % (stream_id,)) + return EventStreamToken(stream_id) async def get_topological_token_for_event(self, event_id: str) -> str: """The stream token for an event @@ -675,8 +675,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): else: topo = None internal = event.internal_metadata - internal.before = str(RoomStreamToken(topo, stream - 1)) - internal.after = str(RoomStreamToken(topo, stream)) + internal.before = str(EventStreamToken(topological=topo, stream=stream - 1)) + internal.after = str(EventStreamToken(topological=topo, stream=stream)) internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( @@ -748,12 +748,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Paginating backwards includes the event at the token, but paginating # forward doesn't. - before_token = RoomStreamToken( - results["topological_ordering"] - 1, results["stream_ordering"] + before_token = EventStreamToken( + topological=results["topological_ordering"] - 1, + stream=results["stream_ordering"], ) - after_token = RoomStreamToken( - results["topological_ordering"], results["stream_ordering"] + after_token = EventStreamToken( + topological=results["topological_ordering"], + stream=results["stream_ordering"], ) rows, start_token = self._paginate_room_events_txn( @@ -963,8 +965,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): bounds = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), - from_token=from_token, - to_token=to_token, + from_token=(from_token.topological, from_token.stream), + to_token=(to_token.topological, to_token.stream) if to_token else None, engine=self.database_engine, ) @@ -1023,12 +1025,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # when we are going backwards so we subtract one from the # stream part. toke -= 1 - next_token = RoomStreamToken(topo, toke) + next_token = EventStreamToken(topological=topo, stream=toke) else: # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token - return rows, EventStreamToken(str(next_token)) + return rows, next_token async def paginate_room_events( self, @@ -1057,9 +1059,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): and `to_key`). """ - from_key = RoomStreamToken.parse(from_key) + from_key = EventStreamToken.parse(from_key) if to_key: - to_key = RoomStreamToken.parse(to_key) + to_key = EventStreamToken.parse(to_key) rows, token = await self.db_pool.runInteraction( "paginate_room_events", diff --git a/synapse/types.py b/synapse/types.py index afb3632939..61d18e865e 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,7 @@ import re import string import sys from collections import namedtuple -from typing import Any, Dict, Mapping, MutableMapping, Tuple, Type, TypeVar +from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar import attr from signedjson.key import decode_verify_key_bytes @@ -388,7 +388,7 @@ class StreamToken( while len(keys) < len(cls._fields): # i.e. old token from before receipt_key keys.append("0") - return cls(EventStreamToken(keys[0]), *keys[1:]) + return cls(EventStreamToken.parse(keys[0]), *keys[1:]) except Exception: raise SynapseError(400, "Invalid Token") @@ -399,10 +399,7 @@ class StreamToken( def room_stream_id(self): # TODO(markjh): Awful hack to work around hacks in the presence tests # which assume that the keys are integers. - if type(self.room_key.token) is int: - return self.room_key.token - else: - return int(self.room_key.token[1:].split("-")[-1]) + return self.room_key.stream def is_after(self, other): """Does this token contain events that the other doesn't?""" @@ -440,39 +437,13 @@ class StreamToken( @attr.s(eq=True, order=True, frozen=True, slots=True) class EventStreamToken: - token = attr.ib() - - def __str__(self) -> str: - return str(self.token) - - -StreamToken.START = StreamToken.from_string("s0_0") - - -class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): - """Tokens are positions between events. The token "s1" comes after event 1. - - s0 s1 - | | - [0] V [1] V [2] - - Tokens can either be a point in the live event stream or a cursor going - through historic events. - - When traversing the live event stream events are ordered by when they - arrived at the homeserver. - - When traversing historic events the events are ordered by their depth in - the event graph "topological_ordering" and then by when they arrived at the - homeserver "stream_ordering". - - Live tokens start with an "s" followed by the "stream_ordering" id of the - event it comes after. Historic tokens start with a "t" followed by the - "topological_ordering" id of the event it comes after, followed by "-", - followed by the "stream_ordering" id of the event it comes after. - """ - - __slots__ = [] # type: list + stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) + topological = attr.ib( + type=Optional[int], + kw_only=True, + default=None, + validator=attr.validators.optional(attr.validators.instance_of(int)), + ) @classmethod def parse(cls, string): @@ -502,6 +473,9 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): return "s%d" % (self.stream,) +StreamToken.START = StreamToken.from_string("s0_0") + + class ThirdPartyInstanceID( namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id")) ):