Compare commits
11 Commits
v1.99.0
...
anoa/prese
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
43131b266d | ||
|
|
f11ccf7d7d | ||
|
|
2d656e7b7f | ||
|
|
18f50fb85d | ||
|
|
847cc492d8 | ||
|
|
e5b7f22953 | ||
|
|
b9dedcaa65 | ||
|
|
0c49a14aec | ||
|
|
5a43256e93 | ||
|
|
bf24bd797c | ||
|
|
de669a2abd |
@@ -384,7 +384,11 @@ massive excess of outgoing federation requests (see `discussion
|
||||
indicate that your server is also issuing far more outgoing federation
|
||||
requests than can be accounted for by your users' activity, this is a
|
||||
likely cause. The misbehavior can be worked around by setting
|
||||
``use_presence: false`` in the Synapse config file.
|
||||
the following in the Synapse config file:
|
||||
|
||||
.. code-block:: yaml
|
||||
presence:
|
||||
enabled: false
|
||||
|
||||
People can't accept room invitations from me
|
||||
--------------------------------------------
|
||||
|
||||
1
changelog.d/9491.feature
Normal file
1
changelog.d/9491.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add a Synapse module for routing presence updates between users.
|
||||
156
docs/presence_routing_module.md
Normal file
156
docs/presence_routing_module.md
Normal file
@@ -0,0 +1,156 @@
|
||||
# Presence Routing Module
|
||||
|
||||
Synapse supports configuring a module that can specify additional users
|
||||
(local or remote) to should receive certain presence updates from local
|
||||
users.
|
||||
|
||||
The presence routing module is implemented as a Python class, which will be imported by
|
||||
the running Synapse.
|
||||
|
||||
## Python Presence Router Class
|
||||
|
||||
The Python class is instantiated with two objects:
|
||||
|
||||
* A configuration object of some type (see below).
|
||||
* An instance of `synapse.module_api.ModuleApi`.
|
||||
|
||||
It then implements methods related to presence routing.
|
||||
|
||||
Note that one method of `ModuleApi` that may be useful is:
|
||||
|
||||
```python
|
||||
async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
|
||||
```
|
||||
|
||||
which can be given a list of local or remote MXIDs to broadcast known, online user
|
||||
presence to (for those users that the receiving user is considered interested in).
|
||||
It does not include state for users who are currently offline.
|
||||
|
||||
### Example
|
||||
|
||||
Below is an example implementation of a presence router class.
|
||||
|
||||
```python
|
||||
from typing import Dict, Iterable, Set, Union
|
||||
from typing_extensions import Literal
|
||||
from synapse.handlers.presence import UserPresenceState
|
||||
from synapse.module_api import ModuleApi
|
||||
class PresenceRouterConfig:
|
||||
def __init__(self):
|
||||
# Config options with their defaults
|
||||
# A list of users to always send all user presence updates to
|
||||
self.always_send_to_users = [] # type: List[str]
|
||||
|
||||
# A list of users to ignore presence updates for. Does not affect
|
||||
# shared-room presence relationships
|
||||
self.blacklisted_users = [] # type: List[str]
|
||||
class ExamplePresenceRouter:
|
||||
"""An example implementation of synapse.presence_router.PresenceRouter.
|
||||
Supports routing all presence to a configured set of users, or a subset
|
||||
of presence from certain users to members of certain rooms.
|
||||
Args:
|
||||
config: A configuration object.
|
||||
module_api: An instance of Synapse's ModuleApi.
|
||||
"""
|
||||
def __init__(self, config: PresenceRouterConfig, module_api: ModuleApi):
|
||||
self._config = config
|
||||
self._module_api = module_api
|
||||
@staticmethod
|
||||
def parse_config(config_dict: dict) -> PresenceRouterConfig:
|
||||
"""Parse a configuration dictionary from the homeserver config, do
|
||||
some validation and return a typed PresenceRouterConfig.
|
||||
Args:
|
||||
config_dict: The configuration dictionary.
|
||||
Returns:
|
||||
A validated config object.
|
||||
"""
|
||||
# Initialise a typed config object
|
||||
config = PresenceRouterConfig()
|
||||
always_send_to_users = config_dict.get("always_send_to_users")
|
||||
blacklisted_users = config_dict.get("blacklisted_users")
|
||||
# Do some validation of config options... otherwise raise a
|
||||
# synapse.config.ConfigError.
|
||||
config.always_send_to_users = always_send_to_users
|
||||
config.blacklisted_users = blacklisted_users
|
||||
return config
|
||||
async def get_users_for_states(
|
||||
self,
|
||||
state_updates: Iterable[UserPresenceState],
|
||||
) -> Dict[str, Set[UserPresenceState]]:
|
||||
"""Given an iterable of user presence updates, determine where each one
|
||||
needs to go. Returned results will not affect presence updates that are
|
||||
sent between users who share a room.
|
||||
Args:
|
||||
state_updates: An iterable of user presence state updates.
|
||||
Returns:
|
||||
A dictionary of user_id -> set of UserPresenceState that the user should
|
||||
receive.
|
||||
"""
|
||||
destination_users = {} # type: Dict[str, Set[UserPresenceState]
|
||||
# Ignore any updates for blacklisted users
|
||||
desired_updates = set()
|
||||
for update in state_updates:
|
||||
if update.state_key not in self._config.blacklisted_users:
|
||||
desired_updates.add(update)
|
||||
# Send all presence updates to specific users
|
||||
for user_id in self._config.always_send_to_users:
|
||||
destination_users[user_id] = desired_updates
|
||||
return destination_users
|
||||
async def get_interested_users(self, user_id: str) -> Union[Set[str], Literal["ALL"]]:
|
||||
"""
|
||||
Retrieve a list of users that `user_id` is interested in receiving the
|
||||
presence of. This will be in addition to those they share a room with.
|
||||
Optionally, the literal str "ALL" can be returned to indicate that this user
|
||||
should receive all incoming local and remote presence updates.
|
||||
Note that this method will only be called for local users.
|
||||
Args:
|
||||
user_id: A user requesting presence updates.
|
||||
Returns:
|
||||
A set of user IDs to return additional presence updates for, or "ALL" to return
|
||||
presence updates for all other users.
|
||||
"""
|
||||
if user_id in self._config.always_send_to_users:
|
||||
return "ALL"
|
||||
return set()
|
||||
```
|
||||
|
||||
#### A note on `get_users_for_states` and `get_interested_users`
|
||||
|
||||
Both of these methods are effectively two different sides of the same coin. The logic
|
||||
regarding which users should receive updates for other users should be the same
|
||||
between them.
|
||||
|
||||
`get_users_for_states` is called when presence updates come in from either federation
|
||||
or local users, and is used to either direct local presence to remote users, or to
|
||||
wake up the sync streams of local users to collect remote presence.
|
||||
|
||||
In contrast, `get_interested_users` is used to determine the users that presence should
|
||||
be fetched for when a local user is syncing. This presence in then retrieved, before
|
||||
being fed through `get_users_for_states` once again, with only the syncing user's
|
||||
routing information pulled from the resulting dictionary.
|
||||
|
||||
Thus, if one method is implemented, the other should be as well. Their routing logic
|
||||
should line up as well, else you may run into unintended behaviour.
|
||||
|
||||
## Configuration
|
||||
|
||||
Once you've crafted your module and installed it into the same Python environment as
|
||||
Synapse, amend your homeserver config file with the following.
|
||||
|
||||
```yaml
|
||||
presence:
|
||||
routing_module:
|
||||
module: my_module.ExamplePresenceRouter
|
||||
config:
|
||||
# Any configuration options for your module. The below is an example.
|
||||
# of setting options for ExamplePresenceRouter.
|
||||
always_send_to_users: ["@presence_gobbler:example.org"]
|
||||
blacklisted_users:
|
||||
- "@alice:example.com"
|
||||
- "@bob:example.com"
|
||||
...
|
||||
```
|
||||
|
||||
The contents of `config` will be passed as a Python dictionary to the static
|
||||
`parse_config` method of your class. The object returned by this method will
|
||||
then be passed to the `__init__` method of your module as `config`.
|
||||
@@ -82,9 +82,28 @@ pid_file: DATADIR/homeserver.pid
|
||||
#
|
||||
#soft_file_limit: 0
|
||||
|
||||
# Set to false to disable presence tracking on this homeserver.
|
||||
# Presence tracking allows users to see the state (e.g online/offline)
|
||||
# of other local and remote users.
|
||||
#
|
||||
#use_presence: false
|
||||
presence:
|
||||
# Uncomment to disable presence tracking on this homeserver. This option
|
||||
# replaces the previous top-level 'use_presence' option.
|
||||
#
|
||||
#enabled: false
|
||||
|
||||
# Presence routers are third-party modules that can specify additional logic
|
||||
# to where presence updates from users are routed.
|
||||
#
|
||||
presence_router:
|
||||
# The custom module's class. Uncomment to use a custom presence router module.
|
||||
#
|
||||
#module: "my_custom_router.PresenceRouter"
|
||||
|
||||
# Configuration options of the custom module. Refer to your module's
|
||||
# documentation for available options.
|
||||
#
|
||||
#config:
|
||||
# example_option: 'something'
|
||||
|
||||
# Whether to require authentication to retrieve profile data (avatars,
|
||||
# display names) of other users through the client API. Defaults to
|
||||
|
||||
@@ -282,6 +282,7 @@ class GenericWorkerPresence(BasePresenceHandler):
|
||||
self.hs = hs
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self.presence_router = hs.get_presence_router()
|
||||
self._presence_enabled = hs.config.use_presence
|
||||
|
||||
# The number of ongoing syncs on this process, by user id.
|
||||
@@ -394,7 +395,7 @@ class GenericWorkerPresence(BasePresenceHandler):
|
||||
return _user_syncing()
|
||||
|
||||
async def notify_from_replication(self, states, stream_id):
|
||||
parties = await get_interested_parties(self.store, states)
|
||||
parties = await get_interested_parties(self.store, self.presence_router, states)
|
||||
room_ids_to_states, users_to_states = parties
|
||||
|
||||
self.notifier.on_new_event(
|
||||
|
||||
@@ -27,6 +27,7 @@ import yaml
|
||||
from netaddr import AddrFormatError, IPNetwork, IPSet
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.util.module_loader import load_module
|
||||
from synapse.util.stringutils import parse_and_validate_server_name
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
@@ -238,7 +239,20 @@ class ServerConfig(Config):
|
||||
self.public_baseurl = config.get("public_baseurl")
|
||||
|
||||
# Whether to enable user presence.
|
||||
self.use_presence = config.get("use_presence", True)
|
||||
presence_config = config.get("presence") or {}
|
||||
self.use_presence = presence_config.get("enabled")
|
||||
if self.use_presence is None:
|
||||
self.use_presence = config.get("use_presence", True)
|
||||
|
||||
# Custom presence router module
|
||||
self.presence_router_module_class = None
|
||||
self.presence_router_config = None
|
||||
presence_router_config = presence_config.get("presence_router")
|
||||
if presence_router_config:
|
||||
(
|
||||
self.presence_router_module_class,
|
||||
self.presence_router_config,
|
||||
) = load_module(presence_router_config, ("presence", "presence_router"))
|
||||
|
||||
# Whether to update the user directory or not. This should be set to
|
||||
# false only if we are updating the user directory in a worker
|
||||
@@ -834,9 +848,28 @@ class ServerConfig(Config):
|
||||
#
|
||||
#soft_file_limit: 0
|
||||
|
||||
# Set to false to disable presence tracking on this homeserver.
|
||||
# Presence tracking allows users to see the state (e.g online/offline)
|
||||
# of other local and remote users.
|
||||
#
|
||||
#use_presence: false
|
||||
presence:
|
||||
# Uncomment to disable presence tracking on this homeserver. This option
|
||||
# replaces the previous top-level 'use_presence' option.
|
||||
#
|
||||
#enabled: false
|
||||
|
||||
# Presence routers are third-party modules that can specify additional logic
|
||||
# to where presence updates from users are routed.
|
||||
#
|
||||
presence_router:
|
||||
# The custom module's class. Uncomment to use a custom presence router module.
|
||||
#
|
||||
#module: "my_custom_router.PresenceRouter"
|
||||
|
||||
# Configuration options of the custom module. Refer to your module's
|
||||
# documentation for available options.
|
||||
#
|
||||
#config:
|
||||
# example_option: 'something'
|
||||
|
||||
# Whether to require authentication to retrieve profile data (avatars,
|
||||
# display names) of other users through the client API. Defaults to
|
||||
|
||||
96
synapse/events/presence_router.py
Normal file
96
synapse/events/presence_router.py
Normal file
@@ -0,0 +1,96 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
from synapse.api.presence import UserPresenceState
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
class PresenceRouter:
|
||||
"""
|
||||
A module that the homeserver will call upon to help route user presence updates to
|
||||
additional destinations. If a custom presence router is configured, calls will be
|
||||
passed to that instead.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.custom_presence_router = None
|
||||
|
||||
# Check whether a custom presence router module has been configured
|
||||
if hs.config.presence_router_module_class:
|
||||
# Initialise the module
|
||||
self.custom_presence_router = hs.config.presence_router_module_class(
|
||||
config=hs.config.presence_router_config, module_api=hs.get_module_api()
|
||||
)
|
||||
|
||||
async def get_users_for_states(
|
||||
self,
|
||||
state_updates: Iterable[UserPresenceState],
|
||||
) -> Dict[str, Set[UserPresenceState]]:
|
||||
"""
|
||||
Given an iterable of user presence updates, determine where each one
|
||||
needs to go.
|
||||
|
||||
Args:
|
||||
state_updates: An iterable of user presence state updates.
|
||||
|
||||
Returns:
|
||||
A dictionary of user_id -> set of UserPresenceState, indicating which
|
||||
presence updates each user should receive.
|
||||
"""
|
||||
if self.custom_presence_router is not None and hasattr(
|
||||
self.custom_presence_router, "get_users_for_states"
|
||||
):
|
||||
# Ask the custom module
|
||||
return await self.custom_presence_router.get_users_for_states(
|
||||
state_updates=state_updates
|
||||
)
|
||||
|
||||
# Don't include any extra destinations for presence updates
|
||||
return {}
|
||||
|
||||
async def get_interested_users(
|
||||
self, user_id: str
|
||||
) -> Union[Set[str], Literal["ALL"]]:
|
||||
"""
|
||||
Retrieve a list of users that the provided user is interested in receiving the presence
|
||||
of. Optionally, the str "ALL" can be returned to mean that this user should receive all
|
||||
local and remote incoming presence.
|
||||
|
||||
Note that this method will only be called for local users.
|
||||
|
||||
Args:
|
||||
user_id: A user requesting presence updates.
|
||||
|
||||
Returns:
|
||||
A set of user IDs to return presence updates for, or ALL to return all
|
||||
known updates.
|
||||
"""
|
||||
if self.custom_presence_router is not None and hasattr(
|
||||
self.custom_presence_router, "get_interested_users"
|
||||
):
|
||||
# Ask the custom module for interested users
|
||||
return await self.custom_presence_router.get_interested_users(
|
||||
user_id=user_id
|
||||
)
|
||||
|
||||
# A custom presence router is not defined, or doesn't implement any relevant function.
|
||||
# Don't report any additional interested users.
|
||||
return set()
|
||||
@@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -43,6 +43,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import ReadReceipt, RoomStreamToken
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
sent_pdus_destination_dist_count = Counter(
|
||||
@@ -66,7 +69,7 @@ CATCH_UP_STARTUP_INTERVAL_SEC = 5
|
||||
|
||||
|
||||
class FederationSender:
|
||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.server_name = hs.hostname
|
||||
|
||||
@@ -76,6 +79,7 @@ class FederationSender:
|
||||
self.clock = hs.get_clock()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self._presence_router = hs.get_presence_router()
|
||||
self._transaction_manager = TransactionManager(hs)
|
||||
|
||||
self._instance_name = hs.get_instance_name()
|
||||
@@ -498,7 +502,9 @@ class FederationSender:
|
||||
"""Given a list of states populate self.pending_presence_by_dest and
|
||||
poke to send a new transaction to each destination
|
||||
"""
|
||||
hosts_and_states = await get_interested_remotes(self.store, states, self.state)
|
||||
hosts_and_states = await get_interested_remotes(
|
||||
self.store, self._presence_router, states, self.state
|
||||
)
|
||||
|
||||
for destinations, states in hosts_and_states:
|
||||
for destination in destinations:
|
||||
|
||||
@@ -25,15 +25,26 @@ The methods that define policy are:
|
||||
import abc
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
FrozenSet,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
from prometheus_client import Counter
|
||||
from typing_extensions import ContextManager
|
||||
from typing_extensions import ContextManager, Literal
|
||||
|
||||
import synapse.metrics
|
||||
from synapse.api.constants import EventTypes, Membership, PresenceState
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.events.presence_router import PresenceRouter
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.metrics import LaterGauge
|
||||
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.util.caches.descriptors import _CacheContext, cached
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.wheel_timer import WheelTimer
|
||||
|
||||
@@ -207,6 +218,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
self.notifier = hs.get_notifier()
|
||||
self.federation = hs.get_federation_sender()
|
||||
self.state = hs.get_state_handler()
|
||||
self.presence_router = hs.get_presence_router()
|
||||
self._presence_enabled = hs.config.use_presence
|
||||
|
||||
federation_registry = hs.get_federation_registry()
|
||||
@@ -651,7 +663,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
"""
|
||||
stream_id, max_token = await self.store.update_presence(states)
|
||||
|
||||
parties = await get_interested_parties(self.store, states)
|
||||
parties = await get_interested_parties(self.store, self.presence_router, states)
|
||||
room_ids_to_states, users_to_states = parties
|
||||
|
||||
self.notifier.on_new_event(
|
||||
@@ -1033,7 +1045,12 @@ class PresenceEventSource:
|
||||
#
|
||||
# Presence -> Notifier -> PresenceEventSource -> Presence
|
||||
#
|
||||
# Same with get_module_api, get_presence_router
|
||||
#
|
||||
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
|
||||
self.get_presence_handler = hs.get_presence_handler
|
||||
self.get_module_api = hs.get_module_api
|
||||
self.get_presence_router = hs.get_presence_router
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastore()
|
||||
self.state = hs.get_state_handler()
|
||||
@@ -1047,7 +1064,7 @@ class PresenceEventSource:
|
||||
include_offline=True,
|
||||
explicit_room_id=None,
|
||||
**kwargs
|
||||
):
|
||||
) -> Tuple[List[UserPresenceState], int]:
|
||||
# The process for getting presence events are:
|
||||
# 1. Get the rooms the user is in.
|
||||
# 2. Get the list of user in the rooms.
|
||||
@@ -1060,7 +1077,17 @@ class PresenceEventSource:
|
||||
# We don't try and limit the presence updates by the current token, as
|
||||
# sending down the rare duplicate is not a concern.
|
||||
|
||||
user_id = user.to_string()
|
||||
stream_change_cache = self.store.presence_stream_cache
|
||||
|
||||
with Measure(self.clock, "presence.get_new_events"):
|
||||
if user_id in self.get_module_api().send_full_presence_to_local_users:
|
||||
# This user has been specified by a module to receive all current, online
|
||||
# user presence. Removing from_key and setting include_offline to false
|
||||
# will do effectively this.
|
||||
from_key = None
|
||||
include_offline = False
|
||||
|
||||
if from_key is not None:
|
||||
from_key = int(from_key)
|
||||
|
||||
@@ -1083,59 +1110,186 @@ class PresenceEventSource:
|
||||
# doesn't return. C.f. #5503.
|
||||
return [], max_token
|
||||
|
||||
presence = self.get_presence_handler()
|
||||
stream_change_cache = self.store.presence_stream_cache
|
||||
|
||||
# Figure out which other users this user should receive updates for
|
||||
users_interested_in = await self._get_interested_in(user, explicit_room_id)
|
||||
|
||||
user_ids_changed = set() # type: Collection[str]
|
||||
changed = None
|
||||
if from_key:
|
||||
changed = stream_change_cache.get_all_entities_changed(from_key)
|
||||
# We have a set of users that we're interested in the presence of. We want to
|
||||
# cross-reference that with the users that have actually changed their presence.
|
||||
|
||||
if changed is not None and len(changed) < 500:
|
||||
assert isinstance(user_ids_changed, set)
|
||||
|
||||
# For small deltas, its quicker to get all changes and then
|
||||
# work out if we share a room or they're in our presence list
|
||||
get_updates_counter.labels("stream").inc()
|
||||
for other_user_id in changed:
|
||||
if other_user_id in users_interested_in:
|
||||
user_ids_changed.add(other_user_id)
|
||||
else:
|
||||
# Too many possible updates. Find all users we can see and check
|
||||
# if any of them have changed.
|
||||
get_updates_counter.labels("full").inc()
|
||||
# Check whether this user should see all user updates
|
||||
|
||||
if users_interested_in == "ALL":
|
||||
if from_key:
|
||||
user_ids_changed = stream_change_cache.get_entities_changed(
|
||||
users_interested_in, from_key
|
||||
# We need to return all new presence updates to this user, regardless of whether
|
||||
# they share a room with that user
|
||||
return await self._filter_all_presence_updates_for_user(
|
||||
user_id, max_token, from_key, include_offline
|
||||
)
|
||||
else:
|
||||
user_ids_changed = users_interested_in
|
||||
# This user should receive all user presence, and hasn't provided a from_key.
|
||||
# Send all currently known user presence states.
|
||||
users_to_state = await self.store.get_presence_for_all_users(
|
||||
include_offline=include_offline
|
||||
)
|
||||
|
||||
updates = await presence.current_state_for_users(user_ids_changed)
|
||||
return list(users_to_state.values()), max_token
|
||||
|
||||
if include_offline:
|
||||
return (list(updates.values()), max_token)
|
||||
else:
|
||||
return (
|
||||
[s for s in updates.values() if s.state != PresenceState.OFFLINE],
|
||||
max_token,
|
||||
# The set of users that we're interested in and that have had a presence update.
|
||||
# We'll actually pull the presence updates for these users at the end.
|
||||
interested_and_updated_users = (
|
||||
set()
|
||||
) # type: Union[Set[str], FrozenSet[str]]
|
||||
|
||||
if from_key:
|
||||
# First get all users that have had a presence update
|
||||
updated_users = stream_change_cache.get_all_entities_changed(from_key)
|
||||
|
||||
# Cross-reference users we're interested in with those that have had updates.
|
||||
# Use a slightly-optimised method for processing smaller sets of updates.
|
||||
if updated_users is not None and len(updated_users) < 500:
|
||||
# For small deltas, it's quicker to get all changes and then
|
||||
# cross-reference with the users we're interested in
|
||||
get_updates_counter.labels("stream").inc()
|
||||
for other_user_id in updated_users:
|
||||
if other_user_id in users_interested_in:
|
||||
# mypy thinks this variable could be a FrozenSet as it's possibly set
|
||||
# to one in the `get_entities_changed` call below, and `add()` is not
|
||||
# method on a FrozenSet. That doesn't affect us here though, as
|
||||
# `interested_and_updated_users` is clearly a set() above.
|
||||
interested_and_updated_users.add(other_user_id) # type: ignore
|
||||
else:
|
||||
# Too many possible updates. Find all users we can see and check
|
||||
# if any of them have changed.
|
||||
get_updates_counter.labels("full").inc()
|
||||
|
||||
interested_and_updated_users = (
|
||||
stream_change_cache.get_entities_changed(
|
||||
users_interested_in, from_key
|
||||
)
|
||||
)
|
||||
else:
|
||||
# No from_key has been specified. Return the presence for all users
|
||||
# this user is interested in
|
||||
interested_and_updated_users = users_interested_in
|
||||
|
||||
# Retrieve the current presence state for each user
|
||||
users_to_state = await self.get_presence_handler().current_state_for_users(
|
||||
interested_and_updated_users
|
||||
)
|
||||
presence_updates = list(users_to_state.values())
|
||||
|
||||
# Remove the user from the list of users to receive all presence
|
||||
if user_id in self.get_module_api().send_full_presence_to_local_users:
|
||||
self.get_module_api().send_full_presence_to_local_users.remove(user_id)
|
||||
|
||||
if not include_offline:
|
||||
# Filter out offline presence states
|
||||
presence_updates = self._filter_offline_presence_state(presence_updates)
|
||||
|
||||
return presence_updates, max_token
|
||||
|
||||
async def _filter_all_presence_updates_for_user(
|
||||
self,
|
||||
user_id: str,
|
||||
max_token: int,
|
||||
from_key: int,
|
||||
include_offline: bool,
|
||||
) -> Tuple[List[UserPresenceState], int]:
|
||||
# Only return updates since the last sync
|
||||
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
|
||||
from_key
|
||||
)
|
||||
if not updated_users:
|
||||
updated_users = []
|
||||
|
||||
# Get the actual presence update for each change
|
||||
users_to_state = await self.get_presence_handler().current_state_for_users(
|
||||
updated_users
|
||||
)
|
||||
|
||||
# TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
|
||||
# module for information on a number of users when we then only take the info
|
||||
# for a single user
|
||||
|
||||
# Filter through the presence router
|
||||
users_to_state_set = await self.get_presence_router().get_users_for_states(
|
||||
users_to_state.values()
|
||||
)
|
||||
|
||||
# We only want the mapping for the syncing user
|
||||
presence_updates = list(users_to_state_set[user_id])
|
||||
|
||||
# Remove the user from the list of users to receive all presence
|
||||
if user_id in self.get_module_api().send_full_presence_to_local_users:
|
||||
self.get_module_api().send_full_presence_to_local_users.remove(user_id)
|
||||
|
||||
if not include_offline:
|
||||
# Filter out offline states
|
||||
presence_updates = self._filter_offline_presence_state(presence_updates)
|
||||
|
||||
# Return presence updates for all users since the last sync
|
||||
return presence_updates, max_token
|
||||
|
||||
def _filter_offline_presence_state(
|
||||
self, presence_updates: Iterable[UserPresenceState]
|
||||
) -> List[UserPresenceState]:
|
||||
"""Given an iterable containing user presence updates, return a list with any offline
|
||||
presence states removed.
|
||||
|
||||
Args:
|
||||
presence_updates: Presence states to filter
|
||||
|
||||
Returns:
|
||||
A new list with any offline presence states removed.
|
||||
"""
|
||||
return [
|
||||
update
|
||||
for update in presence_updates
|
||||
if update.state != PresenceState.OFFLINE
|
||||
]
|
||||
|
||||
def get_current_key(self):
|
||||
return self.store.get_current_presence_token()
|
||||
|
||||
@cached(num_args=2, cache_context=True)
|
||||
async def _get_interested_in(self, user, explicit_room_id, cache_context):
|
||||
async def _get_interested_in(
|
||||
self,
|
||||
user: UserID,
|
||||
explicit_room_id: Optional[str] = None,
|
||||
cache_context: Optional[_CacheContext] = None,
|
||||
) -> Union[Set[str], Literal["ALL"]]:
|
||||
"""Returns the set of users that the given user should see presence
|
||||
updates for
|
||||
updates for.
|
||||
|
||||
Args:
|
||||
user: The user to retrieve presence updates for.
|
||||
explicit_room_id: A
|
||||
"""
|
||||
user_id = user.to_string()
|
||||
users_interested_in = set()
|
||||
users_interested_in.add(user_id) # So that we receive our own presence
|
||||
|
||||
# cache_context isn't likely to ever be None due to the @cached decorator,
|
||||
# but we can't have a non-optional argument after the optional argument
|
||||
# explicit_room_id either. Assert cache_context is not None so we can use it
|
||||
# without mypy complaining.
|
||||
assert cache_context
|
||||
|
||||
# Check with the presence router whether we should poll additional users for
|
||||
# their presence information
|
||||
additional_users = await self.get_presence_router().get_interested_users(
|
||||
user.to_string()
|
||||
)
|
||||
if additional_users == "ALL":
|
||||
# If the module requested that this user see the presence updates of *all*
|
||||
# users, then simply return that instead of calculating what rooms this
|
||||
# user shares
|
||||
return "ALL"
|
||||
|
||||
# Add the additional users from the router
|
||||
users_interested_in.update(additional_users)
|
||||
|
||||
# Find the users who share a room with this user
|
||||
users_who_share_room = await self.store.get_users_who_share_room_with_user(
|
||||
user_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
@@ -1306,14 +1460,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
||||
|
||||
|
||||
async def get_interested_parties(
|
||||
store: DataStore, states: List[UserPresenceState]
|
||||
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
|
||||
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
|
||||
"""Given a list of states return which entities (rooms, users)
|
||||
are interested in the given states.
|
||||
|
||||
Args:
|
||||
store
|
||||
states
|
||||
store: The homeserver's data store.
|
||||
presence_router: A module for augmenting the destinations for presence updates.
|
||||
states: A list of incoming user presence updates.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of `(room_ids_to_states, users_to_states)`,
|
||||
@@ -1329,11 +1484,22 @@ async def get_interested_parties(
|
||||
# Always notify self
|
||||
users_to_states.setdefault(state.user_id, []).append(state)
|
||||
|
||||
# Ask a presence routing module for any additional parties if one
|
||||
# is loaded.
|
||||
router_users_to_states = await presence_router.get_users_for_states(states)
|
||||
|
||||
# Update the dictionaries with additional destinations and state to send
|
||||
for user_id, user_states in router_users_to_states.items():
|
||||
users_to_states.setdefault(user_id, []).extend(user_states)
|
||||
|
||||
return room_ids_to_states, users_to_states
|
||||
|
||||
|
||||
async def get_interested_remotes(
|
||||
store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
|
||||
store: DataStore,
|
||||
presence_router: PresenceRouter,
|
||||
states: List[UserPresenceState],
|
||||
state_handler: StateHandler,
|
||||
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
|
||||
"""Given a list of presence states figure out which remote servers
|
||||
should be sent which.
|
||||
@@ -1341,9 +1507,10 @@ async def get_interested_remotes(
|
||||
All the presence states should be for local users only.
|
||||
|
||||
Args:
|
||||
store
|
||||
states
|
||||
state_handler
|
||||
store: The homeserver's data store.
|
||||
presence_router: A module for augmenting the destinations for presence updates.
|
||||
states: A list of incoming user presence updates.
|
||||
state_handler:
|
||||
|
||||
Returns:
|
||||
A list of 2-tuples of destinations and states, where for
|
||||
@@ -1355,7 +1522,9 @@ async def get_interested_remotes(
|
||||
# First we look up the rooms each user is in (as well as any explicit
|
||||
# subscriptions), then for each distinct room we look up the remote
|
||||
# hosts in those rooms.
|
||||
room_ids_to_states, users_to_states = await get_interested_parties(store, states)
|
||||
room_ids_to_states, users_to_states = await get_interested_parties(
|
||||
store, presence_router, states
|
||||
)
|
||||
|
||||
for room_id, states in room_ids_to_states.items():
|
||||
hosts = await state_handler.get_current_hosts_in_room(room_id)
|
||||
|
||||
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Generator, Iterable, Optional, Tuple
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.federation.sender import FederationSender
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
@@ -50,11 +51,17 @@ class ModuleApi:
|
||||
self._auth = hs.get_auth()
|
||||
self._auth_handler = auth_handler
|
||||
self._server_name = hs.hostname
|
||||
self._presence_stream = hs.get_event_sources().sources["presence"]
|
||||
|
||||
# We expose these as properties below in order to attach a helpful docstring.
|
||||
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
||||
self._public_room_list_manager = PublicRoomListManager(hs)
|
||||
|
||||
# The next time these users sync, they will receive the current presence
|
||||
# state of all local users. Users are added by send_local_online_presence_to,
|
||||
# and removed after a successful sync.
|
||||
self.send_full_presence_to_local_users = set()
|
||||
|
||||
@property
|
||||
def http_client(self):
|
||||
"""Allows making outbound HTTP requests to remote resources.
|
||||
@@ -385,6 +392,34 @@ class ModuleApi:
|
||||
|
||||
return event
|
||||
|
||||
async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
|
||||
"""
|
||||
Forces the equivalent of a presence initial_sync for a set of local or remote
|
||||
users. The users will receive presence for all currently online users that they
|
||||
are considered interested in.
|
||||
|
||||
Updates to remote users will be sent immediately, whereas local users will receive
|
||||
them on their next sync attempt.
|
||||
"""
|
||||
for user in users:
|
||||
if self._hs.is_mine_id(user):
|
||||
# Modify SyncHandler._generate_sync_entry_for_presence to call
|
||||
# presence_source.get_new_events with an empty `from_key` if
|
||||
# that user's ID were in a list modified by ModuleApi somewhere.
|
||||
# That user would then get all presence state on next incremental sync.
|
||||
|
||||
# Force a presence initial_sync for this user next time
|
||||
self.send_full_presence_to_local_users.add(user)
|
||||
else:
|
||||
# Retrieve presence state for currently online users that this user
|
||||
# is considered interested in
|
||||
presence_events = await self._presence_stream.get_new_events(
|
||||
user, from_key=None, include_offline=False
|
||||
)
|
||||
await FederationSender.send_presence(
|
||||
[ev.state for ev in presence_events]
|
||||
)
|
||||
|
||||
|
||||
class PublicRoomListManager:
|
||||
"""Contains methods for adding to, removing from and querying whether a room
|
||||
|
||||
@@ -51,6 +51,7 @@ from synapse.crypto import context_factory
|
||||
from synapse.crypto.context_factory import RegularPolicyForHTTPS
|
||||
from synapse.crypto.keyring import Keyring
|
||||
from synapse.events.builder import EventBuilderFactory
|
||||
from synapse.events.presence_router import PresenceRouter
|
||||
from synapse.events.spamcheck import SpamChecker
|
||||
from synapse.events.third_party_rules import ThirdPartyEventRules
|
||||
from synapse.events.utils import EventClientSerializer
|
||||
@@ -423,6 +424,10 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
else:
|
||||
raise Exception("Workers cannot write typing")
|
||||
|
||||
@cache_in_self
|
||||
def get_presence_router(self) -> PresenceRouter:
|
||||
return PresenceRouter(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_typing_handler(self) -> FollowerTypingHandler:
|
||||
if self.config.worker.writers.typing == self.get_instance_name():
|
||||
|
||||
@@ -1906,6 +1906,7 @@ class DatabasePool:
|
||||
retcols: Iterable[str],
|
||||
filters: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Optional[Dict[str, Any]] = None,
|
||||
exclude_keyvalues: Optional[Dict[str, Any]] = None,
|
||||
order_direction: str = "ASC",
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
@@ -1929,7 +1930,10 @@ class DatabasePool:
|
||||
apply a WHERE ? LIKE ? clause.
|
||||
keyvalues:
|
||||
column names and values to select the rows with, or None to not
|
||||
apply a WHERE clause.
|
||||
apply a WHERE key = value clause.
|
||||
exclude_keyvalues:
|
||||
column names and values to exclude rows with, or None to not
|
||||
apply a WHERE key != value clause.
|
||||
order_direction: Whether the results should be ordered "ASC" or "DESC".
|
||||
|
||||
Returns:
|
||||
@@ -1938,7 +1942,7 @@ class DatabasePool:
|
||||
if order_direction not in ["ASC", "DESC"]:
|
||||
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
|
||||
|
||||
where_clause = "WHERE " if filters or keyvalues else ""
|
||||
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
|
||||
arg_list = [] # type: List[Any]
|
||||
if filters:
|
||||
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
|
||||
@@ -1947,6 +1951,9 @@ class DatabasePool:
|
||||
if keyvalues:
|
||||
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
|
||||
arg_list += list(keyvalues.values())
|
||||
if exclude_keyvalues:
|
||||
where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
|
||||
arg_list += list(exclude_keyvalues.values())
|
||||
|
||||
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
|
||||
", ".join(retcols),
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import List, Tuple
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||
@@ -157,5 +157,64 @@ class PresenceStore(SQLBaseStore):
|
||||
|
||||
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
||||
|
||||
async def get_presence_for_all_users(
|
||||
self,
|
||||
include_offline: bool = True,
|
||||
) -> Dict[str, UserPresenceState]:
|
||||
"""Retrieve the current presence state for all users.
|
||||
|
||||
Note that the presence_stream table is culled frequently, so it should only
|
||||
contain the latest presence state for each user.
|
||||
|
||||
Args:
|
||||
include_offline: Whether to include offline presence states
|
||||
|
||||
Returns:
|
||||
A dict of user IDs to their current UserPresenceState.
|
||||
"""
|
||||
users_to_state = {}
|
||||
|
||||
exclude_keyvalues = {}
|
||||
if not include_offline:
|
||||
# Exclude offline presence state
|
||||
exclude_keyvalues = {"state": "offline"}
|
||||
|
||||
# This may be a very heavy database query.
|
||||
# We paginate in order to not block a database connection.
|
||||
limit = 100
|
||||
offset = 0
|
||||
while True:
|
||||
rows = await self.db_pool.runInteraction(
|
||||
"get_presence_for_all_users",
|
||||
self.db_pool.simple_select_list_paginate_txn,
|
||||
"presence_stream",
|
||||
orderby="stream_id",
|
||||
start=offset,
|
||||
limit=limit,
|
||||
keyvalues={},
|
||||
exclude_keyvalues=exclude_keyvalues,
|
||||
retcols=(
|
||||
"user_id",
|
||||
"state",
|
||||
"last_active_ts",
|
||||
"last_federation_update_ts",
|
||||
"last_user_sync_ts",
|
||||
"status_msg",
|
||||
"currently_active",
|
||||
),
|
||||
order_direction="ASC",
|
||||
)
|
||||
|
||||
for row in rows:
|
||||
users_to_state[row["user_id"]] = UserPresenceState(**row)
|
||||
|
||||
# We've ran out of updates to query
|
||||
if len(rows) < limit:
|
||||
break
|
||||
|
||||
offset += limit
|
||||
|
||||
return users_to_state
|
||||
|
||||
def get_current_presence_token(self):
|
||||
return self._presence_id_gen.get_current_token()
|
||||
|
||||
224
tests/events/test_presence_router.py
Normal file
224
tests/events/test_presence_router.py
Normal file
@@ -0,0 +1,224 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Dict, Iterable, List, Optional, Set, Union
|
||||
|
||||
import attr
|
||||
from typing_extensions import Literal
|
||||
|
||||
from synapse.handlers.presence import UserPresenceState
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, presence, room
|
||||
from synapse.types import JsonDict, create_requester
|
||||
|
||||
from tests import unittest
|
||||
from tests.handlers.test_sync import generate_sync_config
|
||||
from tests.unittest import TestCase
|
||||
|
||||
|
||||
@attr.s
|
||||
class PresenceRouterTestConfig:
|
||||
users_who_should_receive_all_presence = attr.ib(type=List[str], default=[])
|
||||
|
||||
|
||||
class PresenceRouterTestModule:
|
||||
def __init__(self, config: PresenceRouterTestConfig, module_api: ModuleApi):
|
||||
self._config = config
|
||||
self._module_api = module_api
|
||||
|
||||
async def get_users_for_states(
|
||||
self, state_updates: Iterable[UserPresenceState]
|
||||
) -> Dict[str, Set[UserPresenceState]]:
|
||||
users_to_state = {
|
||||
user_id: set(state_updates)
|
||||
for user_id in self._config.users_who_should_receive_all_presence
|
||||
}
|
||||
return users_to_state
|
||||
|
||||
async def get_interested_users(
|
||||
self, user_id: str
|
||||
) -> Union[Set[str], Literal["ALL"]]:
|
||||
print()
|
||||
if user_id in self._config.users_who_should_receive_all_presence:
|
||||
return "ALL"
|
||||
|
||||
return set()
|
||||
|
||||
@staticmethod
|
||||
def parse_config(config_dict: dict) -> PresenceRouterTestConfig:
|
||||
"""Parse a configuration dictionary from the homeserver config, do
|
||||
some validation and return a typed PresenceRouterConfig.
|
||||
|
||||
Args:
|
||||
config_dict: The configuration dictionary.
|
||||
|
||||
Returns:
|
||||
A validated config object.
|
||||
"""
|
||||
# Initialise a typed config object
|
||||
config = PresenceRouterTestConfig()
|
||||
|
||||
config.users_who_should_receive_all_presence = config_dict.get(
|
||||
"users_who_should_receive_all_presence"
|
||||
)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
class PresenceRouterTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
login.register_servlets,
|
||||
room.register_servlets,
|
||||
presence.register_servlets,
|
||||
]
|
||||
|
||||
def default_config(self):
|
||||
config = super().default_config()
|
||||
config["presence"] = {
|
||||
"presence_router": {
|
||||
"module": __name__ + ".PresenceRouterTestModule",
|
||||
"config": {
|
||||
"users_who_should_receive_all_presence": ["@presence_gobbler:test"]
|
||||
},
|
||||
}
|
||||
}
|
||||
return config
|
||||
|
||||
def prepare(self, reactor, clock, homeserver):
|
||||
self.sync_handler = self.hs.get_sync_handler()
|
||||
|
||||
# Create a user who should receive all presence of others
|
||||
self.presence_receiving_user_id = self.register_user(
|
||||
"presence_gobbler", "monkey"
|
||||
)
|
||||
self.presence_receiving_user_tok = self.login("presence_gobbler", "monkey")
|
||||
|
||||
# And two users who should not have any special routing
|
||||
self.other_user_one_id = self.register_user("other_user_one", "monkey")
|
||||
self.other_user_one_tok = self.login("other_user_one", "monkey")
|
||||
self.other_user_two_id = self.register_user("other_user_two", "monkey")
|
||||
self.other_user_two_tok = self.login("other_user_two", "monkey")
|
||||
|
||||
# Put the other two users in a room with each other
|
||||
self.room_id = self.helper.create_room_as(
|
||||
self.other_user_one_id, tok=self.other_user_one_tok
|
||||
)
|
||||
|
||||
self.helper.invite(
|
||||
self.room_id,
|
||||
self.other_user_one_id,
|
||||
self.other_user_two_id,
|
||||
tok=self.other_user_one_tok,
|
||||
)
|
||||
self.helper.join(
|
||||
self.room_id, self.other_user_two_id, tok=self.other_user_two_tok
|
||||
)
|
||||
|
||||
def test_receiving_all_presence(self):
|
||||
"""Test that a user that does not share a room with another other can receive
|
||||
presence for them, due to presence routing.
|
||||
"""
|
||||
# User one sends some presence
|
||||
send_presence_update(
|
||||
self,
|
||||
self.other_user_one_id,
|
||||
self.other_user_one_tok,
|
||||
"online",
|
||||
"boop",
|
||||
)
|
||||
|
||||
# Check that the presence receiving user gets user one's presence when syncing
|
||||
presence_updates = sync_presence(self, self.presence_receiving_user_id)
|
||||
self.assertEqual(len(presence_updates), 1)
|
||||
|
||||
presence_update = presence_updates[0] # type: UserPresenceState
|
||||
self.assertEqual(presence_update.user_id, self.other_user_one_id)
|
||||
self.assertEqual(presence_update.state, "online")
|
||||
self.assertEqual(presence_update.status_msg, "boop")
|
||||
|
||||
# Have all three users send presence
|
||||
send_presence_update(
|
||||
self,
|
||||
self.other_user_one_id,
|
||||
self.other_user_one_tok,
|
||||
"online",
|
||||
"user_one",
|
||||
)
|
||||
send_presence_update(
|
||||
self,
|
||||
self.other_user_two_id,
|
||||
self.other_user_two_tok,
|
||||
"online",
|
||||
"user_two",
|
||||
)
|
||||
send_presence_update(
|
||||
self,
|
||||
self.presence_receiving_user_id,
|
||||
self.presence_receiving_user_tok,
|
||||
"online",
|
||||
"presence_gobbler",
|
||||
)
|
||||
|
||||
# Check that the presence receiving user gets everyone's presence
|
||||
presence_updates = sync_presence(self, self.presence_receiving_user_id)
|
||||
self.assertEqual(len(presence_updates), 3)
|
||||
|
||||
# But that User One only get itself and User Two's presence
|
||||
presence_updates = sync_presence(self, self.other_user_one_id)
|
||||
self.assertEqual(len(presence_updates), 2)
|
||||
|
||||
found = False
|
||||
for update in presence_updates:
|
||||
if update.user_id == self.other_user_two_id:
|
||||
self.assertEqual(update.state, "online")
|
||||
self.assertEqual(update.status_msg, "user_two")
|
||||
found = True
|
||||
|
||||
self.assertTrue(found)
|
||||
|
||||
|
||||
def send_presence_update(
|
||||
testcase: TestCase,
|
||||
user_id: str,
|
||||
access_token: str,
|
||||
presence_state: str,
|
||||
status_message: Optional[str] = None,
|
||||
) -> JsonDict:
|
||||
# Build the presence body
|
||||
body = {"presence": presence_state}
|
||||
if status_message:
|
||||
body["status_msg"] = status_message
|
||||
|
||||
# Update the user's presence state
|
||||
channel = testcase.make_request(
|
||||
"PUT", "/presence/%s/status" % (user_id,), body, access_token=access_token
|
||||
)
|
||||
testcase.assertEqual(channel.code, 200)
|
||||
|
||||
return channel.json_body
|
||||
|
||||
|
||||
def sync_presence(
|
||||
testcase: TestCase,
|
||||
user_id: str,
|
||||
) -> List[UserPresenceState]:
|
||||
requester = create_requester(user_id)
|
||||
sync_config = generate_sync_config(requester.user.to_string())
|
||||
sync_result = testcase.get_success(
|
||||
testcase.sync_handler.wait_for_sync_for_user(requester, sync_config)
|
||||
)
|
||||
|
||||
return sync_result.presence
|
||||
@@ -37,7 +37,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
def test_wait_for_sync_for_user_auth_blocking(self):
|
||||
user_id1 = "@user1:test"
|
||||
user_id2 = "@user2:test"
|
||||
sync_config = self._generate_sync_config(user_id1)
|
||||
sync_config = generate_sync_config(user_id1)
|
||||
requester = create_requester(user_id1)
|
||||
|
||||
self.reactor.advance(100) # So we get not 0 time
|
||||
@@ -60,7 +60,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
|
||||
self.auth_blocking._hs_disabled = False
|
||||
|
||||
sync_config = self._generate_sync_config(user_id2)
|
||||
sync_config = generate_sync_config(user_id2)
|
||||
requester = create_requester(user_id2)
|
||||
|
||||
e = self.get_failure(
|
||||
@@ -69,11 +69,12 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
)
|
||||
self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
|
||||
|
||||
def _generate_sync_config(self, user_id):
|
||||
return SyncConfig(
|
||||
user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
|
||||
filter_collection=DEFAULT_FILTER_COLLECTION,
|
||||
is_guest=False,
|
||||
request_key="request_key",
|
||||
device_id="device_id",
|
||||
)
|
||||
|
||||
def generate_sync_config(user_id: str) -> SyncConfig:
|
||||
return SyncConfig(
|
||||
user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
|
||||
filter_collection=DEFAULT_FILTER_COLLECTION,
|
||||
is_guest=False,
|
||||
request_key="request_key",
|
||||
device_id="device_id",
|
||||
)
|
||||
|
||||
@@ -15,11 +15,17 @@
|
||||
from mock import Mock
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.presence import UserPresenceState
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
from synapse.rest.client.v1 import login, presence, room
|
||||
from synapse.types import create_requester
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
from tests.events.test_presence_router import (
|
||||
PresenceRouterTestModule,
|
||||
send_presence_update,
|
||||
sync_presence,
|
||||
)
|
||||
from tests.unittest import HomeserverTestCase, override_config
|
||||
|
||||
|
||||
class ModuleApiTestCase(HomeserverTestCase):
|
||||
@@ -27,6 +33,7 @@ class ModuleApiTestCase(HomeserverTestCase):
|
||||
admin.register_servlets,
|
||||
login.register_servlets,
|
||||
room.register_servlets,
|
||||
presence.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, homeserver):
|
||||
@@ -205,3 +212,95 @@ class ModuleApiTestCase(HomeserverTestCase):
|
||||
)
|
||||
)
|
||||
self.assertFalse(is_in_public_rooms)
|
||||
|
||||
@override_config(
|
||||
{
|
||||
"presence": {
|
||||
"presence_router": {
|
||||
"module": "%s.%s"
|
||||
% (
|
||||
PresenceRouterTestModule.__module__,
|
||||
PresenceRouterTestModule.__name__,
|
||||
),
|
||||
"config": {
|
||||
"users_who_should_receive_all_presence": [
|
||||
"@presence_gobbler1:test",
|
||||
"@presence_gobbler2:test",
|
||||
]
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
def test_send_local_online_presence_to(self):
|
||||
"""Tests that send_local_presence_to_users sends local online presence to a set
|
||||
of specified local and remote users.
|
||||
"""
|
||||
self.sync_handler = self.hs.get_sync_handler()
|
||||
|
||||
# Create a user who will send presence updates
|
||||
self.other_user_id = self.register_user("other_user", "monkey")
|
||||
self.other_user_tok = self.login("other_user", "monkey")
|
||||
|
||||
# And another two users that will also send out presence updates, as well as receive
|
||||
# theirs and everyone else's
|
||||
self.presence_receiving_user_one_id = self.register_user(
|
||||
"presence_gobbler1", "monkey"
|
||||
)
|
||||
self.presence_receiving_user_one_tok = self.login("presence_gobbler1", "monkey")
|
||||
self.presence_receiving_user_two_id = self.register_user(
|
||||
"presence_gobbler2", "monkey"
|
||||
)
|
||||
self.presence_receiving_user_two_tok = self.login("presence_gobbler2", "monkey")
|
||||
|
||||
# Have all three users send some presence updates
|
||||
send_presence_update(
|
||||
self,
|
||||
self.other_user_id,
|
||||
self.other_user_tok,
|
||||
"online",
|
||||
"I'm online!",
|
||||
)
|
||||
send_presence_update(
|
||||
self,
|
||||
self.presence_receiving_user_one_id,
|
||||
self.presence_receiving_user_one_tok,
|
||||
"online",
|
||||
"I'm also online!",
|
||||
)
|
||||
send_presence_update(
|
||||
self,
|
||||
self.presence_receiving_user_two_id,
|
||||
self.presence_receiving_user_two_tok,
|
||||
"unavailable",
|
||||
"I'm in a meeting!",
|
||||
)
|
||||
|
||||
# Mark each presence-receiving user for receiving all user presence
|
||||
self.get_success(
|
||||
self.module_api.send_local_online_presence_to(
|
||||
[
|
||||
self.presence_receiving_user_one_id,
|
||||
self.presence_receiving_user_two_id,
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# Perform a sync for each user
|
||||
|
||||
# The other user should only receive their own presence
|
||||
presence_updates = sync_presence(self, self.other_user_id)
|
||||
self.assertEqual(len(presence_updates), 1)
|
||||
|
||||
presence_update = presence_updates[0] # type: UserPresenceState
|
||||
self.assertEqual(presence_update.user_id, self.other_user_id)
|
||||
self.assertEqual(presence_update.state, "online")
|
||||
self.assertEqual(presence_update.status_msg, "I'm online!")
|
||||
|
||||
# Whereas both presence receiving users should receive everyone's presence updates
|
||||
presence_updates = sync_presence(self, self.presence_receiving_user_one_id)
|
||||
self.assertEqual(len(presence_updates), 3)
|
||||
presence_updates = sync_presence(self, self.presence_receiving_user_two_id)
|
||||
self.assertEqual(len(presence_updates), 3)
|
||||
|
||||
# TODO: Test sending to federated users
|
||||
|
||||
Reference in New Issue
Block a user