Compare commits
2 Commits
anoa/docs_
...
erikj/work
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a48296dd86 | ||
|
|
13f9422e38 |
@@ -319,11 +319,24 @@ effects of bursts of events from that bridge on events sent by normal users.
|
||||
|
||||
#### Stream writers
|
||||
|
||||
Additionally, there is *experimental* support for moving writing of specific
|
||||
streams (such as events) off of the main process to a particular worker. (This
|
||||
is only supported with Redis-based replication.)
|
||||
Additionally, there is support for moving writing of specific streams (such as
|
||||
events) off of the main process to a particular worker. (This is only supported
|
||||
with Redis-based replication.)
|
||||
|
||||
Currently supported streams are `events` and `typing`.
|
||||
Currently supported streams are, and which endpoints **must** be routed to them:
|
||||
* `events`
|
||||
|
||||
* `typing`:
|
||||
* `^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/typing`
|
||||
|
||||
* `to_device`:
|
||||
`^/_matrix/client/(api/v1|r0|unstable)/sendToDevice/`
|
||||
`^/_matrix/client/(api/v1|r0|unstable)/keys/claim`
|
||||
`^/_matrix/client/(api/v1|r0|unstable)/room_keys`
|
||||
|
||||
* `account_data`
|
||||
* `receipts`
|
||||
* `presence`
|
||||
|
||||
To enable this, the worker must have a HTTP replication listener configured,
|
||||
have a `worker_name` and be listed in the `instance_map` config. For example to
|
||||
@@ -340,10 +353,10 @@ stream_writers:
|
||||
events: event_persister1
|
||||
```
|
||||
|
||||
The `events` stream also experimentally supports having multiple writers, where
|
||||
work is sharded between them by room ID. Note that you *must* restart all worker
|
||||
instances when adding or removing event persisters. An example `stream_writers`
|
||||
configuration with multiple writers:
|
||||
The `events` stream also supports having multiple writers, where work is sharded
|
||||
between them by room ID. Note that you *must* restart all worker instances when
|
||||
adding or removing event persisters. An example `stream_writers` configuration
|
||||
with multiple writers:
|
||||
|
||||
```yaml
|
||||
stream_writers:
|
||||
@@ -352,6 +365,8 @@ stream_writers:
|
||||
- event_persister2
|
||||
```
|
||||
|
||||
All other streams currently only support having a single writer.
|
||||
|
||||
#### Background tasks
|
||||
|
||||
There is also *experimental* support for moving background tasks to a separate
|
||||
|
||||
@@ -22,6 +22,7 @@ from synapse.metrics.background_process_metrics import (
|
||||
run_as_background_process,
|
||||
wrap_as_background_process,
|
||||
)
|
||||
from synapse.replication.http.typing import ReplicationTypingRestServlet
|
||||
from synapse.replication.tcp.streams import TypingStream
|
||||
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
@@ -61,7 +62,9 @@ class FollowerTypingHandler:
|
||||
if hs.should_send_federation():
|
||||
self.federation = hs.get_federation_sender()
|
||||
|
||||
if hs.config.worker.writers.typing != hs.get_instance_name():
|
||||
self._typing_repl_client = ReplicationTypingRestServlet.make_client(hs)
|
||||
self._typing_worker = hs.config.worker.writers.typing
|
||||
if self._typing_worker != hs.get_instance_name():
|
||||
hs.get_federation_registry().register_instance_for_edu(
|
||||
"m.typing",
|
||||
hs.config.worker.writers.typing,
|
||||
@@ -199,6 +202,30 @@ class FollowerTypingHandler:
|
||||
def get_current_token(self) -> int:
|
||||
return self._latest_room_serial
|
||||
|
||||
async def started_typing(
|
||||
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
|
||||
) -> None:
|
||||
await self._typing_repl_client(
|
||||
typing=True,
|
||||
instance_name=self._typing_worker,
|
||||
user_id=target_user.to_string(),
|
||||
requester=requester,
|
||||
room_id=room_id,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
async def stopped_typing(
|
||||
self, target_user: UserID, requester: Requester, room_id: str
|
||||
) -> None:
|
||||
await self._typing_repl_client(
|
||||
typing=True,
|
||||
instance_name=self._typing_worker,
|
||||
user_id=target_user.to_string(),
|
||||
requester=requester,
|
||||
room_id=room_id,
|
||||
timeout=None,
|
||||
)
|
||||
|
||||
|
||||
class TypingWriterHandler(FollowerTypingHandler):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
|
||||
@@ -24,6 +24,7 @@ from synapse.replication.http import (
|
||||
register,
|
||||
send_event,
|
||||
streams,
|
||||
typing,
|
||||
)
|
||||
|
||||
REPLICATION_PREFIX = "/_synapse/replication"
|
||||
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
|
||||
streams.register_servlets(hs, self)
|
||||
account_data.register_servlets(hs, self)
|
||||
push.register_servlets(hs, self)
|
||||
typing.register_servlets(hs, self)
|
||||
|
||||
# The following can't currently be instantiated on workers.
|
||||
if hs.config.worker.worker_app is None:
|
||||
|
||||
89
synapse/replication/http/typing.py
Normal file
89
synapse/replication/http/typing.py
Normal file
@@ -0,0 +1,89 @@
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.types import Requester, UserID
|
||||
from typing import TYPE_CHECKING
|
||||
import logging
|
||||
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.replication.http._base import ReplicationEndpoint
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplicationTypingRestServlet(ReplicationEndpoint):
|
||||
"""Call to start or stop a user typing in a room.
|
||||
|
||||
Request format:
|
||||
|
||||
POST /_synapse/replication/typing/:room_id/:user_id
|
||||
|
||||
{
|
||||
"requester": ...,
|
||||
"typing": true,
|
||||
"timeout": 30000
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
NAME = "typing"
|
||||
PATH_ARGS = ("room_id", "user_id")
|
||||
CACHE = False
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.handler = hs.get_typing_handler()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload(requester, room_id, user_id, typing, timeout):
|
||||
payload = {
|
||||
"requester": requester.serialize(),
|
||||
"typing": typing,
|
||||
"timeout": timeout,
|
||||
}
|
||||
|
||||
return payload
|
||||
|
||||
async def _handle_request(self, request, room_id, user_id):
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
requester = Requester.deserialize(self.store, content["requester"])
|
||||
request.requester = requester
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
|
||||
if content["typing"]:
|
||||
await self.handler.started_typing(
|
||||
target_user,
|
||||
requester,
|
||||
room_id,
|
||||
content["timeout"],
|
||||
)
|
||||
else:
|
||||
await self.handler.stopped_typing(
|
||||
target_user,
|
||||
requester,
|
||||
room_id,
|
||||
)
|
||||
|
||||
return 200, {}
|
||||
|
||||
|
||||
def register_servlets(hs, http_server):
|
||||
ReplicationTypingRestServlet(hs).register(http_server)
|
||||
@@ -1254,18 +1254,11 @@ class RoomTypingRestServlet(RestServlet):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.auth = hs.get_auth()
|
||||
|
||||
# If we're not on the typing writer instance we should scream if we get
|
||||
# requests.
|
||||
self._is_typing_writer = (
|
||||
hs.config.worker.writers.typing == hs.get_instance_name()
|
||||
)
|
||||
self.handler = hs.get_typing_handler()
|
||||
|
||||
async def on_PUT(self, request, room_id, user_id):
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
if not self._is_typing_writer:
|
||||
raise Exception("Got /typing request on instance that is not typing writer")
|
||||
|
||||
room_id = urlparse.unquote(room_id)
|
||||
target_user = UserID.from_string(urlparse.unquote(user_id))
|
||||
|
||||
@@ -1276,19 +1269,16 @@ class RoomTypingRestServlet(RestServlet):
|
||||
# Limit timeout to stop people from setting silly typing timeouts.
|
||||
timeout = min(content.get("timeout", 30000), 120000)
|
||||
|
||||
# Defer getting the typing handler since it will raise on workers.
|
||||
typing_handler = self.hs.get_typing_writer_handler()
|
||||
|
||||
try:
|
||||
if content["typing"]:
|
||||
await typing_handler.started_typing(
|
||||
await self.handler.started_typing(
|
||||
target_user=target_user,
|
||||
requester=requester,
|
||||
room_id=room_id,
|
||||
timeout=timeout,
|
||||
)
|
||||
else:
|
||||
await typing_handler.stopped_typing(
|
||||
await self.handler.stopped_typing(
|
||||
target_user=target_user, requester=requester, room_id=room_id
|
||||
)
|
||||
except ShadowBanError:
|
||||
|
||||
Reference in New Issue
Block a user