diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 2699e466bc..007b105d4d 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -147,6 +147,28 @@ def db_query_to_update_function( return update_function +def make_http_update_function( + hs, stream_name: str +) -> Callable[[int, int, int], Awaitable[Tuple[List[Tuple[int, tuple]], int, bool]]]: + """Makes a suitable function for use as an `update_function` that queries + the master process for updates. + """ + + client = ReplicationGetStreamUpdates.make_client(hs) + + async def update_function( + from_token: int, upto_token: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + return await client( + stream_name=stream_name, + from_token=from_token, + upto_token=upto_token, + limit=limit, + ) + + return update_function + + class BackfillStream(Stream): """We fetched some old events and either we had never seen that event before or it went from being an outlier to not. @@ -204,7 +226,7 @@ class PresenceStream(Stream): self.update_function = db_query_to_update_function(presence_handler.get_all_presence_updates) # type: ignore else: # Query master process - self.update_function = ReplicationGetStreamUpdates.make_client(hs) # type: ignore + self.update_function = make_http_update_function(hs, self.NAME) # type: ignore super(PresenceStream, self).__init__(hs) @@ -226,7 +248,7 @@ class TypingStream(Stream): self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore else: # Query master process - self.update_function = ReplicationGetStreamUpdates.make_client(hs) # type: ignore + self.update_function = make_http_update_function(hs, self.NAME) # type: ignore super(TypingStream, self).__init__(hs)