From 309c7eb1a197b940d11249bca4fd8c19b7e84a07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 24 Mar 2020 17:43:42 +0000 Subject: [PATCH] Add some type aliases --- synapse/replication/tcp/streams/_base.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 007b105d4d..c14dff6c64 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -16,7 +16,7 @@ import logging from collections import namedtuple -from typing import Any, Awaitable, Callable, List, Optional, Tuple, Union +from typing import Any, Awaitable, Callable, List, Optional, Tuple import attr @@ -29,6 +29,15 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 500000 +# Some type aliases to make things a bit easier. + +# A stream position token +Token = int + +# A pair of position in stream and args used to create an instance of `ROW_TYPE`. +StreamRow = Tuple[Token, tuple] + + class Stream(object): """Base class for the streams. @@ -66,7 +75,7 @@ class Stream(object): """ self.last_token = self.current_token() - async def get_updates(self) -> Tuple[List[Tuple[int, JsonDict]], int, bool]: + async def get_updates(self) -> Tuple[List[Tuple[Token, JsonDict]], Token, bool]: """Gets all updates since the last time this function was called (or since the stream was constructed if it hadn't been called before). @@ -85,8 +94,8 @@ class Stream(object): return updates, current_token, limited async def get_updates_since( - self, from_token: Union[int, str], upto_token: int, limit: int = 100 - ) -> Tuple[List[Tuple[int, JsonDict]], int, bool]: + self, from_token: Token, upto_token: Token, limit: int = 100 + ) -> Tuple[List[Tuple[Token, JsonDict]], Token, bool]: """Like get_updates except allows specifying from when we should stream updates @@ -128,8 +137,8 @@ class Stream(object): def db_query_to_update_function( - query_function: Callable[[int, int, int], Awaitable[List[tuple]]] -) -> Callable[[int, int, int], Awaitable[Tuple[List[Tuple[int, tuple]], int, bool]]]: + query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]] +) -> Callable[[Token, Token, int], Awaitable[Tuple[List[StreamRow], Token, bool]]]: """Wraps a db query function which returns a list of rows to make it suitable for use as an `update_function` for the Stream class """ @@ -149,7 +158,7 @@ def db_query_to_update_function( def make_http_update_function( hs, stream_name: str -) -> Callable[[int, int, int], Awaitable[Tuple[List[Tuple[int, tuple]], int, bool]]]: +) -> Callable[[Token, Token, Token], Awaitable[Tuple[List[StreamRow], Token, bool]]]: """Makes a suitable function for use as an `update_function` that queries the master process for updates. """