Store room version on invite (#6983)
* commit '3e99528f2': Store room version on invite (#6983)
This commit is contained in:
1
changelog.d/6983.misc
Normal file
1
changelog.d/6983.misc
Normal file
@@ -0,0 +1 @@
|
||||
Refactoring work in preparation for changing the event redaction algorithm.
|
||||
@@ -60,6 +60,7 @@ from synapse.replication.http.devices import ReplicationUserDevicesResyncRestSer
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationCleanRoomRestServlet,
|
||||
ReplicationFederationSendEventsRestServlet,
|
||||
ReplicationStoreRoomOnInviteRestServlet,
|
||||
)
|
||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
@@ -160,8 +161,12 @@ class FederationHandler(BaseHandler):
|
||||
self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client(
|
||||
hs
|
||||
)
|
||||
self._maybe_store_room_on_invite = ReplicationStoreRoomOnInviteRestServlet.make_client(
|
||||
hs
|
||||
)
|
||||
else:
|
||||
self._device_list_updater = hs.get_device_handler().device_list_updater
|
||||
self._maybe_store_room_on_invite = self.store.maybe_store_room_on_invite
|
||||
|
||||
# When joining a room we need to queue any events for that room up
|
||||
self.room_queues = {}
|
||||
@@ -1548,6 +1553,13 @@ class FederationHandler(BaseHandler):
|
||||
if event.state_key == self._server_notices_mxid:
|
||||
raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
|
||||
|
||||
# keep a record of the room version, if we don't yet know it.
|
||||
# (this may get overwritten if we later get a different room version in a
|
||||
# join dance).
|
||||
await self._maybe_store_room_on_invite(
|
||||
room_id=event.room_id, room_version=room_version
|
||||
)
|
||||
|
||||
event.internal_metadata.outlier = True
|
||||
event.internal_metadata.out_of_band_membership = True
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ class ReplicationEndpoint(object):
|
||||
"""Helper base class for defining new replication HTTP endpoints.
|
||||
|
||||
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
|
||||
(with an `/:txn_id` prefix for cached requests.), where NAME is a name,
|
||||
(with a `/:txn_id` suffix for cached requests), where NAME is a name,
|
||||
PATH_ARGS are a tuple of parameters to be encoded in the URL.
|
||||
|
||||
For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import event_type_from_format_version
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
@@ -211,7 +212,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
|
||||
|
||||
Request format:
|
||||
|
||||
POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
|
||||
POST /_synapse/replication/fed_cleanup_room/:room_id/:txn_id
|
||||
|
||||
{}
|
||||
"""
|
||||
@@ -238,8 +239,41 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
|
||||
return 200, {}
|
||||
|
||||
|
||||
class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
|
||||
"""Called to clean up any data in DB for a given room, ready for the
|
||||
server to join the room.
|
||||
|
||||
Request format:
|
||||
|
||||
POST /_synapse/replication/store_room_on_invite/:room_id/:txn_id
|
||||
|
||||
{
|
||||
"room_version": "1",
|
||||
}
|
||||
"""
|
||||
|
||||
NAME = "store_room_on_invite"
|
||||
PATH_ARGS = ("room_id",)
|
||||
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@staticmethod
|
||||
def _serialize_payload(room_id, room_version):
|
||||
return {"room_version": room_version.identifier}
|
||||
|
||||
async def _handle_request(self, request, room_id):
|
||||
content = parse_json_object_from_request(request)
|
||||
room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
|
||||
await self.store.maybe_store_room_on_invite(room_id, room_version)
|
||||
return 200, {}
|
||||
|
||||
|
||||
def register_servlets(hs, http_server):
|
||||
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
|
||||
ReplicationFederationSendEduRestServlet(hs).register(http_server)
|
||||
ReplicationGetQueryRestServlet(hs).register(http_server)
|
||||
ReplicationCleanRoomRestServlet(hs).register(http_server)
|
||||
ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -1043,6 +1043,26 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
||||
logger.error("store_room with room_id=%s failed: %s", room_id, e)
|
||||
raise StoreError(500, "Problem creating room.")
|
||||
|
||||
async def maybe_store_room_on_invite(self, room_id: str, room_version: RoomVersion):
|
||||
"""
|
||||
When we receive an invite over federation, store the version of the room if we
|
||||
don't already know the room version.
|
||||
"""
|
||||
await self.db.simple_upsert(
|
||||
desc="maybe_store_room_on_invite",
|
||||
table="rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
values={},
|
||||
insertion_values={
|
||||
"room_version": room_version.identifier,
|
||||
"is_public": False,
|
||||
"creator": "",
|
||||
},
|
||||
# rooms has a unique constraint on room_id, so no need to lock when doing an
|
||||
# emulated upsert.
|
||||
lock=False,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_room_is_public(self, room_id, is_public):
|
||||
def set_room_is_public_txn(txn, next_id):
|
||||
|
||||
@@ -29,6 +29,14 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase):
|
||||
)
|
||||
return hs
|
||||
|
||||
def default_config(self, name="test"):
|
||||
conf = super().default_config(name)
|
||||
# we're using FederationReaderServer, which uses a SlavedStore, so we
|
||||
# have to tell the FederationHandler not to try to access stuff that is only
|
||||
# in the primary store.
|
||||
conf["worker_app"] = "yes"
|
||||
return conf
|
||||
|
||||
@parameterized.expand(
|
||||
[
|
||||
(["federation"], "auth_fail"),
|
||||
|
||||
@@ -74,6 +74,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
||||
"set_received_txn_response",
|
||||
"get_destination_retry_timings",
|
||||
"get_devices_by_remote",
|
||||
"maybe_store_room_on_invite",
|
||||
# Bits that user_directory needs
|
||||
"get_user_directory_stream_pos",
|
||||
"get_current_state_deltas",
|
||||
|
||||
Reference in New Issue
Block a user