Compare commits

...

4 Commits

Author SHA1 Message Date
Hugh Nimmo-Smith
82fcdc5eef Changelog 2025-06-23 14:50:08 +01:00
Hugh Nimmo-Smith
2cfc292c77 Allow sending server notices via Module API 2025-06-23 14:50:08 +01:00
Hugh Nimmo-Smith
7ba64b6caf Changelog 2025-06-23 14:49:06 +01:00
Hugh Nimmo-Smith
68699d5338 Support workers sending server notices 2025-06-23 14:49:05 +01:00
12 changed files with 257 additions and 80 deletions

1
changelog.d/18569.misc Normal file
View File

@@ -0,0 +1 @@
Allow worker processes to send server notices.

1
changelog.d/18570.feat Normal file
View File

@@ -0,0 +1 @@
Support modules sending server notices.

View File

@@ -1892,6 +1892,33 @@ class ModuleApi:
"""Returns the current server time in milliseconds."""
return self._clock.time_msec()
async def send_server_notice(
self, user_id: str, type: str, event_content: JsonDict
) -> JsonDict:
"""Send a server notice to a user.
Added in Synapse v1.133.0.
Args:
user_id: The full user ID to send the server notice to. This must be a user
local to this homeserver.
type: The type of event to send. e.g. m.room.message
event_content: A dictionary representing the content of the event to send.
Returns:
The event that was sent. If state event deduplication happened, then
the previous, duplicate event instead.
Raises:
SynapseError if the event was not allowed.
"""
# Create and send the notice
event = await self._hs.get_server_notices_manager().send_notice(
user_id=user_id, event_content=event_content, type=type
)
return event
class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room

View File

@@ -1,7 +1,7 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023-2024 New Vector, Ltd
# Copyright (C) 2023-2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -33,6 +33,7 @@ from synapse.replication.http import (
register,
send_event,
send_events,
server_notices,
state,
streams,
)
@@ -66,3 +67,4 @@ class ReplicationRestResource(JsonResource):
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
delayed_events.register_servlets(hs, self)
server_notices.register_servlets(hs, self)

View File

@@ -0,0 +1,80 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
import logging
from typing import TYPE_CHECKING, Optional, Tuple
from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReplicationSendServerNoticeServlet(ReplicationEndpoint):
"""Send a server notice to a user"""
NAME = "send_server_notice"
PATH_ARGS = ()
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_notices_manager = hs.get_server_notices_manager()
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
event_content: dict,
type: str,
state_key: Optional[str] = None,
txn_id: Optional[str] = None,
) -> JsonDict:
"""
Args:
user_id: mxid of user to send event to.
event_content: content of event to send
type: type of event
state_key: the state key for the event, if it is a state event
txn_id: the transaction ID
"""
return {
"user_id": user_id,
"event_content": event_content,
"type": type,
"state_key": state_key,
"txn_id": txn_id,
}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]:
event = await self.server_notices_manager.send_notice(
user_id=content["user_id"],
event_content=content["event_content"],
type=content["type"],
state_key=content["state_key"],
txn_id=content["txn_id"],
)
return 200, event
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationSendServerNoticeServlet(hs).register(http_server)

View File

@@ -88,9 +88,6 @@ class SendServerNoticeServlet(RestServlet):
event_type = body.get("type", EventTypes.Message)
state_key = body.get("state_key")
# We grab the server notices manager here as its initialisation has a check for worker processes,
# but worker processes still need to initialise SendServerNoticeServlet (as it is part of the
# admin api).
if not self.server_notices_manager.is_enabled():
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Server notices are not enabled on this server"
@@ -113,7 +110,7 @@ class SendServerNoticeServlet(RestServlet):
txn_id=txn_id,
)
return HTTPStatus.OK, {"event_id": event.event_id}
return HTTPStatus.OK, {"event_id": event["event_id"]}
async def on_POST(
self,

View File

@@ -142,6 +142,9 @@ from synapse.replication.tcp.streams import STREAMS_MAP, Stream
from synapse.rest.media.media_repository_resource import MediaRepositoryResource
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_manager import (
WorkerServerNoticesManager,
)
from synapse.server_notices.worker_server_notices_sender import (
WorkerServerNoticesSender,
)
@@ -758,9 +761,9 @@ class HomeServer(metaclass=abc.ABCMeta):
return FederationHandlerRegistry(self)
@cache_in_self
def get_server_notices_manager(self) -> ServerNoticesManager:
def get_server_notices_manager(self) -> WorkerServerNoticesManager:
if self.config.worker.worker_app:
raise Exception("Workers cannot send server notices")
return WorkerServerNoticesManager(self)
return ServerNoticesManager(self)
@cache_in_self

View File

@@ -165,7 +165,7 @@ class ResourceLimitsServerNotices:
user_id, content, EventTypes.Message
)
content = {"pinned": [event.event_id]}
content = {"pinned": [event["event_id"]]}
await self._server_notices_manager.send_notice(
user_id, content, EventTypes.Pinned, ""
)

