Compare commits
8 Commits: v1.41.0 ... dmr/missin

| Author | SHA1 | Date |
|---|---|---|
| | ceb29d4e0f | |
| | bec01c0758 | |
| | 6e613a10d0 | |
| | 964f29cb6f | |
| | 6a5f8fbcda | |
| | 430241a1e9 | |
| | 1a9f531c79 | |
| | 272b89d547 | |

changelog.d/10524.feature (new file)
@@ -0,0 +1 @@
Port the PresenceRouter module interface to the new generic interface.

changelog.d/10561.bugfix (new file)
@@ -0,0 +1 @@
Display an error on User-Interactive Authentication fallback pages when authentication fails. Contributed by Callum Brown.

changelog.d/10614.misc (new file)
@@ -0,0 +1 @@
Clean up some of the federation event authentication code for clarity.

changelog.d/10615.misc (new file)
@@ -0,0 +1 @@
Clean up some of the federation event authentication code for clarity.

changelog.d/10629.misc (new file)
@@ -0,0 +1 @@
Convert room member storage tuples to `attrs` classes.

changelog.d/10630.misc (new file)
@@ -0,0 +1 @@
Use auto-attribs for the attrs classes used in sync.

changelog.d/8830.removal (new file)
@@ -0,0 +1 @@
Remove deprecated Shutdown Room and Purge Room Admin API.

@@ -51,12 +51,10 @@
  - [Event Reports](admin_api/event_reports.md)
  - [Media](admin_api/media_admin_api.md)
  - [Purge History](admin_api/purge_history_api.md)
  - [Purge Rooms](admin_api/purge_room.md)
  - [Register Users](admin_api/register_api.md)
  - [Manipulate Room Membership](admin_api/room_membership.md)
  - [Rooms](admin_api/rooms.md)
  - [Server Notices](admin_api/server_notices.md)
  - [Shutdown Room](admin_api/shutdown_room.md)
  - [Statistics](admin_api/statistics.md)
  - [Users](admin_api/user_admin_api.md)
  - [Server Version](admin_api/version_api.md)

@@ -1,21 +0,0 @@
Deprecated: Purge room API
==========================

**The old Purge room API is deprecated and will be removed in a future release.
See the new [Delete Room API](rooms.md#delete-room-api) for more details.**

This API will remove all trace of a room from your database.

All local users must have left the room before it can be removed.

The API is:

```
POST /_synapse/admin/v1/purge_room

{
    "room_id": "!room:id"
}
```

You must authenticate using the access token of an admin user.

@@ -1,102 +0,0 @@
# Deprecated: Shutdown room API

**The old Shutdown room API is deprecated and will be removed in a future release.
See the new [Delete Room API](rooms.md#delete-room-api) for more details.**

Shuts down a room, preventing new joins, and automatically moves local users and room aliases
to a new room. The new room will be created with the user specified by the
`new_room_user_id` parameter as room administrator and will contain a message
explaining what happened. Users invited to the new room will have power level
-10 by default, and thus be unable to speak. The old room's power levels will be changed to
disallow any further invites or joins.

The local server will only have the power to move local users and room aliases to
the new room. Users on other servers will be unaffected.

## API

You will need to authenticate with an access token for an admin user.

### URL

`POST /_synapse/admin/v1/shutdown_room/{room_id}`

### URL Parameters

* `room_id` - The ID of the room (e.g. `!someroom:example.com`)

### JSON Body Parameters

* `new_room_user_id` - Required. A string representing the user ID of the user that will admin
  the new room that all users in the old room will be moved to.
* `room_name` - Optional. A string representing the name of the room that new users will be
  invited to.
* `message` - Optional. A string containing the first message that will be sent as
  `new_room_user_id` in the new room. Ideally this will clearly convey why the
  original room was shut down.

If not specified, the default value of `room_name` is "Content Violation
Notification". The default value of `message` is "Sharing illegal content on
this server is not permitted and rooms in violation will be blocked."

### Response Parameters

* `kicked_users` - An integer number representing the number of users that
  were kicked.
* `failed_to_kick_users` - An integer number representing the number of users
  that were not kicked.
* `local_aliases` - An array of strings representing the local aliases that were migrated from
  the old room to the new.
* `new_room_id` - A string representing the room ID of the new room.

## Example

Request:

```
POST /_synapse/admin/v1/shutdown_room/!somebadroom%3Aexample.com

{
    "new_room_user_id": "@someuser:example.com",
    "room_name": "Content Violation Notification",
    "message": "Bad Room has been shutdown due to content violations on this server. Please review our Terms of Service."
}
```

Response:

```
{
    "kicked_users": 5,
    "failed_to_kick_users": 0,
"local_aliases": ["#badroom:example.com", "#evilsaloon:example.com],
|
||||
"new_room_id": "!newroomid:example.com",
|
||||
},
|
||||
```

## Undoing room shutdowns

*Note*: This guide may be outdated by the time you read it. By nature of room shutdowns being performed at the database level,
the structure can and does change without notice.

First, it's important to understand that a room shutdown is very destructive. Undoing a shutdown is not as simple as pretending it
never happened - work has to be done to move forward instead of resetting the past. In fact, in some cases it might not be possible
to recover at all:

* If the room was invite-only, your users will need to be re-invited.
* If the room no longer has any members at all, it'll be impossible to rejoin.
* The first user to rejoin will have to do so via an alias on a different server.

With all that being said, if you still want to try and recover the room:

1. For safety reasons, shut down Synapse.
2. In the database, run `DELETE FROM blocked_rooms WHERE room_id = '!example:example.org';`
   * For caution: it's recommended to run this in a transaction (`BEGIN; DELETE ...;`, verify you got 1 result, then `COMMIT;`); see the sketch after this list.
   * The room ID is the same one supplied to the shutdown room API, not the Content Violation room.
3. Restart Synapse.
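The transaction recommended in step 2 looks like the following sketch (the room ID is a placeholder; if the `DELETE` reports anything other than one row, run `ROLLBACK;` instead of committing):

```
BEGIN;
-- Expect this to report exactly one deleted row; otherwise ROLLBACK.
DELETE FROM blocked_rooms WHERE room_id = '!example:example.org';
COMMIT;
```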

You will have to manually handle, if you so choose, the following:

* Aliases that would have been redirected to the Content Violation room.
* Users that would have been booted from the room (and will have been force-joined to the Content Violation room).
* Removal of the Content Violation room if desired.

@@ -282,6 +282,52 @@ the request is a server admin.
Modules can modify the `request_content` (by e.g. adding events to its `initial_state`),
or deny the room's creation by raising a `module_api.errors.SynapseError`.

#### Presence router callbacks

Presence router callbacks allow module developers to specify additional users (local or remote)
to receive certain presence updates from local users. Presence router callbacks can be
registered using the module API's `register_presence_router_callbacks` method.

The available presence router callbacks are:

```python
async def get_users_for_states(
    self,
    state_updates: Iterable["synapse.api.UserPresenceState"],
) -> Dict[str, Set["synapse.api.UserPresenceState"]]:
```
**Requires** `get_interested_users` to also be registered

Called when processing updates to the presence state of one or more users. This callback can
be used to instruct the server to forward that presence state to specific users. The module
must return a dictionary that maps from Matrix user IDs (which can be local or remote) to the
`UserPresenceState` changes that should be forwarded to them.

Synapse will then attempt to send the specified presence updates to each user when possible.

```python
async def get_interested_users(
    self,
    user_id: str
) -> Union[Set[str], "synapse.module_api.PRESENCE_ALL_USERS"]
```
**Requires** `get_users_for_states` to also be registered

Called when determining which users someone should be able to see the presence state of. This
callback should return complementary results to `get_users_for_states` or the presence information
may not be properly forwarded.

The callback is given the Matrix user ID for a local user that is requesting presence data and
should return the Matrix user IDs of the users whose presence state they are allowed to
query. The returned users can be local or remote.

Alternatively the callback can return `synapse.module_api.PRESENCE_ALL_USERS`
to indicate that the user should receive updates from all known users.

For example, if the user `@alice:example.org` is passed to this method, and the Set
`{"@bob:example.com", "@charlie:somewhere.org"}` is returned, this signifies that Alice
should receive presence updates sent by Bob and Charlie, regardless of whether these users
share a room.

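Putting the two callbacks together, below is a minimal sketch of a module using this interface. The class name and its "watchers" routing rule are invented for illustration; only the callback signatures and `register_presence_router_callbacks` come from the interface described above:

```python
from typing import Dict, Iterable, Set, Union

from synapse.api.presence import UserPresenceState
from synapse.module_api import ModuleApi, PRESENCE_ALL_USERS


class ExamplePresenceRouter:
    """Forwards all presence updates we are asked about to a fixed list of watchers."""

    def __init__(self, config: dict, api: ModuleApi):
        # "watchers" is a hypothetical config option for this sketch.
        self._watchers = config.get("watchers", [])
        api.register_presence_router_callbacks(
            get_users_for_states=self.get_users_for_states,
            get_interested_users=self.get_interested_users,
        )

    async def get_users_for_states(
        self, state_updates: Iterable[UserPresenceState]
    ) -> Dict[str, Set[UserPresenceState]]:
        # Every watcher receives every presence update.
        return {watcher: set(state_updates) for watcher in self._watchers}

    async def get_interested_users(self, user_id: str) -> Union[Set[str], str]:
        # Watchers may see everyone's presence; other users gain nothing extra.
        if user_id in self._watchers:
            return PRESENCE_ALL_USERS
        return set()
```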
### Porting an existing module that uses the old interface

@@ -1,3 +1,9 @@
<h2 style="color:red">
This page of the Synapse documentation is now deprecated. For up to date
documentation on setting up or writing a presence router module, please see
<a href="modules.md">this page</a>.
</h2>

# Presence Router Module

Synapse supports configuring a module that can specify additional users

@@ -108,20 +108,6 @@ presence:
   #
   #enabled: false

# Presence routers are third-party modules that can specify additional logic
# to where presence updates from users are routed.
#
presence_router:
  # The custom module's class. Uncomment to use a custom presence router module.
  #
  #module: "my_custom_router.PresenceRouter"

  # Configuration options of the custom module. Refer to your module's
  # documentation for available options.
  #
  #config:
  #  example_option: 'something'

# Whether to require authentication to retrieve profile data (avatars,
# display names) of other users through the client API. Defaults to
# 'false'. Note that profile data is also available via the federation

@@ -85,6 +85,19 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.xx.0

## Removal of old Room Admin API

The following admin APIs were deprecated in [Synapse 1.25](https://github.com/matrix-org/synapse/blob/v1.25.0/CHANGES.md#removal-warning)
(released on 2021-01-13) and have now been removed:

- `POST /_synapse/admin/v1/purge_room`
- `POST /_synapse/admin/v1/shutdown_room/<room_id>`

Any scripts still using the above APIs should be converted to use the
[Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api).
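For example, a script that previously POSTed to `purge_room` can achieve the same effect with the Delete Room API; a sketch, assuming the `POST /_synapse/admin/v1/rooms/<room_id>/delete` endpoint shape and optional `block`/`purge` flags described in the linked documentation:

```
POST /_synapse/admin/v1/rooms/!room:id/delete

{
    "block": true,
    "purge": true
}
```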

# Upgrading to v1.xx.0

@@ -112,6 +125,14 @@ environment variable.
See [using a forward proxy with Synapse documentation](setup/forward_proxy.md) for
details.

## User-interactive authentication fallback templates can now display errors

This may affect you if you make use of custom HTML templates for the
[reCAPTCHA](../synapse/res/templates/recaptcha.html) or
[terms](../synapse/res/templates/terms.html) fallback pages.

The template is now provided an `error` variable if the authentication
process failed. See the default templates linked above for an example.
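Concretely, the updated default templates (see the template diffs further down) render the variable as follows, and custom templates can adopt the same snippet:

```
{% if error is defined %}
<p class="error"><strong>Error: {{ error }}</strong></p>
{% endif %}
```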

# Upgrading to v1.39.0

@@ -37,6 +37,7 @@ from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.events.presence_router import load_legacy_presence_router
from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.logging.context import PreserveLoggingContext
@@ -370,6 +371,7 @@ async def start(hs: "HomeServer"):

    load_legacy_spam_checkers(hs)
    load_legacy_third_party_event_rules(hs)
    load_legacy_presence_router(hs)

    # If we've configured an expiry time for caches, start the background job now.
    setup_expire_lru_cache_entries(hs)

@@ -248,6 +248,7 @@ class ServerConfig(Config):
        self.use_presence = config.get("use_presence", True)

        # Custom presence router module
        # This is the legacy way of configuring it (the config should now be put in the modules section)
        self.presence_router_module_class = None
        self.presence_router_config = None
        presence_router_config = presence_config.get("presence_router")
@@ -858,20 +859,6 @@ class ServerConfig(Config):
        #
        #enabled: false

        # Presence routers are third-party modules that can specify additional logic
        # to where presence updates from users are routed.
        #
        presence_router:
          # The custom module's class. Uncomment to use a custom presence router module.
          #
          #module: "my_custom_router.PresenceRouter"

          # Configuration options of the custom module. Refer to your module's
          # documentation for available options.
          #
          #config:
          #  example_option: 'something'

        # Whether to require authentication to retrieve profile data (avatars,
        # display names) of other users through the client API. Defaults to
        # 'false'. Note that profile data is also available via the federation

@@ -11,45 +11,115 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
import logging
from typing import (
    TYPE_CHECKING,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Set,
    Union,
)

from synapse.api.presence import UserPresenceState
from synapse.util.async_helpers import maybe_awaitable

if TYPE_CHECKING:
    from synapse.server import HomeServer

GET_USERS_FOR_STATES_CALLBACK = Callable[
    [Iterable[UserPresenceState]], Awaitable[Dict[str, Set[UserPresenceState]]]
]
GET_INTERESTED_USERS_CALLBACK = Callable[
    [str], Awaitable[Union[Set[str], "PresenceRouter.ALL_USERS"]]
]

logger = logging.getLogger(__name__)


def load_legacy_presence_router(hs: "HomeServer"):
    """Wrapper that loads a presence router module configured using the old
    configuration, and registers the hooks they implement.
    """

    if hs.config.presence_router_module_class is None:
        return

    module = hs.config.presence_router_module_class
    config = hs.config.presence_router_config
    api = hs.get_module_api()

    presence_router = module(config=config, module_api=api)

    # The known hooks. If a module implements a method which name appears in this set,
    # we'll want to register it.
    presence_router_methods = {
        "get_users_for_states",
        "get_interested_users",
    }

    # All methods that the module provides should be async, but this wasn't enforced
    # in the old module system, so we wrap them if needed
    def async_wrapper(f: Optional[Callable]) -> Optional[Callable[..., Awaitable]]:
        # f might be None if the callback isn't implemented by the module. In this
        # case we don't want to register a callback at all so we return None.
        if f is None:
            return None

        def run(*args, **kwargs):
            # mypy doesn't do well across function boundaries so we need to tell it
            # f is definitely not None.
            assert f is not None

            return maybe_awaitable(f(*args, **kwargs))

        return run

    # Register the hooks through the module API.
    hooks = {
        hook: async_wrapper(getattr(presence_router, hook, None))
        for hook in presence_router_methods
    }

    api.register_presence_router_callbacks(**hooks)


class PresenceRouter:
    """
    A module that the homeserver will call upon to help route user presence updates to
    additional destinations. If a custom presence router is configured, calls will be
    passed to that instead.
    additional destinations.
    """

    ALL_USERS = "ALL"

    def __init__(self, hs: "HomeServer"):
        self.custom_presence_router = None
        # Initially there are no callbacks
        self._get_users_for_states_callbacks: List[GET_USERS_FOR_STATES_CALLBACK] = []
        self._get_interested_users_callbacks: List[GET_INTERESTED_USERS_CALLBACK] = []

        # Check whether a custom presence router module has been configured
        if hs.config.presence_router_module_class:
            # Initialise the module
            self.custom_presence_router = hs.config.presence_router_module_class(
                config=hs.config.presence_router_config, module_api=hs.get_module_api()

    def register_presence_router_callbacks(
        self,
        get_users_for_states: Optional[GET_USERS_FOR_STATES_CALLBACK] = None,
        get_interested_users: Optional[GET_INTERESTED_USERS_CALLBACK] = None,
    ):
        # PresenceRouter modules are required to implement both of these methods
        # or neither of them as they are assumed to act in a complementary manner
        paired_methods = [get_users_for_states, get_interested_users]
        if paired_methods.count(None) == 1:
            raise RuntimeError(
                "PresenceRouter modules must register neither or both of the paired callbacks: "
                "[get_users_for_states, get_interested_users]"
            )

        # Ensure the module has implemented the required methods
        required_methods = ["get_users_for_states", "get_interested_users"]
        for method_name in required_methods:
            if not hasattr(self.custom_presence_router, method_name):
                raise Exception(
                    "PresenceRouter module '%s' must implement all required methods: %s"
                    % (
                        hs.config.presence_router_module_class.__name__,
                        ", ".join(required_methods),
                    )
                )
        # Append the methods provided to the lists of callbacks
        if get_users_for_states is not None:
            self._get_users_for_states_callbacks.append(get_users_for_states)

        if get_interested_users is not None:
            self._get_interested_users_callbacks.append(get_interested_users)

    async def get_users_for_states(
        self,
@@ -66,14 +136,40 @@ class PresenceRouter:
            A dictionary of user_id -> set of UserPresenceState, indicating which
            presence updates each user should receive.
        """
        if self.custom_presence_router is not None:
            # Ask the custom module
            return await self.custom_presence_router.get_users_for_states(
                state_updates=state_updates
            )

        # Don't include any extra destinations for presence updates
        return {}
        # Bail out early if we don't have any callbacks to run.
        if len(self._get_users_for_states_callbacks) == 0:
            # Don't include any extra destinations for presence updates
            return {}

        users_for_states = {}
        # run all the callbacks for get_users_for_states and combine the results
        for callback in self._get_users_for_states_callbacks:
            try:
                result = await callback(state_updates)
            except Exception as e:
                logger.warning("Failed to run module API callback %s: %s", callback, e)
                continue

            if not isinstance(result, Dict):
                logger.warning(
                    "Wrong type returned by module API callback %s: %s, expected Dict",
                    callback,
                    result,
                )
                continue

            for key, new_entries in result.items():
                if not isinstance(new_entries, Set):
                    logger.warning(
                        "Wrong type returned by module API callback %s: %s, expected Set",
                        callback,
                        new_entries,
                    )
                    break
                users_for_states.setdefault(key, set()).update(new_entries)

        return users_for_states

    async def get_interested_users(self, user_id: str) -> Union[Set[str], ALL_USERS]:
        """
@@ -92,12 +188,36 @@ class PresenceRouter:
            A set of user IDs to return presence updates for, or ALL_USERS to return all
            known updates.
        """
        if self.custom_presence_router is not None:
            # Ask the custom module for interested users
            return await self.custom_presence_router.get_interested_users(
                user_id=user_id
            )

        # A custom presence router is not defined.
        # Don't report any additional interested users
        return set()
        # Bail out early if we don't have any callbacks to run.
        if len(self._get_interested_users_callbacks) == 0:
            # Don't report any additional interested users
            return set()

        interested_users = set()
        # run all the callbacks for get_interested_users and combine the results
        for callback in self._get_interested_users_callbacks:
            try:
                result = await callback(user_id)
            except Exception as e:
                logger.warning("Failed to run module API callback %s: %s", callback, e)
                continue

            # If one of the callbacks returns ALL_USERS then we can stop calling all
            # of the other callbacks, since the set of interested_users is already as
            # large as it can possibly be
            if result == PresenceRouter.ALL_USERS:
                return PresenceRouter.ALL_USERS

            if not isinstance(result, Set):
                logger.warning(
                    "Wrong type returned by module API callback %s: %s, expected set",
                    callback,
                    result,
                )
                continue

            # Add the new interested users to the set
            interested_users.update(result)

        return interested_users

@@ -627,23 +627,28 @@ class AuthHandler(BaseHandler):

    async def add_oob_auth(
        self, stagetype: str, authdict: Dict[str, Any], clientip: str
    ) -> bool:
    ) -> None:
        """
        Adds the result of out-of-band authentication into an existing auth
        session. Currently used for adding the result of fallback auth.

        Raises:
            LoginError if the stagetype is unknown or the session is missing.
            LoginError is raised by check_auth if authentication fails.
        """
        if stagetype not in self.checkers:
            raise LoginError(400, "", Codes.MISSING_PARAM)
        if "session" not in authdict:
            raise LoginError(400, "", Codes.MISSING_PARAM)

        result = await self.checkers[stagetype].check_auth(authdict, clientip)
        if result:
            await self.store.mark_ui_auth_stage_complete(
                authdict["session"], stagetype, result
            raise LoginError(
                400, f"Unknown UIA stage type: {stagetype}", Codes.INVALID_PARAM
            )
            return True
        return False
        if "session" not in authdict:
            raise LoginError(400, "Missing session ID", Codes.MISSING_PARAM)

        # If authentication fails a LoginError is raised. Otherwise, store
        # the successful result.
        result = await self.checkers[stagetype].check_auth(authdict, clientip)
        await self.store.mark_ui_auth_stage_complete(
            authdict["session"], stagetype, result
        )

    def get_session_id(self, clientdict: Dict[str, Any]) -> Optional[str]:
        """

@@ -285,175 +285,172 @@ class FederationHandler(BaseHandler):
        # - Fetching any missing prev events to fill in gaps in the graph
        # - Fetching state if we have a hole in the graph
        if not pdu.internal_metadata.is_outlier():
            # We only backfill backwards to the min depth.
            min_depth = await self.get_min_depth_for_context(pdu.room_id)

            logger.debug("min_depth: %d", min_depth)

            prevs = set(pdu.prev_event_ids())
            seen = await self.store.have_events_in_timeline(prevs)
            missing_prevs = prevs - seen

            if min_depth is not None and pdu.depth < min_depth:
                # This is so that we don't notify the user about this
                # message, to work around the fact that some events will
                # reference really really old events we really don't want to
                # send to the clients.
                pdu.internal_metadata.outlier = True
            elif min_depth is not None and pdu.depth > min_depth:
                missing_prevs = prevs - seen
            if sent_to_us_directly and missing_prevs:
                # If we're missing stuff, ensure we only fetch stuff one
                # at a time.
                logger.info(
                    "Acquiring room lock to fetch %d missing prev_events: %s",
                    len(missing_prevs),
                    shortstr(missing_prevs),
                )
                with (await self._room_pdu_linearizer.queue(pdu.room_id)):
                if missing_prevs:
                    if sent_to_us_directly:
                        # We only backfill backwards to the min depth.
                        min_depth = await self.get_min_depth_for_context(pdu.room_id)
                        logger.debug("min_depth: %d", min_depth)

                        if min_depth is not None and pdu.depth > min_depth:
                            # If we're missing stuff, ensure we only fetch stuff one
                            # at a time.
                            logger.info(
                                "Acquired room lock to fetch %d missing prev_events",
                                "Acquiring room lock to fetch %d missing prev_events: %s",
                                len(missing_prevs),
                                shortstr(missing_prevs),
                            )

                            try:
                                await self._get_missing_events_for_pdu(
                                    origin, pdu, prevs, min_depth
                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
                        logger.info(
                            "Acquired room lock to fetch %d missing prev_events",
                            len(missing_prevs),
                        )
                            except Exception as e:
                                raise Exception(
                                    "Error fetching missing prev_events for %s: %s"
                                    % (event_id, e)
                                ) from e

                        try:
                            await self._get_missing_events_for_pdu(
                                origin, pdu, prevs, min_depth
                            )
                        except Exception as e:
                            raise Exception(
                                "Error fetching missing prev_events for %s: %s"
                                % (event_id, e)
                            ) from e

                        # Update the set of things we've seen after trying to
                        # fetch the missing stuff
                        seen = await self.store.have_events_in_timeline(prevs)
                        missing_prevs = prevs - seen

                        if not prevs - seen:
                            logger.info(
                                "Found all missing prev_events",
                            )
                        if not missing_prevs:
                            logger.info("Found all missing prev_events")

                missing_prevs = prevs - seen
                if missing_prevs:
                    # We've still not been able to get all of the prev_events for this event.
                    #
                    # In this case, we need to fall back to asking another server in the
                    # federation for the state at this event. That's ok provided we then
                    # resolve the state against other bits of the DAG before using it (which
                    # will ensure that you can't just take over a room by sending an event,
                    # withholding its prev_events, and declaring yourself to be an admin in
                    # the subsequent state request).
                    #
                    # Now, if we're pulling this event as a missing prev_event, then clearly
                    # this event is not going to become the only forward-extremity and we are
                    # guaranteed to resolve its state against our existing forward
                    # extremities, so that should be fine.
                    #
                    # On the other hand, if this event was pushed to us, it is possible for
                    # it to become the only forward-extremity in the room, and we would then
                    # trust its state to be the state for the whole room. This is very bad.
                    # Further, if the event was pushed to us, there is no excuse for us not to
                    # have all the prev_events. We therefore reject any such events.
                    #
                    # XXX this really feels like it could/should be merged with the above,
                    # but there is an interaction with min_depth that I'm not really
                    # following.
            if missing_prevs:
                # since this event was pushed to us, it is possible for it to
                # become the only forward-extremity in the room, and we would then
                # trust its state to be the state for the whole room. This is very
                # bad. Further, if the event was pushed to us, there is no excuse
                # for us not to have all the prev_events. (XXX: apart from
                # min_depth?)
                #
                # We therefore reject any such events.
                logger.warning(
                    "Rejecting: failed to fetch %d prev events: %s",
                    len(missing_prevs),
                    shortstr(missing_prevs),
                )
                raise FederationError(
                    "ERROR",
                    403,
                    (
                        "Your server isn't divulging details about prev_events "
                        "referenced in this event."
                    ),
                    affected=pdu.event_id,
                )

                    if sent_to_us_directly:
                        logger.warning(
                            "Rejecting: failed to fetch %d prev events: %s",
                            len(missing_prevs),
            else:
                # We don't have all of the prev_events for this event.
                #
                # In this case, we need to fall back to asking another server in the
                # federation for the state at this event. That's ok provided we then
                # resolve the state against other bits of the DAG before using it (which
                # will ensure that you can't just take over a room by sending an event,
                # withholding its prev_events, and declaring yourself to be an admin in
                # the subsequent state request).
                #
                # Since we're pulling this event as a missing prev_event, then clearly
                # this event is not going to become the only forward-extremity and we are
                # guaranteed to resolve its state against our existing forward
                # extremities, so that should be fine.
                #
                # XXX this really feels like it could/should be merged with the above,
                # but there is an interaction with min_depth that I'm not really
                # following.
                logger.info(
                    "Event %s is missing prev_events %s: calculating state for a "
                    "backwards extremity",
                    event_id,
                    shortstr(missing_prevs),
                )
                        raise FederationError(
                            "ERROR",
                            403,
                            (
                                "Your server isn't divulging details about prev_events "
                                "referenced in this event."
                            ),
                            affected=pdu.event_id,
                        )

                    logger.info(
                        "Event %s is missing prev_events %s: calculating state for a "
                        "backwards extremity",
                        event_id,
                        shortstr(missing_prevs),
                    )
                # Calculate the state after each of the previous events, and
                # resolve them to find the correct state at the current event.
                event_map = {event_id: pdu}
                try:
                    # Get the state of the events we know about
                    ours = await self.state_store.get_state_groups_ids(
                        room_id, seen
                    )

                    # Calculate the state after each of the previous events, and
                    # resolve them to find the correct state at the current event.
                    event_map = {event_id: pdu}
                    try:
                        # Get the state of the events we know about
                        ours = await self.state_store.get_state_groups_ids(room_id, seen)
                        # state_maps is a list of mappings from (type, state_key) to event_id
                        state_maps: List[StateMap[str]] = list(ours.values())

                    # state_maps is a list of mappings from (type, state_key) to event_id
                    state_maps: List[StateMap[str]] = list(ours.values())
                        # we don't need this any more, let's delete it.
                        del ours

                    # we don't need this any more, let's delete it.
                    del ours

                    # Ask the remote server for the states we don't
                    # know about
                    for p in missing_prevs:
                        logger.info("Requesting state after missing prev_event %s", p)

                        with nested_logging_context(p):
                            # note that if any of the missing prevs share missing state or
                            # auth events, the requests to fetch those events are deduped
                            # by the get_pdu_cache in federation_client.
                            remote_state = (
                                await self._get_state_after_missing_prev_event(
                                    origin, room_id, p
                                )
                        # Ask the remote server for the states we don't
                        # know about
                        for p in missing_prevs:
                            logger.info(
                                "Requesting state after missing prev_event %s", p
                            )

                            remote_state_map = {
                                (x.type, x.state_key): x.event_id for x in remote_state
                            }
                            state_maps.append(remote_state_map)
                            with nested_logging_context(p):
                                # note that if any of the missing prevs share missing state or
                                # auth events, the requests to fetch those events are deduped
                                # by the get_pdu_cache in federation_client.
                                remote_state = (
                                    await self._get_state_after_missing_prev_event(
                                        origin, room_id, p
                                    )
                                )

                                for x in remote_state:
                                    event_map[x.event_id] = x
                                remote_state_map = {
                                    (x.type, x.state_key): x.event_id
                                    for x in remote_state
                                }
                                state_maps.append(remote_state_map)

                    room_version = await self.store.get_room_version_id(room_id)
                    state_map = (
                        await self._state_resolution_handler.resolve_events_with_store(
                                for x in remote_state:
                                    event_map[x.event_id] = x

                        room_version = await self.store.get_room_version_id(room_id)
                        state_map = await self._state_resolution_handler.resolve_events_with_store(
                            room_id,
                            room_version,
                            state_maps,
                            event_map,
                            state_res_store=StateResolutionStore(self.store),
                        )
                    )

                    # We need to give _process_received_pdu the actual state events
                    # rather than event ids, so generate that now.
                        # We need to give _process_received_pdu the actual state events
                        # rather than event ids, so generate that now.

                    # First though we need to fetch all the events that are in
                    # state_map, so we can build up the state below.
                    evs = await self.store.get_events(
                        list(state_map.values()),
                        get_prev_content=False,
                        redact_behaviour=EventRedactBehaviour.AS_IS,
                    )
                    event_map.update(evs)
                        # First though we need to fetch all the events that are in
                        # state_map, so we can build up the state below.
                        evs = await self.store.get_events(
                            list(state_map.values()),
                            get_prev_content=False,
                            redact_behaviour=EventRedactBehaviour.AS_IS,
                        )
                        event_map.update(evs)

                    state = [event_map[e] for e in state_map.values()]
                except Exception:
                    logger.warning(
                        "Error attempting to resolve state at missing " "prev_events",
                        exc_info=True,
                    )
                    raise FederationError(
                        "ERROR",
                        403,
                        "We can't get valid state history.",
                        affected=event_id,
                    )
                        state = [event_map[e] for e in state_map.values()]
                    except Exception:
                        logger.warning(
                            "Error attempting to resolve state at missing "
                            "prev_events",
                            exc_info=True,
                        )
                        raise FederationError(
                            "ERROR",
                            403,
                            "We can't get valid state history.",
                            affected=event_id,
                        )

        # A second round of checks for all events. Check that the event passes auth
        # based on `auth_events`, this allows us to assert that the event would
@@ -2375,6 +2372,7 @@ class FederationHandler(BaseHandler):
            not event.internal_metadata.is_outlier()
            and not backfilled
            and not context.rejected
            and (await self.store.get_min_depth(event.room_id)) <= event.depth
        ):
            await self.action_generator.handle_push_actions_for_event(
                event, context

@@ -151,7 +151,7 @@ class InitialSyncHandler(BaseHandler):
            limit = 10

        async def handle_room(event: RoomsForUser):
            d = {
            d: JsonDict = {
                "room_id": event.room_id,
                "membership": event.membership,
                "visibility": (

@@ -86,20 +86,20 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncRequestKey = Tuple[Any, ...]


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
    user = attr.ib(type=UserID)
    filter_collection = attr.ib(type=FilterCollection)
    is_guest = attr.ib(type=bool)
    request_key = attr.ib(type=SyncRequestKey)
    device_id = attr.ib(type=Optional[str])
    user: UserID
    filter_collection: FilterCollection
    is_guest: bool
    request_key: SyncRequestKey
    device_id: Optional[str]


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TimelineBatch:
    prev_batch = attr.ib(type=StreamToken)
    events = attr.ib(type=List[EventBase])
    limited = attr.ib(type=bool)
    prev_batch: StreamToken
    events: List[EventBase]
    limited: bool

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
@@ -113,16 +113,16 @@ class TimelineBatch:
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
@attr.s(slots=True)
@attr.s(slots=True, auto_attribs=True)
class JoinedSyncResult:
    room_id = attr.ib(type=str)
    timeline = attr.ib(type=TimelineBatch)
    state = attr.ib(type=StateMap[EventBase])
    ephemeral = attr.ib(type=List[JsonDict])
    account_data = attr.ib(type=List[JsonDict])
    unread_notifications = attr.ib(type=JsonDict)
    summary = attr.ib(type=Optional[JsonDict])
    unread_count = attr.ib(type=int)
    room_id: str
    timeline: TimelineBatch
    state: StateMap[EventBase]
    ephemeral: List[JsonDict]
    account_data: List[JsonDict]
    unread_notifications: JsonDict
    summary: Optional[JsonDict]
    unread_count: int

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
@@ -138,12 +138,12 @@ class JoinedSyncResult:
        )


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ArchivedSyncResult:
    room_id = attr.ib(type=str)
    timeline = attr.ib(type=TimelineBatch)
    state = attr.ib(type=StateMap[EventBase])
    account_data = attr.ib(type=List[JsonDict])
    room_id: str
    timeline: TimelineBatch
    state: StateMap[EventBase]
    account_data: List[JsonDict]

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
@@ -152,37 +152,37 @@ class ArchivedSyncResult:
        return bool(self.timeline or self.state or self.account_data)


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class InvitedSyncResult:
    room_id = attr.ib(type=str)
    invite = attr.ib(type=EventBase)
    room_id: str
    invite: EventBase

    def __bool__(self) -> bool:
        """Invited rooms should always be reported to the client"""
        return True


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class KnockedSyncResult:
    room_id = attr.ib(type=str)
    knock = attr.ib(type=EventBase)
    room_id: str
    knock: EventBase

    def __bool__(self) -> bool:
        """Knocked rooms should always be reported to the client"""
        return True


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class GroupsSyncResult:
    join = attr.ib(type=JsonDict)
    invite = attr.ib(type=JsonDict)
    leave = attr.ib(type=JsonDict)
    join: JsonDict
    invite: JsonDict
    leave: JsonDict

    def __bool__(self) -> bool:
        return bool(self.join or self.invite or self.leave)


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceLists:
    """
    Attributes:
@@ -190,27 +190,27 @@ class DeviceLists:
        left: List of user_ids whose devices we no longer track
    """

    changed = attr.ib(type=Collection[str])
    left = attr.ib(type=Collection[str])
    changed: Collection[str]
    left: Collection[str]

    def __bool__(self) -> bool:
        return bool(self.changed or self.left)


@attr.s(slots=True)
@attr.s(slots=True, auto_attribs=True)
class _RoomChanges:
    """The set of room entries to include in the sync, plus the set of joined
    and left room IDs since last sync.
    """

    room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
    invited = attr.ib(type=List[InvitedSyncResult])
    knocked = attr.ib(type=List[KnockedSyncResult])
    newly_joined_rooms = attr.ib(type=List[str])
    newly_left_rooms = attr.ib(type=List[str])
    room_entries: List["RoomSyncResultBuilder"]
    invited: List[InvitedSyncResult]
    knocked: List[KnockedSyncResult]
    newly_joined_rooms: List[str]
    newly_left_rooms: List[str]


@attr.s(slots=True, frozen=True)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncResult:
    """
    Attributes:
@@ -230,18 +230,18 @@ class SyncResult:
        groups: Group updates, if any
    """

    next_batch = attr.ib(type=StreamToken)
    presence = attr.ib(type=List[JsonDict])
    account_data = attr.ib(type=List[JsonDict])
    joined = attr.ib(type=List[JoinedSyncResult])
    invited = attr.ib(type=List[InvitedSyncResult])
    knocked = attr.ib(type=List[KnockedSyncResult])
    archived = attr.ib(type=List[ArchivedSyncResult])
    to_device = attr.ib(type=List[JsonDict])
    device_lists = attr.ib(type=DeviceLists)
    device_one_time_keys_count = attr.ib(type=JsonDict)
    device_unused_fallback_key_types = attr.ib(type=List[str])
    groups = attr.ib(type=Optional[GroupsSyncResult])
    next_batch: StreamToken
    presence: List[JsonDict]
    account_data: List[JsonDict]
    joined: List[JoinedSyncResult]
    invited: List[InvitedSyncResult]
    knocked: List[KnockedSyncResult]
    archived: List[ArchivedSyncResult]
    to_device: List[JsonDict]
    device_lists: DeviceLists
    device_one_time_keys_count: JsonDict
    device_unused_fallback_key_types: List[str]
    groups: Optional[GroupsSyncResult]

    def __bool__(self) -> bool:
        """Make the result appear empty if there are no updates. This is used
@@ -701,7 +701,7 @@ class SyncHandler:
        name_id = state_ids.get((EventTypes.Name, ""))
        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))

        summary = {}
        summary: JsonDict = {}
        empty_ms = MemberSummary([], 0)

        # TODO: only send these when they change.
@@ -2076,21 +2076,23 @@ class SyncHandler:
        # If the membership's stream ordering is after the given stream
        # ordering, we need to go and work out if the user was in the room
        # before.
        for room_id, event_pos in joined_rooms:
            if not event_pos.persisted_after(room_key):
                joined_room_ids.add(room_id)
        for joined_room in joined_rooms:
            if not joined_room.event_pos.persisted_after(room_key):
                joined_room_ids.add(joined_room.room_id)
                continue

            logger.info("User joined room after current token: %s", room_id)
            logger.info("User joined room after current token: %s", joined_room.room_id)

            extrems = (
                await self.store.get_forward_extremities_for_room_at_stream_ordering(
                    room_id, event_pos.stream
                    joined_room.room_id, joined_room.event_pos.stream
                )
            )
            users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
            users_in_room = await self.state.get_current_users_in_room(
                joined_room.room_id, extrems
            )
            if user_id in users_in_room:
                joined_room_ids.add(room_id)
                joined_room_ids.add(joined_room.room_id)

        return frozenset(joined_room_ids)

@@ -2160,7 +2162,7 @@ def _calculate_state(
    return {event_id_to_key[e]: e for e in state_ids}


@attr.s(slots=True)
@attr.s(slots=True, auto_attribs=True)
class SyncResultBuilder:
    """Used to help build up a new SyncResult for a user

@@ -2182,23 +2184,23 @@ class SyncResultBuilder:
        to_device (list)
    """

    sync_config = attr.ib(type=SyncConfig)
    full_state = attr.ib(type=bool)
    since_token = attr.ib(type=Optional[StreamToken])
    now_token = attr.ib(type=StreamToken)
    joined_room_ids = attr.ib(type=FrozenSet[str])
    sync_config: SyncConfig
    full_state: bool
    since_token: Optional[StreamToken]
    now_token: StreamToken
    joined_room_ids: FrozenSet[str]

    presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
    account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
    joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
    invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
    knocked = attr.ib(type=List[KnockedSyncResult], default=attr.Factory(list))
    archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
    groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
    to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
    presence: List[JsonDict] = attr.Factory(list)
    account_data: List[JsonDict] = attr.Factory(list)
    joined: List[JoinedSyncResult] = attr.Factory(list)
    invited: List[InvitedSyncResult] = attr.Factory(list)
    knocked: List[KnockedSyncResult] = attr.Factory(list)
    archived: List[ArchivedSyncResult] = attr.Factory(list)
    groups: Optional[GroupsSyncResult] = None
    to_device: List[JsonDict] = attr.Factory(list)


@attr.s(slots=True)
@attr.s(slots=True, auto_attribs=True)
class RoomSyncResultBuilder:
    """Stores information needed to create either a `JoinedSyncResult` or
    `ArchivedSyncResult`.
@@ -2214,10 +2216,10 @@ class RoomSyncResultBuilder:
        upto_token: Latest point to return events from.
    """

    room_id = attr.ib(type=str)
    rtype = attr.ib(type=str)
    events = attr.ib(type=Optional[List[EventBase]])
    newly_joined = attr.ib(type=bool)
    full_state = attr.ib(type=bool)
    since_token = attr.ib(type=Optional[StreamToken])
    upto_token = attr.ib(type=StreamToken)
    room_id: str
    rtype: str
    events: Optional[List[EventBase]]
    newly_joined: bool
    full_state: bool
    since_token: Optional[StreamToken]
    upto_token: StreamToken
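The conversions above all follow the same attrs pattern: with `auto_attribs=True`, plain type-annotated attributes replace explicit `attr.ib(type=...)` calls, and defaults become ordinary assignments (`attr.Factory` for mutable ones). A minimal before/after sketch, with an invented class name:

```python
import attr


@attr.s(slots=True, frozen=True)
class PointOld:
    # Old style: explicit attr.ib() carrying the type.
    x = attr.ib(type=int)
    y = attr.ib(type=int)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class PointNew:
    # New style: the annotation itself declares the attribute.
    x: int
    y: int
    labels: list = attr.Factory(list)  # default via a plain assignment
```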
@@ -49,7 +49,7 @@ class UserInteractiveAuthChecker:
            clientip: The IP address of the client.

        Raises:
            SynapseError if authentication failed
            LoginError if authentication failed.

        Returns:
            The result of authentication (to pass back to the client?)
@@ -131,7 +131,9 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
            )
            if resp_body["success"]:
                return True
            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
            raise LoginError(
                401, "Captcha authentication failed", errcode=Codes.UNAUTHORIZED
            )


class _BaseThreepidAuthChecker:
@@ -191,7 +193,9 @@ class _BaseThreepidAuthChecker:
            raise AssertionError("Unrecognized threepid medium: %s" % (medium,))

        if not threepid:
            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
            raise LoginError(
                401, "Unable to get validated threepid", errcode=Codes.UNAUTHORIZED
            )

        if threepid["medium"] != medium:
            raise LoginError(

@@ -32,6 +32,7 @@ from twisted.internet import defer
from twisted.web.resource import IResource

from synapse.events import EventBase
from synapse.events.presence_router import PresenceRouter
from synapse.http.client import SimpleHttpClient
from synapse.http.server import (
    DirectServeHtmlResource,
@@ -57,6 +58,8 @@ This package defines the 'stable' API which can be used by extension modules whi
are loaded into Synapse.
"""

PRESENCE_ALL_USERS = PresenceRouter.ALL_USERS

__all__ = [
    "errors",
    "make_deferred_yieldable",
@@ -70,6 +73,7 @@ __all__ = [
    "DirectServeHtmlResource",
    "DirectServeJsonResource",
    "ModuleApi",
    "PRESENCE_ALL_USERS",
]

logger = logging.getLogger(__name__)
@@ -111,6 +115,7 @@ class ModuleApi:
        self._spam_checker = hs.get_spam_checker()
        self._account_validity_handler = hs.get_account_validity_handler()
        self._third_party_event_rules = hs.get_third_party_event_rules()
        self._presence_router = hs.get_presence_router()

    #################################################################################
    # The following methods should only be called during the module's initialisation.
@@ -130,6 +135,11 @@ class ModuleApi:
        """Registers callbacks for third party event rules capabilities."""
        return self._third_party_event_rules.register_third_party_rules_callbacks

    @property
    def register_presence_router_callbacks(self):
        """Registers callbacks for presence router capabilities."""
        return self._presence_router.register_presence_router_callbacks

    def register_web_resource(self, path: str, resource: IResource):
        """Registers a web resource to be served at the given path.

@@ -16,6 +16,9 @@ function captchaDone() {
<body>
<form id="registrationForm" method="post" action="{{ myurl }}">
    <div>
        {% if error is defined %}
        <p class="error"><strong>Error: {{ error }}</strong></p>
        {% endif %}
        <p>
        Hello! We need to prevent computer programs and other automated
        things from creating accounts on this server.

@@ -8,6 +8,9 @@
<body>
<form id="registrationForm" method="post" action="{{ myurl }}">
    <div>
        {% if error is defined %}
        <p class="error"><strong>Error: {{ error }}</strong></p>
        {% endif %}
        <p>
        Please click the button below if you agree to the
        <a href="{{ terms_url }}">privacy policy of this homeserver.</a>

@@ -36,7 +36,6 @@ from synapse.rest.admin.event_reports import (
)
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.rooms import (
    DeleteRoomRestServlet,
    ForwardExtremitiesRestServlet,
@@ -47,7 +46,6 @@ from synapse.rest.admin.rooms import (
    RoomMembersRestServlet,
    RoomRestServlet,
    RoomStateRestServlet,
    ShutdownRoomRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
@@ -221,7 +219,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    RoomMembersRestServlet(hs).register(http_server)
    DeleteRoomRestServlet(hs).register(http_server)
    JoinRoomAliasServlet(hs).register(http_server)
    PurgeRoomServlet(hs).register(http_server)
    SendServerNoticeServlet(hs).register(http_server)
    VersionServlet(hs).register(http_server)
    UserAdminServlet(hs).register(http_server)
@@ -255,7 +252,6 @@ def register_servlets_for_client_rest_resource(
    PurgeHistoryRestServlet(hs).register(http_server)
    ResetPasswordRestServlet(hs).register(http_server)
    SearchUsersRestServlet(hs).register(http_server)
    ShutdownRoomRestServlet(hs).register(http_server)
    UserRegisterServlet(hs).register(http_server)
    DeleteGroupAdminRestServlet(hs).register(http_server)
    AccountValidityRenewServlet(hs).register(http_server)

@@ -1,58 +0,0 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Tuple

from synapse.http.servlet import (
    RestServlet,
    assert_params_in_dict,
    parse_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.rest.admin import assert_requester_is_admin
from synapse.rest.admin._base import admin_patterns
from synapse.types import JsonDict

if TYPE_CHECKING:
    from synapse.server import HomeServer


class PurgeRoomServlet(RestServlet):
    """Servlet which will remove all trace of a room from the database

    POST /_synapse/admin/v1/purge_room
    {
        "room_id": "!room:id"
    }

    returns:

    {}
    """

    PATTERNS = admin_patterns("/purge_room$")

    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.auth = hs.get_auth()
        self.pagination_handler = hs.get_pagination_handler()

    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
        await assert_requester_is_admin(self.auth, request)

        body = parse_json_object_from_request(request)
        assert_params_in_dict(body, ("room_id",))

        await self.pagination_handler.purge_room(body["room_id"])

        return 200, {}

@@ -46,41 +46,6 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ShutdownRoomRestServlet(RestServlet):
|
||||
"""Shuts down a room by removing all local users from the room and blocking
|
||||
all future invites and joins to the room. Any local aliases will be repointed
|
||||
to a new room created by `new_room_user_id` and kicked users will be auto
|
||||
joined to the new room.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/shutdown_room/(?P<room_id>[^/]+)")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.room_shutdown_handler = hs.get_room_shutdown_handler()
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
assert_params_in_dict(content, ["new_room_user_id"])
|
||||
|
||||
ret = await self.room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content["new_room_user_id"],
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=True,
|
||||
)
|
||||
|
||||
return (200, ret)
|
||||
|
||||
|
||||
class DeleteRoomRestServlet(RestServlet):
|
||||
"""Delete a room from server.
@@ -16,7 +16,7 @@ import logging
 from typing import TYPE_CHECKING
 
 from synapse.api.constants import LoginType
-from synapse.api.errors import SynapseError
+from synapse.api.errors import LoginError, SynapseError
 from synapse.api.urls import CLIENT_API_PREFIX
 from synapse.http.server import respond_with_html
 from synapse.http.servlet import RestServlet, parse_string
@@ -95,29 +95,32 @@ class AuthRestServlet(RestServlet):
 
             authdict = {"response": response, "session": session}
 
-            success = await self.auth_handler.add_oob_auth(
-                LoginType.RECAPTCHA, authdict, request.getClientIP()
-            )
-
-            if success:
-                html = self.success_template.render()
-            else:
+            try:
+                await self.auth_handler.add_oob_auth(
+                    LoginType.RECAPTCHA, authdict, request.getClientIP()
+                )
+            except LoginError as e:
+                # Authentication failed, let user try again
                 html = self.recaptcha_template.render(
                     session=session,
                     myurl="%s/r0/auth/%s/fallback/web"
                     % (CLIENT_API_PREFIX, LoginType.RECAPTCHA),
                     sitekey=self.hs.config.recaptcha_public_key,
+                    error=e.msg,
                 )
+            else:
+                # No LoginError was raised, so authentication was successful
+                html = self.success_template.render()
 
         elif stagetype == LoginType.TERMS:
             authdict = {"session": session}
 
-            success = await self.auth_handler.add_oob_auth(
-                LoginType.TERMS, authdict, request.getClientIP()
-            )
-
-            if success:
-                html = self.success_template.render()
-            else:
+            try:
+                await self.auth_handler.add_oob_auth(
+                    LoginType.TERMS, authdict, request.getClientIP()
+                )
+            except LoginError as e:
+                # Authentication failed, let user try again
                 html = self.terms_template.render(
                     session=session,
                     terms_url="%s_matrix/consent?v=%s"
@@ -127,10 +130,16 @@ class AuthRestServlet(RestServlet):
                     ),
                     myurl="%s/r0/auth/%s/fallback/web"
                     % (CLIENT_API_PREFIX, LoginType.TERMS),
+                    error=e.msg,
                 )
+            else:
+                # No LoginError was raised, so authentication was successful
+                html = self.success_template.render()
 
         elif stagetype == LoginType.SSO:
             # The SSO fallback workflow should not post here,
             raise SynapseError(404, "Fallback SSO auth does not support POST requests.")
 
         else:
             raise SynapseError(404, "Unknown auth stage type")
@@ -30,6 +30,7 @@ from synapse.http.site import SynapseRequest
 from synapse.types import JsonDict, StreamToken
 from synapse.util import json_decoder
 
+from ...logging import issue9533_logger
 from ._base import client_patterns, set_timeline_upper_limit
 
 if TYPE_CHECKING:
@@ -185,6 +186,9 @@ class SyncRestServlet(RestServlet):
             full_state=full_state,
         )
 
+        issue9533_logger.debug(
+            "Sync body generated, next batch: %s", sync_result.next_batch
+        )
         # the client may have disconnected by now; don't bother to serialize the
         # response if so.
         if request._disconnected:
@@ -246,6 +250,9 @@ class SyncRestServlet(RestServlet):
 
         if sync_result.to_device:
             response["to_device"] = {"events": sync_result.to_device}
+            issue9533_logger.debug(
+                "to_device sent down in sync %s", response["to_device"]
+            )
 
         if sync_result.device_lists.changed:
             response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
@@ -57,4 +57,8 @@ textarea, input {
   background-color: #f8f8f8;
   border: 1px #ccc solid;
 }
 }
+
+.error {
+  color: red;
+}
@@ -135,6 +135,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         has_changed = self._device_inbox_stream_cache.has_entity_changed(
             user_id, last_stream_id
         )
+        issue9533_logger.debug(
+            "get_new_messages_for_device"
+            " user %s device %s last_stream_id %s current_stream_id %s has_changed %s",
+            user_id,
+            device_id,
+            last_stream_id,
+            current_stream_id,
+            has_changed,
+        )
         if not has_changed:
             return ([], current_stream_id)
 
@@ -157,9 +166,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             stream_pos = current_stream_id
             return messages, stream_pos
 
-        return await self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
+        issue9533_logger.debug(
+            "get_all_new_device_messages: after sorting, "
+            "last_stream_id %s, current_stream_id %s result %s",
+            last_stream_id,
+            current_stream_id,
+            result,
+        )
+        # TODO check that the stream_pos here gets read by the consumer---we may need
+        # to adjust the next_batch token if we did use the limit above
+        return result
 
     @trace
     async def delete_messages_for_device(
@@ -181,6 +200,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
         set_tag("last_deleted_stream_id", last_deleted_stream_id)
+        issue9533_logger.debug(
+            "delete_messages_for_device: user %s device %s; cache says last_deleted_stream_id=%s",
+            user_id,
+            device_id,
+            last_deleted_stream_id,
+        )
 
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -188,6 +213,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             )
             if not has_changed:
                 log_kv({"message": "No changes in cache since last check"})
+                issue9533_logger.debug("No changes in cache since last check")
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -204,6 +230,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
+        issue9533_logger.debug("deleted %s messages for device", count)
 
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
@@ -212,6 +239,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         self._last_device_delete_cache[(user_id, device_id)] = max(
             last_deleted_stream_id, up_to_stream_id
         )
+        issue9533_logger.debug(
+            "cache updated for (user, device) to %s",
+            max(last_deleted_stream_id, up_to_stream_id),
+        )
 
         return count
 
@@ -355,9 +386,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
             return updates, upto_token, limited
 
-        return await self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
+        issue9533_logger.debug(
+            "get_all_new_device_messages: after sorting, got %s", result
+        )
+        return result
 
     @trace
     async def add_messages_to_device_inbox(
@@ -307,7 +307,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
 
     @cached()
-    async def get_invited_rooms_for_local_user(self, user_id: str) -> RoomsForUser:
+    async def get_invited_rooms_for_local_user(
+        self, user_id: str
+    ) -> List[RoomsForUser]:
         """Get all the rooms the *local* user is invited to.
 
         Args:
@@ -522,7 +524,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _get_users_server_still_shares_room_with_txn,
         )
 
-    async def get_rooms_for_user(self, user_id: str, on_invalidate=None):
+    async def get_rooms_for_user(
+        self, user_id: str, on_invalidate=None
+    ) -> FrozenSet[str]:
         """Returns a set of room_ids the user is currently joined to.
 
         If a remote user only returns rooms this server is currently
@@ -365,7 +365,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         return False
 
     async def update_profile_in_user_dir(
-        self, user_id: str, display_name: str, avatar_url: str
+        self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
     ) -> None:
         """
         Update or add a user's profile in the user directory.
@@ -14,25 +14,40 @@
 # limitations under the License.
 
 import logging
-from collections import namedtuple
+from typing import List, Optional, Tuple
+
+import attr
 
 from synapse.types import PersistedEventPosition
 
 logger = logging.getLogger(__name__)
 
 
-RoomsForUser = namedtuple(
-    "RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering")
-)
-
-GetRoomsForUserWithStreamOrdering = namedtuple(
-    "GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos")
-)
+@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True)
+class RoomsForUser:
+    room_id: str
+    sender: str
+    membership: str
+    event_id: str
+    stream_ordering: int
 
 
-# We store this using a namedtuple so that we save about 3x space over using a
-# dict.
-ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
+@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True)
+class GetRoomsForUserWithStreamOrdering:
+    room_id: str
+    event_pos: PersistedEventPosition
 
 
-# "members" points to a truncated list of (user_id, event_id) tuples for users of
-# a given membership type, suitable for use in calculating heroes for a room.
-# "count" points to the total number of users of a given membership type.
-MemberSummary = namedtuple("MemberSummary", ("members", "count"))
+@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True)
+class ProfileInfo:
+    avatar_url: Optional[str]
+    display_name: Optional[str]
+
+
+@attr.s(slots=True, frozen=True, weakref_slot=True, auto_attribs=True)
+class MemberSummary:
+    # A truncated list of (user_id, event_id) tuples for users of a given
+    # membership type, suitable for use in calculating heroes for a room.
+    members: List[Tuple[str, str]]
+    # The total number of users of a given membership type.
+    count: int
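The conversion pattern above, in isolation: a frozen, slotted `attrs` class keeps the immutability and low per-instance overhead of a namedtuple while gaining real type annotations. A hedged, standalone sketch:

```python
import attr

# Frozen + slots reproduces namedtuple ergonomics: immutable instances,
# no per-instance __dict__, and generated __init__/__repr__/__eq__.
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomsForUser:
    room_id: str
    sender: str
    membership: str
    event_id: str
    stream_ordering: int

entry = RoomsForUser("!room:test", "@alice:test", "join", "$event", 42)
print(entry.membership)  # named attribute access, as before
# entry.membership = "leave"  # would raise attr.exceptions.FrozenInstanceError
```

One behavioral difference matters for callers: unlike a namedtuple, the class no longer supports tuple indexing or comparison against plain tuples, which is why the test assertions later in this diff also had to change.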
@@ -17,7 +17,7 @@ from unittest.mock import Mock
 import attr
 
 from synapse.api.constants import EduTypes
-from synapse.events.presence_router import PresenceRouter
+from synapse.events.presence_router import PresenceRouter, load_legacy_presence_router
 from synapse.federation.units import Transaction
 from synapse.handlers.presence import UserPresenceState
 from synapse.module_api import ModuleApi
@@ -34,7 +34,7 @@ class PresenceRouterTestConfig:
     users_who_should_receive_all_presence = attr.ib(type=List[str], default=[])
 
 
-class PresenceRouterTestModule:
+class LegacyPresenceRouterTestModule:
     def __init__(self, config: PresenceRouterTestConfig, module_api: ModuleApi):
         self._config = config
         self._module_api = module_api
@@ -77,6 +77,53 @@ class PresenceRouterTestModule:
         return config
 
 
+class PresenceRouterTestModule:
+    def __init__(self, config: PresenceRouterTestConfig, api: ModuleApi):
+        self._config = config
+        self._module_api = api
+        api.register_presence_router_callbacks(
+            get_users_for_states=self.get_users_for_states,
+            get_interested_users=self.get_interested_users,
+        )
+
+    async def get_users_for_states(
+        self, state_updates: Iterable[UserPresenceState]
+    ) -> Dict[str, Set[UserPresenceState]]:
+        users_to_state = {
+            user_id: set(state_updates)
+            for user_id in self._config.users_who_should_receive_all_presence
+        }
+        return users_to_state
+
+    async def get_interested_users(
+        self, user_id: str
+    ) -> Union[Set[str], PresenceRouter.ALL_USERS]:
+        if user_id in self._config.users_who_should_receive_all_presence:
+            return PresenceRouter.ALL_USERS
+
+        return set()
+
+    @staticmethod
+    def parse_config(config_dict: dict) -> PresenceRouterTestConfig:
+        """Parse a configuration dictionary from the homeserver config, do
+        some validation and return a typed PresenceRouterConfig.
+
+        Args:
+            config_dict: The configuration dictionary.
+
+        Returns:
+            A validated config object.
+        """
+        # Initialise a typed config object
+        config = PresenceRouterTestConfig()
+
+        config.users_who_should_receive_all_presence = config_dict.get(
+            "users_who_should_receive_all_presence"
+        )
+
+        return config
+
+
 class PresenceRouterTestCase(FederatingHomeserverTestCase):
     servlets = [
         admin.register_servlets,
@@ -86,9 +133,17 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
     ]
 
     def make_homeserver(self, reactor, clock):
-        return self.setup_test_homeserver(
+        hs = self.setup_test_homeserver(
             federation_transport_client=Mock(spec=["send_transaction"]),
         )
+        # Load the modules into the homeserver
+        module_api = hs.get_module_api()
+        for module, config in hs.config.modules.loaded_modules:
+            module(config=config, api=module_api)
+
+        load_legacy_presence_router(hs)
+
+        return hs
 
     def prepare(self, reactor, clock, homeserver):
         self.sync_handler = self.hs.get_sync_handler()
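`load_legacy_presence_router(hs)` is what keeps the old `presence_router:` config working after the port: in rough terms, it instantiates the configured legacy class and re-registers its methods as callbacks on the generic module API. A hedged sketch of that idea, not Synapse's actual implementation; the config attribute names are assumptions, and only `register_presence_router_callbacks` is taken from the diff above:

```python
# Hedged sketch of the legacy-loading shim. The hs.config attribute names
# below are assumptions for illustration.
def load_legacy_presence_router_sketch(hs) -> None:
    module_class = hs.config.presence_router_module_class  # assumed attribute
    module_config = hs.config.presence_router_config       # assumed attribute
    if module_class is None:
        return  # no legacy router configured

    # Instantiate the legacy module, then route its two methods through the
    # generic module interface introduced by this branch.
    module = module_class(config=module_config, module_api=hs.get_module_api())
    hs.get_module_api().register_presence_router_callbacks(
        get_users_for_states=module.get_users_for_states,
        get_interested_users=module.get_interested_users,
    )
```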
@@ -98,7 +153,7 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
         {
             "presence": {
                 "presence_router": {
-                    "module": __name__ + ".PresenceRouterTestModule",
+                    "module": __name__ + ".LegacyPresenceRouterTestModule",
                     "config": {
                         "users_who_should_receive_all_presence": [
                             "@presence_gobbler:test",
@@ -109,7 +164,28 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
             "send_federation": True,
         }
     )
-    def test_receiving_all_presence(self):
+    def test_receiving_all_presence_legacy(self):
+        self.receiving_all_presence_test_body()
+
+    @override_config(
+        {
+            "modules": [
+                {
+                    "module": __name__ + ".PresenceRouterTestModule",
+                    "config": {
+                        "users_who_should_receive_all_presence": [
+                            "@presence_gobbler:test",
+                        ]
+                    },
+                },
+            ],
+            "send_federation": True,
+        }
+    )
+    def test_receiving_all_presence(self):
+        self.receiving_all_presence_test_body()
+
+    def receiving_all_presence_test_body(self):
         """Test that a user that does not share a room with another user can receive
         presence for them, due to presence routing.
         """
@@ -203,7 +279,7 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
         {
             "presence": {
                 "presence_router": {
-                    "module": __name__ + ".PresenceRouterTestModule",
+                    "module": __name__ + ".LegacyPresenceRouterTestModule",
                     "config": {
                         "users_who_should_receive_all_presence": [
                             "@presence_gobbler1:test",
@@ -216,7 +292,30 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
             "send_federation": True,
         }
     )
-    def test_send_local_online_presence_to_with_module(self):
+    def test_send_local_online_presence_to_with_module_legacy(self):
+        self.send_local_online_presence_to_with_module_test_body()
+
+    @override_config(
+        {
+            "modules": [
+                {
+                    "module": __name__ + ".PresenceRouterTestModule",
+                    "config": {
+                        "users_who_should_receive_all_presence": [
+                            "@presence_gobbler1:test",
+                            "@presence_gobbler2:test",
+                            "@far_away_person:island",
+                        ]
+                    },
+                },
+            ],
+            "send_federation": True,
+        }
+    )
+    def test_send_local_online_presence_to_with_module(self):
+        self.send_local_online_presence_to_with_module_test_body()
+
+    def send_local_online_presence_to_with_module_test_body(self):
         """Tests that send_local_presence_to_users sends local online presence to a set
         of specified local and remote users, with a custom PresenceRouter module enabled.
         """
@@ -20,7 +20,7 @@ from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
 from synapse.handlers.room import RoomEventSource
 from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.storage.roommember import RoomsForUser
+from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
 from synapse.types import PersistedEventPosition
 
 from tests.server import FakeTransport
@@ -216,7 +216,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_rooms_for_user_with_stream_ordering",
             (USER_ID_2,),
-            {(ROOM_ID, expected_pos)},
+            {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
         )
 
     def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
@@ -305,7 +305,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         expected_pos = PersistedEventPosition(
             "master", j2.internal_metadata.stream_ordering
         )
-        self.assertEqual(joined_rooms, {(ROOM_ID, expected_pos)})
+        self.assertEqual(
+            joined_rooms,
+            {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
+        )
 
         event_id = 0
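These assertion changes are forced by the attrs conversion earlier in the diff: a namedtuple compares equal to a plain tuple, but an attrs class does not, so expected values must now be constructed explicitly. A quick standalone illustration:

```python
from collections import namedtuple

import attr

NTPair = namedtuple("NTPair", ("room_id", "event_pos"))
assert NTPair("!r:test", 7) == ("!r:test", 7)  # namedtuples *are* tuples

@attr.s(frozen=True, auto_attribs=True)
class ACPair:
    room_id: str
    event_pos: int

assert ACPair("!r:test", 7) != ("!r:test", 7)          # attrs classes are not tuples
assert ACPair("!r:test", 7) == ACPair("!r:test", 7)    # structural equality still holds
```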
@@ -29,123 +29,6 @@ from tests import unittest
 """Tests admin REST events for /rooms paths."""
 
 
-class ShutdownRoomTestCase(unittest.HomeserverTestCase):
-    servlets = [
-        synapse.rest.admin.register_servlets_for_client_rest_resource,
-        login.register_servlets,
-        events.register_servlets,
-        room.register_servlets,
-        room.register_deprecated_servlets,
-    ]
-
-    def prepare(self, reactor, clock, hs):
-        self.event_creation_handler = hs.get_event_creation_handler()
-        hs.config.user_consent_version = "1"
-
-        consent_uri_builder = Mock()
-        consent_uri_builder.build_user_consent_uri.return_value = "http://example.com"
-        self.event_creation_handler._consent_uri_builder = consent_uri_builder
-
-        self.store = hs.get_datastore()
-
-        self.admin_user = self.register_user("admin", "pass", admin=True)
-        self.admin_user_tok = self.login("admin", "pass")
-
-        self.other_user = self.register_user("user", "pass")
-        self.other_user_token = self.login("user", "pass")
-
-        # Mark the admin user as having consented
-        self.get_success(self.store.user_set_consent_version(self.admin_user, "1"))
-
-    def test_shutdown_room_consent(self):
-        """Test that we can shutdown rooms with local users who have not
-        yet accepted the privacy policy. This used to fail when we tried to
-        force part the user from the old room.
-        """
-        self.event_creation_handler._block_events_without_consent_error = None
-
-        room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_token)
-
-        # Assert one user in room
-        users_in_room = self.get_success(self.store.get_users_in_room(room_id))
-        self.assertEqual([self.other_user], users_in_room)
-
-        # Enable require consent to send events
-        self.event_creation_handler._block_events_without_consent_error = "Error"
-
-        # Assert that the user is getting consent error
-        self.helper.send(
-            room_id, body="foo", tok=self.other_user_token, expect_code=403
-        )
-
-        # Test that the admin can still send shutdown
-        url = "/_synapse/admin/v1/shutdown_room/" + room_id
-        channel = self.make_request(
-            "POST",
-            url.encode("ascii"),
-            json.dumps({"new_room_user_id": self.admin_user}),
-            access_token=self.admin_user_tok,
-        )
-
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
-
-        # Assert there is now no longer anyone in the room
-        users_in_room = self.get_success(self.store.get_users_in_room(room_id))
-        self.assertEqual([], users_in_room)
-
-    def test_shutdown_room_block_peek(self):
-        """Test that a world_readable room can no longer be peeked into after
-        it has been shut down.
-        """
-        self.event_creation_handler._block_events_without_consent_error = None
-
-        room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_token)
-
-        # Enable world readable
-        url = "rooms/%s/state/m.room.history_visibility" % (room_id,)
-        channel = self.make_request(
-            "PUT",
-            url.encode("ascii"),
-            json.dumps({"history_visibility": "world_readable"}),
-            access_token=self.other_user_token,
-        )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
-
-        # Test that the admin can still send shutdown
-        url = "/_synapse/admin/v1/shutdown_room/" + room_id
-        channel = self.make_request(
-            "POST",
-            url.encode("ascii"),
-            json.dumps({"new_room_user_id": self.admin_user}),
-            access_token=self.admin_user_tok,
-        )
-
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
-
-        # Assert we can no longer peek into the room
-        self._assert_peek(room_id, expect_code=403)
-
-    def _assert_peek(self, room_id, expect_code):
-        """Assert that the admin user can (or cannot) peek into the room."""
-
-        url = "rooms/%s/initialSync" % (room_id,)
-        channel = self.make_request(
-            "GET", url.encode("ascii"), access_token=self.admin_user_tok
-        )
-        self.assertEqual(
-            expect_code, int(channel.result["code"]), msg=channel.result["body"]
-        )
-
-        url = "events?timeout=0&room_id=" + room_id
-        channel = self.make_request(
-            "GET", url.encode("ascii"), access_token=self.admin_user_tok
-        )
-        self.assertEqual(
-            expect_code, int(channel.result["code"]), msg=channel.result["body"]
-        )
 
 
 @parameterized_class(
     ("method", "url_template"),
     [
@@ -557,51 +440,6 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         )
 
 
-class PurgeRoomTestCase(unittest.HomeserverTestCase):
-    """Test /purge_room admin API."""
-
-    servlets = [
-        synapse.rest.admin.register_servlets,
-        login.register_servlets,
-        room.register_servlets,
-    ]
-
-    def prepare(self, reactor, clock, hs):
-        self.store = hs.get_datastore()
-
-        self.admin_user = self.register_user("admin", "pass", admin=True)
-        self.admin_user_tok = self.login("admin", "pass")
-
-    def test_purge_room(self):
-        room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
-
-        # All users have to have left the room.
-        self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
-
-        url = "/_synapse/admin/v1/purge_room"
-        channel = self.make_request(
-            "POST",
-            url.encode("ascii"),
-            {"room_id": room_id},
-            access_token=self.admin_user_tok,
-        )
-
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
-
-        # Test that the following tables have been purged of all rows related to the room.
-        for table in PURGE_TABLES:
-            count = self.get_success(
-                self.store.db_pool.simple_select_one_onecol(
-                    table=table,
-                    keyvalues={"room_id": room_id},
-                    retcol="COUNT(*)",
-                    desc="test_purge_room",
-                )
-            )
-
-            self.assertEqual(count, 0, msg=f"Rows not purged in {table}")
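The removed test's core check, distilled: after a purge, every table keyed by `room_id` should hold zero rows for that room. A hedged, sqlite-based sketch of the same assertion outside the Synapse test harness (the table list and schema are illustrative, not Synapse's actual `PURGE_TABLES`):

```python
import sqlite3

# Illustrative stand-in for Synapse's PURGE_TABLES list.
PURGE_TABLES = ["events", "event_json", "room_memberships"]

def assert_room_purged(conn: sqlite3.Connection, room_id: str) -> None:
    # Every purge-covered table should contain no rows for the purged room.
    for table in PURGE_TABLES:
        (count,) = conn.execute(
            f"SELECT COUNT(*) FROM {table} WHERE room_id = ?", (room_id,)
        ).fetchone()
        assert count == 0, f"Rows not purged in {table}"
```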
 
 
 class RoomTestCase(unittest.HomeserverTestCase):
     """Test /room admin API."""