Shuffle around code typing handlers
This commit is contained in:
@@ -66,7 +66,7 @@ from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientFactory
|
||||
from synapse.replication.tcp.commands import ClearUserSyncsCommand
|
||||
from synapse.replication.tcp.handler import WorkerReplicationDataHandler
|
||||
from synapse.replication.tcp.handler import ReplicationDataHandler
|
||||
from synapse.replication.tcp.streams import (
|
||||
AccountDataStream,
|
||||
DeviceListsStream,
|
||||
@@ -77,7 +77,6 @@ from synapse.replication.tcp.streams import (
|
||||
ReceiptsStream,
|
||||
TagAccountDataStream,
|
||||
ToDeviceStream,
|
||||
TypingStream,
|
||||
)
|
||||
from synapse.replication.tcp.streams.events import (
|
||||
EventsStream,
|
||||
@@ -381,43 +380,6 @@ class GenericWorkerPresence(object):
|
||||
return set()
|
||||
|
||||
|
||||
class GenericWorkerTyping(object):
|
||||
def __init__(self, hs):
|
||||
self._latest_room_serial = 0
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
"""
|
||||
Reset the typing handler's data caches.
|
||||
"""
|
||||
# map room IDs to serial numbers
|
||||
self._room_serials = {}
|
||||
# map room IDs to sets of users currently typing
|
||||
self._room_typing = {}
|
||||
|
||||
def stream_positions(self):
|
||||
# We must update this typing token from the response of the previous
|
||||
# sync. In particular, the stream id may "reset" back to zero/a low
|
||||
# value which we *must* use for the next replication request.
|
||||
return {"typing": self._latest_room_serial}
|
||||
|
||||
def process_replication_rows(self, token, rows):
|
||||
if self._latest_room_serial > token:
|
||||
# The master has gone backwards. To prevent inconsistent data, just
|
||||
# clear everything.
|
||||
self._reset()
|
||||
|
||||
# Set the latest serial token to whatever the server gave us.
|
||||
self._latest_room_serial = token
|
||||
|
||||
for row in rows:
|
||||
self._room_serials[row.room_id] = token
|
||||
self._room_typing[row.room_id] = row.user_ids
|
||||
|
||||
def get_current_token(self) -> int:
|
||||
return self._latest_room_serial
|
||||
|
||||
|
||||
class GenericWorkerSlavedStore(
|
||||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
@@ -619,17 +581,15 @@ class GenericWorkerServer(HomeServer):
|
||||
def build_presence_handler(self):
|
||||
return GenericWorkerPresence(self)
|
||||
|
||||
def build_typing_handler(self):
|
||||
return GenericWorkerTyping(self)
|
||||
|
||||
def build_replication_data_handler(self):
|
||||
return GenericWorkerReplicationHandler(self)
|
||||
|
||||
|
||||
class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
|
||||
class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.typing_handler = hs.get_typing_handler()
|
||||
# NB this is a SynchrotronPresence, not a normal PresenceHandler
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
@@ -643,14 +603,12 @@ class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
|
||||
self.send_handler = None
|
||||
|
||||
async def on_rdata(self, stream_name, token, rows):
|
||||
await super(GenericWorkerReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
await super().on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
|
||||
args.update(self.typing_handler.stream_positions())
|
||||
args = super().get_streams_to_replicate()
|
||||
|
||||
if self.send_handler:
|
||||
args.update(self.send_handler.stream_positions())
|
||||
return args
|
||||
@@ -698,11 +656,6 @@ class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
|
||||
await self.pusher_pool.on_new_receipts(
|
||||
token, token, {row.room_id for row in rows}
|
||||
)
|
||||
elif stream_name == TypingStream.NAME:
|
||||
self.typing_handler.process_replication_rows(token, rows)
|
||||
self.notifier.on_new_event(
|
||||
"typing_key", token, rooms=[row.room_id for row in rows]
|
||||
)
|
||||
elif stream_name == ToDeviceStream.NAME:
|
||||
entities = [row.entity for row in rows if row.entity.startswith("@")]
|
||||
if entities:
|
||||
@@ -938,6 +891,8 @@ def start(config_options):
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.send_federation = True
|
||||
|
||||
config.server.handle_typing = False
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
ss = GenericWorkerServer(
|
||||
|
||||
@@ -83,6 +83,8 @@ class ServerConfig(Config):
|
||||
# "disable" federation
|
||||
self.send_federation = config.get("send_federation", True)
|
||||
|
||||
self.handle_typing = config.get("handle_typing", True)
|
||||
|
||||
# Whether to enable user presence.
|
||||
self.use_presence = config.get("use_presence", True)
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import AuthError, SynapseError
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.replication.tcp.streams import TypingStream
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.metrics import Measure
|
||||
@@ -288,6 +289,52 @@ class TypingHandler(object):
|
||||
return self._latest_room_serial
|
||||
|
||||
|
||||
class TypingSlaveHandler(object):
|
||||
def __init__(self, hs):
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
self._latest_room_serial = 0
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
"""
|
||||
Reset the typing handler's data caches.
|
||||
"""
|
||||
# map room IDs to serial numbers
|
||||
self._room_serials = {}
|
||||
# map room IDs to sets of users currently typing
|
||||
self._room_typing = {}
|
||||
|
||||
def stream_positions(self):
|
||||
# We must update this typing token from the response of the previous
|
||||
# sync. In particular, the stream id may "reset" back to zero/a low
|
||||
# value which we *must* use for the next replication request.
|
||||
return {"typing": self._latest_room_serial}
|
||||
|
||||
def process_replication_rows(self, stream_name, token, rows):
|
||||
if stream_name != TypingStream.NAME:
|
||||
return
|
||||
|
||||
if self._latest_room_serial > token:
|
||||
# The master has gone backwards. To prevent inconsistent data, just
|
||||
# clear everything.
|
||||
self._reset()
|
||||
|
||||
# Set the latest serial token to whatever the server gave us.
|
||||
self._latest_room_serial = token
|
||||
|
||||
for row in rows:
|
||||
self._room_serials[row.room_id] = token
|
||||
self._room_typing[row.room_id] = row.user_ids
|
||||
|
||||
self.notifier.on_new_event(
|
||||
"typing_key", token, rooms=[row.room_id for row in rows]
|
||||
)
|
||||
|
||||
def get_current_token(self) -> int:
|
||||
return self._latest_room_serial
|
||||
|
||||
|
||||
class TypingNotificationEventSource(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
|
||||
@@ -329,44 +329,16 @@ class ReplicationClientHandler:
|
||||
self.send_command(RdataCommand(stream_name, token, data))
|
||||
|
||||
|
||||
class DummyReplicationDataHandler:
|
||||
class ReplicationDataHandler:
|
||||
"""A replication data handler that simply discards all data.
|
||||
"""
|
||||
|
||||
async def on_rdata(self, stream_name: str, token: int, rows: list):
|
||||
"""Called to handle a batch of replication data with a given stream token.
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.typing_handler = hs.get_typing_handler()
|
||||
|
||||
By default this just pokes the slave store. Can be overridden in subclasses to
|
||||
handle more.
|
||||
|
||||
Args:
|
||||
stream_name (str): name of the replication stream for this batch of rows
|
||||
token (int): stream token for this batch of rows
|
||||
rows (list): a list of Stream.ROW_TYPE objects as returned by
|
||||
Stream.parse_row.
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_streams_to_replicate(self) -> Dict[str, int]:
|
||||
"""Called when a new connection has been established and we need to
|
||||
subscribe to streams.
|
||||
|
||||
Returns:
|
||||
map from stream name to the most recent update we have for
|
||||
that stream (ie, the point we want to start replicating from)
|
||||
"""
|
||||
return {}
|
||||
|
||||
async def on_position(self, stream_name: str, token: int):
|
||||
pass
|
||||
|
||||
|
||||
class WorkerReplicationDataHandler:
|
||||
"""A replication data handler that calls slave data stores.
|
||||
"""
|
||||
|
||||
def __init__(self, store):
|
||||
self.store = store
|
||||
self.slaved_store = hs.config.worker_app is not None
|
||||
self.slaved_typing = not hs.config.server.handle_typing
|
||||
|
||||
async def on_rdata(self, stream_name: str, token: int, rows: list):
|
||||
"""Called to handle a batch of replication data with a given stream token.
|
||||
@@ -380,7 +352,11 @@ class WorkerReplicationDataHandler:
|
||||
rows (list): a list of Stream.ROW_TYPE objects as returned by
|
||||
Stream.parse_row.
|
||||
"""
|
||||
self.store.process_replication_rows(stream_name, token, rows)
|
||||
if self.slaved_store:
|
||||
self.store.process_replication_rows(stream_name, token, rows)
|
||||
|
||||
if self.slaved_typing:
|
||||
self.typing_handler.process_replication_rows(stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self) -> Dict[str, int]:
|
||||
"""Called when a new connection has been established and we need to
|
||||
@@ -390,14 +366,25 @@ class WorkerReplicationDataHandler:
|
||||
map from stream name to the most recent update we have for
|
||||
that stream (ie, the point we want to start replicating from)
|
||||
"""
|
||||
args = self.store.stream_positions()
|
||||
user_account_data = args.pop("user_account_data", None)
|
||||
room_account_data = args.pop("room_account_data", None)
|
||||
if user_account_data:
|
||||
args["account_data"] = user_account_data
|
||||
elif room_account_data:
|
||||
args["account_data"] = room_account_data
|
||||
args = {} # type: Dict[str, int]
|
||||
|
||||
if self.slaved_store:
|
||||
args = self.store.stream_positions()
|
||||
user_account_data = args.pop("user_account_data", None)
|
||||
room_account_data = args.pop("room_account_data", None)
|
||||
if user_account_data:
|
||||
args["account_data"] = user_account_data
|
||||
elif room_account_data:
|
||||
args["account_data"] = room_account_data
|
||||
|
||||
if self.slaved_typing:
|
||||
args.update(self.typing_handler.stream_positions())
|
||||
|
||||
return args
|
||||
|
||||
async def on_position(self, stream_name: str, token: int):
|
||||
self.store.process_replication_rows(stream_name, token, [])
|
||||
if self.slaved_store:
|
||||
self.store.process_replication_rows(stream_name, token, [])
|
||||
|
||||
if self.slaved_typing:
|
||||
self.typing_handler.process_replication_rows(stream_name, token, [])
|
||||
|
||||
@@ -25,7 +25,7 @@ from twisted.internet.protocol import Factory
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
|
||||
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
|
||||
from synapse.replication.tcp.streams import STREAMS_MAP, Stream, TypingStream
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -79,8 +79,14 @@ class ReplicationStreamer(object):
|
||||
# hase been disabled on the master.
|
||||
continue
|
||||
|
||||
if stream == TypingStream:
|
||||
continue
|
||||
|
||||
self.streams.append(stream(hs))
|
||||
|
||||
if hs.config.server.handle_typing:
|
||||
self.streams.append(TypingStream(hs))
|
||||
|
||||
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
|
||||
|
||||
self.notifier.add_replication_callback(self.on_notifier_poke)
|
||||
|
||||
@@ -78,7 +78,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
|
||||
from synapse.handlers.set_password import SetPasswordHandler
|
||||
from synapse.handlers.stats import StatsHandler
|
||||
from synapse.handlers.sync import SyncHandler
|
||||
from synapse.handlers.typing import TypingHandler
|
||||
from synapse.handlers.typing import TypingHandler, TypingSlaveHandler
|
||||
from synapse.handlers.user_directory import UserDirectoryHandler
|
||||
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
@@ -86,8 +86,8 @@ from synapse.notifier import Notifier
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
from synapse.push.pusherpool import PusherPool
|
||||
from synapse.replication.tcp.handler import (
|
||||
DummyReplicationDataHandler,
|
||||
ReplicationClientHandler,
|
||||
ReplicationDataHandler,
|
||||
)
|
||||
from synapse.replication.tcp.resource import ReplicationStreamer
|
||||
from synapse.rest.media.v1.media_repository import (
|
||||
@@ -354,7 +354,10 @@ class HomeServer(object):
|
||||
return PresenceHandler(self)
|
||||
|
||||
def build_typing_handler(self):
|
||||
return TypingHandler(self)
|
||||
if self.config.handle_typing:
|
||||
return TypingHandler(self)
|
||||
else:
|
||||
return TypingSlaveHandler(self)
|
||||
|
||||
def build_sync_handler(self):
|
||||
return SyncHandler(self)
|
||||
@@ -555,7 +558,7 @@ class HomeServer(object):
|
||||
return ReplicationStreamer(self)
|
||||
|
||||
def build_replication_data_handler(self):
|
||||
return DummyReplicationDataHandler()
|
||||
return ReplicationDataHandler(self)
|
||||
|
||||
def remove_pusher(self, app_id, push_key, user_id):
|
||||
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
||||
|
||||
Reference in New Issue
Block a user