View File

@@ -21,7 +21,9 @@ import logging
from typing import TYPE_CHECKING, Optional
from synapse.api.constants import EventTypes, Membership, RoomCreationPreset
from synapse.events import EventBase
from synapse.server_notices.worker_server_notices_manager import (
WorkerServerNoticesManager,
)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, create_requester
from synapse.util.caches.descriptors import cached
@@ -33,9 +35,9 @@ logger = logging.getLogger(__name__)
SERVER_NOTICE_ROOM_TAG = "m.server_notice"
class ServerNoticesManager:
class ServerNoticesManager(WorkerServerNoticesManager):
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastores().main
super().__init__(hs)
self._config = hs.config
self._account_data_handler = hs.get_account_data_handler()
self._room_creation_handler = hs.get_room_creation_handler()
@@ -47,11 +49,6 @@ class ServerNoticesManager:
self._server_name = hs.hostname
self._notifier = hs.get_notifier()
self.server_notices_mxid = self._config.servernotices.server_notices_mxid
def is_enabled(self) -> bool:
"""Checks if server notices are enabled on this server."""
return self.server_notices_mxid is not None
async def send_notice(
self,
@@ -60,7 +57,7 @@ class ServerNoticesManager:
type: str = EventTypes.Message,
state_key: Optional[str] = None,
txn_id: Optional[str] = None,
) -> EventBase:
) -> JsonDict:
"""Send a notice to the given user
Creates the server notices room, if none exists.
@@ -69,15 +66,16 @@ class ServerNoticesManager:
user_id: mxid of user to send event to.
event_content: content of event to send
type: type of event
is_state_event: Is the event a state event
state_key: the state key for the event, if it is a state event
txn_id: The transaction ID.
"""
room_id = await self.get_or_create_notice_room_for_user(user_id)
await self.maybe_invite_user_to_room(user_id, room_id)
room_id = await self._get_or_create_notice_room_for_user(user_id)
await self._maybe_invite_user_to_room(user_id, room_id)
assert self.server_notices_mxid is not None
assert self._server_notices_mxid is not None
requester = create_requester(
self.server_notices_mxid, authenticated_entity=self._server_name
self._server_notices_mxid,
authenticated_entity=self._server_name,
)
logger.info("Sending server notice to %s", user_id)
@@ -85,7 +83,7 @@ class ServerNoticesManager:
event_dict = {
"type": type,
"room_id": room_id,
"sender": self.server_notices_mxid,
"sender": self._server_notices_mxid,
"content": event_content,
}
@@ -95,45 +93,10 @@ class ServerNoticesManager:
event, _ = await self._event_creation_handler.create_and_send_nonmember_event(
requester, event_dict, ratelimit=False, txn_id=txn_id
)
return event
return event.get_dict()
@cached()
async def maybe_get_notice_room_for_user(self, user_id: str) -> Optional[str]:
"""Try to look up the server notice room for this user if it exists.
Does not create one if none can be found.
Args:
user_id: the user we want a server notice room for.
Returns:
The room's ID, or None if no room could be found.
"""
# If there is no server notices MXID, then there is no server notices room
if self.server_notices_mxid is None:
return None
rooms = await self._store.get_rooms_for_local_user_where_membership_is(
user_id, [Membership.INVITE, Membership.JOIN]
)
for room in rooms:
# it's worth noting that there is an asymmetry here in that we
# expect the user to be invited or joined, but the system user must
# be joined. This is kinda deliberate, in that if somebody somehow
# manages to invite the system user to a room, that doesn't make it
# the server notices room.
is_server_notices_room = await self._store.check_local_user_in_room(
user_id=self.server_notices_mxid, room_id=room.room_id
)
if is_server_notices_room:
# we found a room which our user shares with the system notice
# user
return room.room_id
return None
@cached()
async def get_or_create_notice_room_for_user(self, user_id: str) -> str:
async def _get_or_create_notice_room_for_user(self, user_id: str) -> str:
"""Get the room for notices for a given user
If we have not yet created a notice room for this user, create it, but don't
@@ -145,13 +108,13 @@ class ServerNoticesManager:
Returns:
room id of notice room.
"""
if self.server_notices_mxid is None:
if self._server_notices_mxid is None:
raise Exception("Server notices not enabled")
assert self._is_mine_id(user_id), "Cannot send server notices to remote users"
requester = create_requester(
self.server_notices_mxid, authenticated_entity=self._server_name
self._server_notices_mxid, authenticated_entity=self._server_name
)
room_id = await self.maybe_get_notice_room_for_user(user_id)
@@ -246,7 +209,7 @@ class ServerNoticesManager:
logger.info("Created server notices room %s for %s", room_id, user_id)
return room_id
async def maybe_invite_user_to_room(self, user_id: str, room_id: str) -> None:
async def _maybe_invite_user_to_room(self, user_id: str, room_id: str) -> None:
"""Invite the given user to the given server room, unless the user has already
joined or been invited to it.
@@ -254,9 +217,9 @@ class ServerNoticesManager:
user_id: The ID of the user to invite.
room_id: The ID of the room to invite the user to.
"""
assert self.server_notices_mxid is not None
assert self._server_notices_mxid is not None
requester = create_requester(
self.server_notices_mxid, authenticated_entity=self._server_name
self._server_notices_mxid, authenticated_entity=self._server_name
)
# Check whether the user has already joined or been invited to this room. If
@@ -307,13 +270,13 @@ class ServerNoticesManager:
"""
logger.debug("Checking whether notice user profile has changed for %s", room_id)
assert self.server_notices_mxid is not None
assert self._server_notices_mxid is not None
notice_user_data_in_room = (
await self._storage_controllers.state.get_current_state_event(
room_id,
EventTypes.Member,
self.server_notices_mxid,
self._server_notices_mxid,
)
)
@@ -327,7 +290,7 @@ class ServerNoticesManager:
logger.info("Updating notice user profile in room %s", room_id)
await self._room_member_handler.update_membership(
requester=requester,
target=UserID.from_string(self.server_notices_mxid),
target=UserID.from_string(self._server_notices_mxid),
room_id=room_id,
action="join",
ratelimit=False,

View File

@@ -0,0 +1,101 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
import logging
from typing import TYPE_CHECKING, Optional
from synapse.api.constants import EventTypes, Membership
from synapse.events import JsonDict
from synapse.replication.http.server_notices import (
ReplicationSendServerNoticeServlet,
)
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class WorkerServerNoticesManager:
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastores().main
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
self._send_server_notice = ReplicationSendServerNoticeServlet.make_client(hs)
def is_enabled(self) -> bool:
"""Checks if server notices are enabled on this server."""
return self._server_notices_mxid is not None
async def send_notice(
self,
user_id: str,
event_content: dict,
type: str = EventTypes.Message,
state_key: Optional[str] = None,
txn_id: Optional[str] = None,
) -> JsonDict:
"""Send a notice to the given user
Creates the server notices room, if none exists.
Args:
user_id: mxid of user to send event to.
event_content: content of event to send
type: type of event
is_state_event: Is the event a state event
txn_id: The transaction ID.
"""
return await self._send_server_notice(
user_id=user_id,
event_content=event_content,
type=type,
state_key=state_key,
txn_id=txn_id,
)
@cached()
async def maybe_get_notice_room_for_user(self, user_id: str) -> Optional[str]:
"""Try to look up the server notice room for this user if it exists.
Does not create one if none can be found.
Args:
user_id: the user we want a server notice room for.
Returns:
The room's ID, or None if no room could be found.
"""
# If there is no server notices MXID, then there is no server notices room
if self._server_notices_mxid is None:
return None
rooms = await self._store.get_rooms_for_local_user_where_membership_is(
user_id, [Membership.INVITE, Membership.JOIN]
)
for room in rooms:
# it's worth noting that there is an asymmetry here in that we
# expect the user to be invited or joined, but the system user must
# be joined. This is kinda deliberate, in that if somebody somehow
# manages to invite the system user to a room, that doesn't make it
# the server notices room.
is_server_notices_room = await self._store.check_local_user_in_room(
user_id=self._server_notices_mxid, room_id=room.room_id
)
if is_server_notices_room:
# we found a room which our user shares with the system notice
# user
return room.room_id
return None

View File

@@ -18,7 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import List, Sequence
from typing import List, Sequence, cast
from twisted.test.proto_helpers import MemoryReactor
@@ -26,6 +26,7 @@ import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict
from synapse.util import Clock
@@ -47,7 +48,9 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.store = hs.get_datastores().main
self.room_shutdown_handler = hs.get_room_shutdown_handler()
self.pagination_handler = hs.get_pagination_handler()
self.server_notices_manager = self.hs.get_server_notices_manager()
self.server_notices_manager = cast(
ServerNoticesManager, self.hs.get_server_notices_manager()
)
# Create user
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -276,7 +279,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertEqual(messages[0]["sender"], "@notices:test")
# invalidate cache of server notices room_ids
self.server_notices_manager.get_or_create_notice_room_for_user.invalidate_all()
self.server_notices_manager._get_or_create_notice_room_for_user.invalidate_all()
# send second message
channel = self.make_request(
@@ -351,7 +354,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
# invalidate cache of server notices room_ids
# if server tries to send to a cached room_id the user gets the message
# in old room
self.server_notices_manager.get_or_create_notice_room_for_user.invalidate_all()
self.server_notices_manager._get_or_create_notice_room_for_user.invalidate_all()
# send second message
channel = self.make_request(
@@ -451,7 +454,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
# invalidate cache of server notices room_ids
# if server tries to send to a cached room_id it gives an error
self.server_notices_manager.get_or_create_notice_room_for_user.invalidate_all()
self.server_notices_manager._get_or_create_notice_room_for_user.invalidate_all()
# send second message
channel = self.make_request(
@@ -532,7 +535,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
# simulate a change in server config after a server restart.
new_display_name = "new display name"
self.server_notices_manager._config.servernotices.server_notices_mxid_display_name = new_display_name
self.server_notices_manager.get_or_create_notice_room_for_user.cache.invalidate_all()
self.server_notices_manager._get_or_create_notice_room_for_user.cache.invalidate_all()
self.make_request(
"POST",
@@ -576,7 +579,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
# simulate a change in server config after a server restart.
new_avatar_url = "test/new-url"
self.server_notices_manager._config.servernotices.server_notices_mxid_avatar_url = new_avatar_url
self.server_notices_manager.get_or_create_notice_room_for_user.cache.invalidate_all()
self.server_notices_manager._get_or_create_notice_room_for_user.cache.invalidate_all()
self.make_request(
"POST",
@@ -689,7 +692,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
# simulate a change in server config after a server restart.
new_avatar_url = "test/new-url"
self.server_notices_manager._config.servernotices.server_notices_room_avatar_url = new_avatar_url
self.server_notices_manager.get_or_create_notice_room_for_user.cache.invalidate_all()
self.server_notices_manager._get_or_create_notice_room_for_user.cache.invalidate_all()
self.make_request(
"POST",

View File

@@ -82,9 +82,6 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase):
self.user_id = "@user_id:test"
self._rlsn._server_notices_manager.get_or_create_notice_room_for_user = (
AsyncMock(return_value="!something:localhost")
)
self._rlsn._server_notices_manager.maybe_get_notice_room_for_user = AsyncMock(
return_value="!something:localhost"
)
@@ -297,9 +294,11 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
# Now lets get the last load of messages in the service notice room and
# check that there is only one server notice
room_id = self.get_success(
self.server_notices_manager.get_or_create_notice_room_for_user(self.user_id)
self.server_notices_manager.maybe_get_notice_room_for_user(self.user_id)
)
assert room_id is not None, "No server notices room found"
token = self.event_source.get_current_token()
events, _ = self.get_success(
self.store.get_recent_events_for_room(