Compare commits
2 Commits
travis/fix
...
erikj/work
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a48296dd86 | ||
|
|
13f9422e38 |
@@ -319,11 +319,24 @@ effects of bursts of events from that bridge on events sent by normal users.
|
|||||||
|
|
||||||
#### Stream writers
|
#### Stream writers
|
||||||
|
|
||||||
Additionally, there is *experimental* support for moving writing of specific
|
Additionally, there is support for moving writing of specific streams (such as
|
||||||
streams (such as events) off of the main process to a particular worker. (This
|
events) off of the main process to a particular worker. (This is only supported
|
||||||
is only supported with Redis-based replication.)
|
with Redis-based replication.)
|
||||||
|
|
||||||
Currently supported streams are `events` and `typing`.
|
Currently supported streams are, and which endpoints **must** be routed to them:
|
||||||
|
* `events`
|
||||||
|
|
||||||
|
* `typing`:
|
||||||
|
* `^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/typing`
|
||||||
|
|
||||||
|
* `to_device`:
|
||||||
|
`^/_matrix/client/(api/v1|r0|unstable)/sendToDevice/`
|
||||||
|
`^/_matrix/client/(api/v1|r0|unstable)/keys/claim`
|
||||||
|
`^/_matrix/client/(api/v1|r0|unstable)/room_keys`
|
||||||
|
|
||||||
|
* `account_data`
|
||||||
|
* `receipts`
|
||||||
|
* `presence`
|
||||||
|
|
||||||
To enable this, the worker must have a HTTP replication listener configured,
|
To enable this, the worker must have a HTTP replication listener configured,
|
||||||
have a `worker_name` and be listed in the `instance_map` config. For example to
|
have a `worker_name` and be listed in the `instance_map` config. For example to
|
||||||
@@ -340,10 +353,10 @@ stream_writers:
|
|||||||
events: event_persister1
|
events: event_persister1
|
||||||
```
|
```
|
||||||
|
|
||||||
The `events` stream also experimentally supports having multiple writers, where
|
The `events` stream also supports having multiple writers, where work is sharded
|
||||||
work is sharded between them by room ID. Note that you *must* restart all worker
|
between them by room ID. Note that you *must* restart all worker instances when
|
||||||
instances when adding or removing event persisters. An example `stream_writers`
|
adding or removing event persisters. An example `stream_writers` configuration
|
||||||
configuration with multiple writers:
|
with multiple writers:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
stream_writers:
|
stream_writers:
|
||||||
@@ -352,6 +365,8 @@ stream_writers:
|
|||||||
- event_persister2
|
- event_persister2
|
||||||
```
|
```
|
||||||
|
|
||||||
|
All other streams currently only support having a single writer.
|
||||||
|
|
||||||
#### Background tasks
|
#### Background tasks
|
||||||
|
|
||||||
There is also *experimental* support for moving background tasks to a separate
|
There is also *experimental* support for moving background tasks to a separate
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ from synapse.metrics.background_process_metrics import (
|
|||||||
run_as_background_process,
|
run_as_background_process,
|
||||||
wrap_as_background_process,
|
wrap_as_background_process,
|
||||||
)
|
)
|
||||||
|
from synapse.replication.http.typing import ReplicationTypingRestServlet
|
||||||
from synapse.replication.tcp.streams import TypingStream
|
from synapse.replication.tcp.streams import TypingStream
|
||||||
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
@@ -61,7 +62,9 @@ class FollowerTypingHandler:
|
|||||||
if hs.should_send_federation():
|
if hs.should_send_federation():
|
||||||
self.federation = hs.get_federation_sender()
|
self.federation = hs.get_federation_sender()
|
||||||
|
|
||||||
if hs.config.worker.writers.typing != hs.get_instance_name():
|
self._typing_repl_client = ReplicationTypingRestServlet.make_client(hs)
|
||||||
|
self._typing_worker = hs.config.worker.writers.typing
|
||||||
|
if self._typing_worker != hs.get_instance_name():
|
||||||
hs.get_federation_registry().register_instance_for_edu(
|
hs.get_federation_registry().register_instance_for_edu(
|
||||||
"m.typing",
|
"m.typing",
|
||||||
hs.config.worker.writers.typing,
|
hs.config.worker.writers.typing,
|
||||||
@@ -199,6 +202,30 @@ class FollowerTypingHandler:
|
|||||||
def get_current_token(self) -> int:
|
def get_current_token(self) -> int:
|
||||||
return self._latest_room_serial
|
return self._latest_room_serial
|
||||||
|
|
||||||
|
async def started_typing(
|
||||||
|
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
|
||||||
|
) -> None:
|
||||||
|
await self._typing_repl_client(
|
||||||
|
typing=True,
|
||||||
|
instance_name=self._typing_worker,
|
||||||
|
user_id=target_user.to_string(),
|
||||||
|
requester=requester,
|
||||||
|
room_id=room_id,
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def stopped_typing(
|
||||||
|
self, target_user: UserID, requester: Requester, room_id: str
|
||||||
|
) -> None:
|
||||||
|
await self._typing_repl_client(
|
||||||
|
typing=True,
|
||||||
|
instance_name=self._typing_worker,
|
||||||
|
user_id=target_user.to_string(),
|
||||||
|
requester=requester,
|
||||||
|
room_id=room_id,
|
||||||
|
timeout=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TypingWriterHandler(FollowerTypingHandler):
|
class TypingWriterHandler(FollowerTypingHandler):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ from synapse.replication.http import (
|
|||||||
register,
|
register,
|
||||||
send_event,
|
send_event,
|
||||||
streams,
|
streams,
|
||||||
|
typing,
|
||||||
)
|
)
|
||||||
|
|
||||||
REPLICATION_PREFIX = "/_synapse/replication"
|
REPLICATION_PREFIX = "/_synapse/replication"
|
||||||
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
|
|||||||
streams.register_servlets(hs, self)
|
streams.register_servlets(hs, self)
|
||||||
account_data.register_servlets(hs, self)
|
account_data.register_servlets(hs, self)
|
||||||
push.register_servlets(hs, self)
|
push.register_servlets(hs, self)
|
||||||
|
typing.register_servlets(hs, self)
|
||||||
|
|
||||||
# The following can't currently be instantiated on workers.
|
# The following can't currently be instantiated on workers.
|
||||||
if hs.config.worker.worker_app is None:
|
if hs.config.worker.worker_app is None:
|
||||||
|
|||||||
89
synapse/replication/http/typing.py
Normal file
89
synapse/replication/http/typing.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from synapse.types import Requester, UserID
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from synapse.http.servlet import parse_json_object_from_request
|
||||||
|
from synapse.replication.http._base import ReplicationEndpoint
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ReplicationTypingRestServlet(ReplicationEndpoint):
|
||||||
|
"""Call to start or stop a user typing in a room.
|
||||||
|
|
||||||
|
Request format:
|
||||||
|
|
||||||
|
POST /_synapse/replication/typing/:room_id/:user_id
|
||||||
|
|
||||||
|
{
|
||||||
|
"requester": ...,
|
||||||
|
"typing": true,
|
||||||
|
"timeout": 30000
|
||||||
|
}
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
NAME = "typing"
|
||||||
|
PATH_ARGS = ("room_id", "user_id")
|
||||||
|
CACHE = False
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
super().__init__(hs)
|
||||||
|
|
||||||
|
self.handler = hs.get_typing_handler()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def _serialize_payload(requester, room_id, user_id, typing, timeout):
|
||||||
|
payload = {
|
||||||
|
"requester": requester.serialize(),
|
||||||
|
"typing": typing,
|
||||||
|
"timeout": timeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload
|
||||||
|
|
||||||
|
async def _handle_request(self, request, room_id, user_id):
|
||||||
|
content = parse_json_object_from_request(request)
|
||||||
|
|
||||||
|
requester = Requester.deserialize(self.store, content["requester"])
|
||||||
|
request.requester = requester
|
||||||
|
|
||||||
|
target_user = UserID.from_string(user_id)
|
||||||
|
|
||||||
|
if content["typing"]:
|
||||||
|
await self.handler.started_typing(
|
||||||
|
target_user,
|
||||||
|
requester,
|
||||||
|
room_id,
|
||||||
|
content["timeout"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await self.handler.stopped_typing(
|
||||||
|
target_user,
|
||||||
|
requester,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
return 200, {}
|
||||||
|
|
||||||
|
|
||||||
|
def register_servlets(hs, http_server):
|
||||||
|
ReplicationTypingRestServlet(hs).register(http_server)
|
||||||
@@ -1254,18 +1254,11 @@ class RoomTypingRestServlet(RestServlet):
|
|||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
|
|
||||||
# If we're not on the typing writer instance we should scream if we get
|
self.handler = hs.get_typing_handler()
|
||||||
# requests.
|
|
||||||
self._is_typing_writer = (
|
|
||||||
hs.config.worker.writers.typing == hs.get_instance_name()
|
|
||||||
)
|
|
||||||
|
|
||||||
async def on_PUT(self, request, room_id, user_id):
|
async def on_PUT(self, request, room_id, user_id):
|
||||||
requester = await self.auth.get_user_by_req(request)
|
requester = await self.auth.get_user_by_req(request)
|
||||||
|
|
||||||
if not self._is_typing_writer:
|
|
||||||
raise Exception("Got /typing request on instance that is not typing writer")
|
|
||||||
|
|
||||||
room_id = urlparse.unquote(room_id)
|
room_id = urlparse.unquote(room_id)
|
||||||
target_user = UserID.from_string(urlparse.unquote(user_id))
|
target_user = UserID.from_string(urlparse.unquote(user_id))
|
||||||
|
|
||||||
@@ -1276,19 +1269,16 @@ class RoomTypingRestServlet(RestServlet):
|
|||||||
# Limit timeout to stop people from setting silly typing timeouts.
|
# Limit timeout to stop people from setting silly typing timeouts.
|
||||||
timeout = min(content.get("timeout", 30000), 120000)
|
timeout = min(content.get("timeout", 30000), 120000)
|
||||||
|
|
||||||
# Defer getting the typing handler since it will raise on workers.
|
|
||||||
typing_handler = self.hs.get_typing_writer_handler()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if content["typing"]:
|
if content["typing"]:
|
||||||
await typing_handler.started_typing(
|
await self.handler.started_typing(
|
||||||
target_user=target_user,
|
target_user=target_user,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await typing_handler.stopped_typing(
|
await self.handler.stopped_typing(
|
||||||
target_user=target_user, requester=requester, room_id=room_id
|
target_user=target_user, requester=requester, room_id=room_id
|
||||||
)
|
)
|
||||||
except ShadowBanError:
|
except ShadowBanError:
|
||||||
|
|||||||
Reference in New Issue
Block a user