Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 43131b266d | |||
| f11ccf7d7d | |||
| 2d656e7b7f | |||
| 18f50fb85d | |||
| 847cc492d8 | |||
| e5b7f22953 | |||
| b9dedcaa65 | |||
| 0c49a14aec | |||
| 5a43256e93 | |||
| bf24bd797c | |||
| de669a2abd |
+5
-1
@@ -384,7 +384,11 @@ massive excess of outgoing federation requests (see `discussion
|
|||||||
indicate that your server is also issuing far more outgoing federation
|
indicate that your server is also issuing far more outgoing federation
|
||||||
requests than can be accounted for by your users' activity, this is a
|
requests than can be accounted for by your users' activity, this is a
|
||||||
likely cause. The misbehavior can be worked around by setting
|
likely cause. The misbehavior can be worked around by setting
|
||||||
``use_presence: false`` in the Synapse config file.
|
the following in the Synapse config file:
|
||||||
|
|
||||||
|
.. code-block:: yaml
|
||||||
|
presence:
|
||||||
|
enabled: false
|
||||||
|
|
||||||
People can't accept room invitations from me
|
People can't accept room invitations from me
|
||||||
--------------------------------------------
|
--------------------------------------------
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
Add a Synapse module for routing presence updates between users.
|
||||||
@@ -0,0 +1,156 @@
|
|||||||
|
# Presence Routing Module
|
||||||
|
|
||||||
|
Synapse supports configuring a module that can specify additional users
|
||||||
|
(local or remote) to should receive certain presence updates from local
|
||||||
|
users.
|
||||||
|
|
||||||
|
The presence routing module is implemented as a Python class, which will be imported by
|
||||||
|
the running Synapse.
|
||||||
|
|
||||||
|
## Python Presence Router Class
|
||||||
|
|
||||||
|
The Python class is instantiated with two objects:
|
||||||
|
|
||||||
|
* A configuration object of some type (see below).
|
||||||
|
* An instance of `synapse.module_api.ModuleApi`.
|
||||||
|
|
||||||
|
It then implements methods related to presence routing.
|
||||||
|
|
||||||
|
Note that one method of `ModuleApi` that may be useful is:
|
||||||
|
|
||||||
|
```python
|
||||||
|
async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
|
||||||
|
```
|
||||||
|
|
||||||
|
which can be given a list of local or remote MXIDs to broadcast known, online user
|
||||||
|
presence to (for those users that the receiving user is considered interested in).
|
||||||
|
It does not include state for users who are currently offline.
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
Below is an example implementation of a presence router class.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from typing import Dict, Iterable, Set, Union
|
||||||
|
from typing_extensions import Literal
|
||||||
|
from synapse.handlers.presence import UserPresenceState
|
||||||
|
from synapse.module_api import ModuleApi
|
||||||
|
class PresenceRouterConfig:
|
||||||
|
def __init__(self):
|
||||||
|
# Config options with their defaults
|
||||||
|
# A list of users to always send all user presence updates to
|
||||||
|
self.always_send_to_users = [] # type: List[str]
|
||||||
|
|
||||||
|
# A list of users to ignore presence updates for. Does not affect
|
||||||
|
# shared-room presence relationships
|
||||||
|
self.blacklisted_users = [] # type: List[str]
|
||||||
|
class ExamplePresenceRouter:
|
||||||
|
"""An example implementation of synapse.presence_router.PresenceRouter.
|
||||||
|
Supports routing all presence to a configured set of users, or a subset
|
||||||
|
of presence from certain users to members of certain rooms.
|
||||||
|
Args:
|
||||||
|
config: A configuration object.
|
||||||
|
module_api: An instance of Synapse's ModuleApi.
|
||||||
|
"""
|
||||||
|
def __init__(self, config: PresenceRouterConfig, module_api: ModuleApi):
|
||||||
|
self._config = config
|
||||||
|
self._module_api = module_api
|
||||||
|
@staticmethod
|
||||||
|
def parse_config(config_dict: dict) -> PresenceRouterConfig:
|
||||||
|
"""Parse a configuration dictionary from the homeserver config, do
|
||||||
|
some validation and return a typed PresenceRouterConfig.
|
||||||
|
Args:
|
||||||
|
config_dict: The configuration dictionary.
|
||||||
|
Returns:
|
||||||
|
A validated config object.
|
||||||
|
"""
|
||||||
|
# Initialise a typed config object
|
||||||
|
config = PresenceRouterConfig()
|
||||||
|
always_send_to_users = config_dict.get("always_send_to_users")
|
||||||
|
blacklisted_users = config_dict.get("blacklisted_users")
|
||||||
|
# Do some validation of config options... otherwise raise a
|
||||||
|
# synapse.config.ConfigError.
|
||||||
|
config.always_send_to_users = always_send_to_users
|
||||||
|
config.blacklisted_users = blacklisted_users
|
||||||
|
return config
|
||||||
|
async def get_users_for_states(
|
||||||
|
self,
|
||||||
|
state_updates: Iterable[UserPresenceState],
|
||||||
|
) -> Dict[str, Set[UserPresenceState]]:
|
||||||
|
"""Given an iterable of user presence updates, determine where each one
|
||||||
|
needs to go. Returned results will not affect presence updates that are
|
||||||
|
sent between users who share a room.
|
||||||
|
Args:
|
||||||
|
state_updates: An iterable of user presence state updates.
|
||||||
|
Returns:
|
||||||
|
A dictionary of user_id -> set of UserPresenceState that the user should
|
||||||
|
receive.
|
||||||
|
"""
|
||||||
|
destination_users = {} # type: Dict[str, Set[UserPresenceState]
|
||||||
|
# Ignore any updates for blacklisted users
|
||||||
|
desired_updates = set()
|
||||||
|
for update in state_updates:
|
||||||
|
if update.state_key not in self._config.blacklisted_users:
|
||||||
|
desired_updates.add(update)
|
||||||
|
# Send all presence updates to specific users
|
||||||
|
for user_id in self._config.always_send_to_users:
|
||||||
|
destination_users[user_id] = desired_updates
|
||||||
|
return destination_users
|
||||||
|
async def get_interested_users(self, user_id: str) -> Union[Set[str], Literal["ALL"]]:
|
||||||
|
"""
|
||||||
|
Retrieve a list of users that `user_id` is interested in receiving the
|
||||||
|
presence of. This will be in addition to those they share a room with.
|
||||||
|
Optionally, the literal str "ALL" can be returned to indicate that this user
|
||||||
|
should receive all incoming local and remote presence updates.
|
||||||
|
Note that this method will only be called for local users.
|
||||||
|
Args:
|
||||||
|
user_id: A user requesting presence updates.
|
||||||
|
Returns:
|
||||||
|
A set of user IDs to return additional presence updates for, or "ALL" to return
|
||||||
|
presence updates for all other users.
|
||||||
|
"""
|
||||||
|
if user_id in self._config.always_send_to_users:
|
||||||
|
return "ALL"
|
||||||
|
return set()
|
||||||
|
```
|
||||||
|
|
||||||
|
#### A note on `get_users_for_states` and `get_interested_users`
|
||||||
|
|
||||||
|
Both of these methods are effectively two different sides of the same coin. The logic
|
||||||
|
regarding which users should receive updates for other users should be the same
|
||||||
|
between them.
|
||||||
|
|
||||||
|
`get_users_for_states` is called when presence updates come in from either federation
|
||||||
|
or local users, and is used to either direct local presence to remote users, or to
|
||||||
|
wake up the sync streams of local users to collect remote presence.
|
||||||
|
|
||||||
|
In contrast, `get_interested_users` is used to determine the users that presence should
|
||||||
|
be fetched for when a local user is syncing. This presence in then retrieved, before
|
||||||
|
being fed through `get_users_for_states` once again, with only the syncing user's
|
||||||
|
routing information pulled from the resulting dictionary.
|
||||||
|
|
||||||
|
Thus, if one method is implemented, the other should be as well. Their routing logic
|
||||||
|
should line up as well, else you may run into unintended behaviour.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Once you've crafted your module and installed it into the same Python environment as
|
||||||
|
Synapse, amend your homeserver config file with the following.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
presence:
|
||||||
|
routing_module:
|
||||||
|
module: my_module.ExamplePresenceRouter
|
||||||
|
config:
|
||||||
|
# Any configuration options for your module. The below is an example.
|
||||||
|
# of setting options for ExamplePresenceRouter.
|
||||||
|
always_send_to_users: ["@presence_gobbler:example.org"]
|
||||||
|
blacklisted_users:
|
||||||
|
- "@alice:example.com"
|
||||||
|
- "@bob:example.com"
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
The contents of `config` will be passed as a Python dictionary to the static
|
||||||
|
`parse_config` method of your class. The object returned by this method will
|
||||||
|
then be passed to the `__init__` method of your module as `config`.
|
||||||
+21
-2
@@ -82,9 +82,28 @@ pid_file: DATADIR/homeserver.pid
|
|||||||
#
|
#
|
||||||
#soft_file_limit: 0
|
#soft_file_limit: 0
|
||||||
|
|
||||||
# Set to false to disable presence tracking on this homeserver.
|
# Presence tracking allows users to see the state (e.g online/offline)
|
||||||
|
# of other local and remote users.
|
||||||
#
|
#
|
||||||
#use_presence: false
|
presence:
|
||||||
|
# Uncomment to disable presence tracking on this homeserver. This option
|
||||||
|
# replaces the previous top-level 'use_presence' option.
|
||||||
|
#
|
||||||
|
#enabled: false
|
||||||
|
|
||||||
|
# Presence routers are third-party modules that can specify additional logic
|
||||||
|
# to where presence updates from users are routed.
|
||||||
|
#
|
||||||
|
presence_router:
|
||||||
|
# The custom module's class. Uncomment to use a custom presence router module.
|
||||||
|
#
|
||||||
|
#module: "my_custom_router.PresenceRouter"
|
||||||
|
|
||||||
|
# Configuration options of the custom module. Refer to your module's
|
||||||
|
# documentation for available options.
|
||||||
|
#
|
||||||
|
#config:
|
||||||
|
# example_option: 'something'
|
||||||
|
|
||||||
# Whether to require authentication to retrieve profile data (avatars,
|
# Whether to require authentication to retrieve profile data (avatars,
|
||||||
# display names) of other users through the client API. Defaults to
|
# display names) of other users through the client API. Defaults to
|
||||||
|
|||||||
@@ -282,6 +282,7 @@ class GenericWorkerPresence(BasePresenceHandler):
|
|||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
|
|
||||||
|
self.presence_router = hs.get_presence_router()
|
||||||
self._presence_enabled = hs.config.use_presence
|
self._presence_enabled = hs.config.use_presence
|
||||||
|
|
||||||
# The number of ongoing syncs on this process, by user id.
|
# The number of ongoing syncs on this process, by user id.
|
||||||
@@ -394,7 +395,7 @@ class GenericWorkerPresence(BasePresenceHandler):
|
|||||||
return _user_syncing()
|
return _user_syncing()
|
||||||
|
|
||||||
async def notify_from_replication(self, states, stream_id):
|
async def notify_from_replication(self, states, stream_id):
|
||||||
parties = await get_interested_parties(self.store, states)
|
parties = await get_interested_parties(self.store, self.presence_router, states)
|
||||||
room_ids_to_states, users_to_states = parties
|
room_ids_to_states, users_to_states = parties
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import yaml
|
|||||||
from netaddr import AddrFormatError, IPNetwork, IPSet
|
from netaddr import AddrFormatError, IPNetwork, IPSet
|
||||||
|
|
||||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||||
|
from synapse.util.module_loader import load_module
|
||||||
from synapse.util.stringutils import parse_and_validate_server_name
|
from synapse.util.stringutils import parse_and_validate_server_name
|
||||||
|
|
||||||
from ._base import Config, ConfigError
|
from ._base import Config, ConfigError
|
||||||
@@ -238,7 +239,20 @@ class ServerConfig(Config):
|
|||||||
self.public_baseurl = config.get("public_baseurl")
|
self.public_baseurl = config.get("public_baseurl")
|
||||||
|
|
||||||
# Whether to enable user presence.
|
# Whether to enable user presence.
|
||||||
self.use_presence = config.get("use_presence", True)
|
presence_config = config.get("presence") or {}
|
||||||
|
self.use_presence = presence_config.get("enabled")
|
||||||
|
if self.use_presence is None:
|
||||||
|
self.use_presence = config.get("use_presence", True)
|
||||||
|
|
||||||
|
# Custom presence router module
|
||||||
|
self.presence_router_module_class = None
|
||||||
|
self.presence_router_config = None
|
||||||
|
presence_router_config = presence_config.get("presence_router")
|
||||||
|
if presence_router_config:
|
||||||
|
(
|
||||||
|
self.presence_router_module_class,
|
||||||
|
self.presence_router_config,
|
||||||
|
) = load_module(presence_router_config, ("presence", "presence_router"))
|
||||||
|
|
||||||
# Whether to update the user directory or not. This should be set to
|
# Whether to update the user directory or not. This should be set to
|
||||||
# false only if we are updating the user directory in a worker
|
# false only if we are updating the user directory in a worker
|
||||||
@@ -834,9 +848,28 @@ class ServerConfig(Config):
|
|||||||
#
|
#
|
||||||
#soft_file_limit: 0
|
#soft_file_limit: 0
|
||||||
|
|
||||||
# Set to false to disable presence tracking on this homeserver.
|
# Presence tracking allows users to see the state (e.g online/offline)
|
||||||
|
# of other local and remote users.
|
||||||
#
|
#
|
||||||
#use_presence: false
|
presence:
|
||||||
|
# Uncomment to disable presence tracking on this homeserver. This option
|
||||||
|
# replaces the previous top-level 'use_presence' option.
|
||||||
|
#
|
||||||
|
#enabled: false
|
||||||
|
|
||||||
|
# Presence routers are third-party modules that can specify additional logic
|
||||||
|
# to where presence updates from users are routed.
|
||||||
|
#
|
||||||
|
presence_router:
|
||||||
|
# The custom module's class. Uncomment to use a custom presence router module.
|
||||||
|
#
|
||||||
|
#module: "my_custom_router.PresenceRouter"
|
||||||
|
|
||||||
|
# Configuration options of the custom module. Refer to your module's
|
||||||
|
# documentation for available options.
|
||||||
|
#
|
||||||
|
#config:
|
||||||
|
# example_option: 'something'
|
||||||
|
|
||||||
# Whether to require authentication to retrieve profile data (avatars,
|
# Whether to require authentication to retrieve profile data (avatars,
|
||||||
# display names) of other users through the client API. Defaults to
|
# display names) of other users through the client API. Defaults to
|
||||||
|
|||||||
@@ -0,0 +1,96 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING, Dict, Iterable, Set, Union
|
||||||
|
|
||||||
|
from typing_extensions import Literal
|
||||||
|
|
||||||
|
from synapse.api.presence import UserPresenceState
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
|
|
||||||
|
class PresenceRouter:
|
||||||
|
"""
|
||||||
|
A module that the homeserver will call upon to help route user presence updates to
|
||||||
|
additional destinations. If a custom presence router is configured, calls will be
|
||||||
|
passed to that instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
self.custom_presence_router = None
|
||||||
|
|
||||||
|
# Check whether a custom presence router module has been configured
|
||||||
|
if hs.config.presence_router_module_class:
|
||||||
|
# Initialise the module
|
||||||
|
self.custom_presence_router = hs.config.presence_router_module_class(
|
||||||
|
config=hs.config.presence_router_config, module_api=hs.get_module_api()
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_users_for_states(
|
||||||
|
self,
|
||||||
|
state_updates: Iterable[UserPresenceState],
|
||||||
|
) -> Dict[str, Set[UserPresenceState]]:
|
||||||
|
"""
|
||||||
|
Given an iterable of user presence updates, determine where each one
|
||||||
|
needs to go.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
state_updates: An iterable of user presence state updates.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary of user_id -> set of UserPresenceState, indicating which
|
||||||
|
presence updates each user should receive.
|
||||||
|
"""
|
||||||
|
if self.custom_presence_router is not None and hasattr(
|
||||||
|
self.custom_presence_router, "get_users_for_states"
|
||||||
|
):
|
||||||
|
# Ask the custom module
|
||||||
|
return await self.custom_presence_router.get_users_for_states(
|
||||||
|
state_updates=state_updates
|
||||||
|
)
|
||||||
|
|
||||||
|
# Don't include any extra destinations for presence updates
|
||||||
|
return {}
|
||||||
|
|
||||||
|
async def get_interested_users(
|
||||||
|
self, user_id: str
|
||||||
|
) -> Union[Set[str], Literal["ALL"]]:
|
||||||
|
"""
|
||||||
|
Retrieve a list of users that the provided user is interested in receiving the presence
|
||||||
|
of. Optionally, the str "ALL" can be returned to mean that this user should receive all
|
||||||
|
local and remote incoming presence.
|
||||||
|
|
||||||
|
Note that this method will only be called for local users.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: A user requesting presence updates.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A set of user IDs to return presence updates for, or ALL to return all
|
||||||
|
known updates.
|
||||||
|
"""
|
||||||
|
if self.custom_presence_router is not None and hasattr(
|
||||||
|
self.custom_presence_router, "get_interested_users"
|
||||||
|
):
|
||||||
|
# Ask the custom module for interested users
|
||||||
|
return await self.custom_presence_router.get_interested_users(
|
||||||
|
user_id=user_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# A custom presence router is not defined, or doesn't implement any relevant function.
|
||||||
|
# Don't report any additional interested users.
|
||||||
|
return set()
|
||||||
@@ -14,7 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
||||||
|
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
|
||||||
@@ -43,6 +43,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
|||||||
from synapse.types import ReadReceipt, RoomStreamToken
|
from synapse.types import ReadReceipt, RoomStreamToken
|
||||||
from synapse.util.metrics import Measure, measure_func
|
from synapse.util.metrics import Measure, measure_func
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
sent_pdus_destination_dist_count = Counter(
|
sent_pdus_destination_dist_count = Counter(
|
||||||
@@ -66,7 +69,7 @@ CATCH_UP_STARTUP_INTERVAL_SEC = 5
|
|||||||
|
|
||||||
|
|
||||||
class FederationSender:
|
class FederationSender:
|
||||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
|
|
||||||
@@ -76,6 +79,7 @@ class FederationSender:
|
|||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
|
|
||||||
|
self._presence_router = hs.get_presence_router()
|
||||||
self._transaction_manager = TransactionManager(hs)
|
self._transaction_manager = TransactionManager(hs)
|
||||||
|
|
||||||
self._instance_name = hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
@@ -498,7 +502,9 @@ class FederationSender:
|
|||||||
"""Given a list of states populate self.pending_presence_by_dest and
|
"""Given a list of states populate self.pending_presence_by_dest and
|
||||||
poke to send a new transaction to each destination
|
poke to send a new transaction to each destination
|
||||||
"""
|
"""
|
||||||
hosts_and_states = await get_interested_remotes(self.store, states, self.state)
|
hosts_and_states = await get_interested_remotes(
|
||||||
|
self.store, self._presence_router, states, self.state
|
||||||
|
)
|
||||||
|
|
||||||
for destinations, states in hosts_and_states:
|
for destinations, states in hosts_and_states:
|
||||||
for destination in destinations:
|
for destination in destinations:
|
||||||
|
|||||||
+214
-45
@@ -25,15 +25,26 @@ The methods that define policy are:
|
|||||||
import abc
|
import abc
|
||||||
import logging
|
import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
Dict,
|
||||||
|
FrozenSet,
|
||||||
|
Iterable,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Set,
|
||||||
|
Tuple,
|
||||||
|
Union,
|
||||||
|
)
|
||||||
|
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
from typing_extensions import ContextManager
|
from typing_extensions import ContextManager, Literal
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.constants import EventTypes, Membership, PresenceState
|
from synapse.api.constants import EventTypes, Membership, PresenceState
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
|
from synapse.events.presence_router import PresenceRouter
|
||||||
from synapse.logging.context import run_in_background
|
from synapse.logging.context import run_in_background
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
|
|||||||
from synapse.storage.databases.main import DataStore
|
from synapse.storage.databases.main import DataStore
|
||||||
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
|
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import _CacheContext, cached
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.util.wheel_timer import WheelTimer
|
from synapse.util.wheel_timer import WheelTimer
|
||||||
|
|
||||||
@@ -207,6 +218,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.federation = hs.get_federation_sender()
|
self.federation = hs.get_federation_sender()
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
|
self.presence_router = hs.get_presence_router()
|
||||||
self._presence_enabled = hs.config.use_presence
|
self._presence_enabled = hs.config.use_presence
|
||||||
|
|
||||||
federation_registry = hs.get_federation_registry()
|
federation_registry = hs.get_federation_registry()
|
||||||
@@ -651,7 +663,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
"""
|
"""
|
||||||
stream_id, max_token = await self.store.update_presence(states)
|
stream_id, max_token = await self.store.update_presence(states)
|
||||||
|
|
||||||
parties = await get_interested_parties(self.store, states)
|
parties = await get_interested_parties(self.store, self.presence_router, states)
|
||||||
room_ids_to_states, users_to_states = parties
|
room_ids_to_states, users_to_states = parties
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
@@ -1033,7 +1045,12 @@ class PresenceEventSource:
|
|||||||
#
|
#
|
||||||
# Presence -> Notifier -> PresenceEventSource -> Presence
|
# Presence -> Notifier -> PresenceEventSource -> Presence
|
||||||
#
|
#
|
||||||
|
# Same with get_module_api, get_presence_router
|
||||||
|
#
|
||||||
|
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
|
||||||
self.get_presence_handler = hs.get_presence_handler
|
self.get_presence_handler = hs.get_presence_handler
|
||||||
|
self.get_module_api = hs.get_module_api
|
||||||
|
self.get_presence_router = hs.get_presence_router
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
@@ -1047,7 +1064,7 @@ class PresenceEventSource:
|
|||||||
include_offline=True,
|
include_offline=True,
|
||||||
explicit_room_id=None,
|
explicit_room_id=None,
|
||||||
**kwargs
|
**kwargs
|
||||||
):
|
) -> Tuple[List[UserPresenceState], int]:
|
||||||
# The process for getting presence events are:
|
# The process for getting presence events are:
|
||||||
# 1. Get the rooms the user is in.
|
# 1. Get the rooms the user is in.
|
||||||
# 2. Get the list of user in the rooms.
|
# 2. Get the list of user in the rooms.
|
||||||
@@ -1060,7 +1077,17 @@ class PresenceEventSource:
|
|||||||
# We don't try and limit the presence updates by the current token, as
|
# We don't try and limit the presence updates by the current token, as
|
||||||
# sending down the rare duplicate is not a concern.
|
# sending down the rare duplicate is not a concern.
|
||||||
|
|
||||||
|
user_id = user.to_string()
|
||||||
|
stream_change_cache = self.store.presence_stream_cache
|
||||||
|
|
||||||
with Measure(self.clock, "presence.get_new_events"):
|
with Measure(self.clock, "presence.get_new_events"):
|
||||||
|
if user_id in self.get_module_api().send_full_presence_to_local_users:
|
||||||
|
# This user has been specified by a module to receive all current, online
|
||||||
|
# user presence. Removing from_key and setting include_offline to false
|
||||||
|
# will do effectively this.
|
||||||
|
from_key = None
|
||||||
|
include_offline = False
|
||||||
|
|
||||||
if from_key is not None:
|
if from_key is not None:
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
|
|
||||||
@@ -1083,59 +1110,186 @@ class PresenceEventSource:
|
|||||||
# doesn't return. C.f. #5503.
|
# doesn't return. C.f. #5503.
|
||||||
return [], max_token
|
return [], max_token
|
||||||
|
|
||||||
presence = self.get_presence_handler()
|
# Figure out which other users this user should receive updates for
|
||||||
stream_change_cache = self.store.presence_stream_cache
|
|
||||||
|
|
||||||
users_interested_in = await self._get_interested_in(user, explicit_room_id)
|
users_interested_in = await self._get_interested_in(user, explicit_room_id)
|
||||||
|
|
||||||
user_ids_changed = set() # type: Collection[str]
|
# We have a set of users that we're interested in the presence of. We want to
|
||||||
changed = None
|
# cross-reference that with the users that have actually changed their presence.
|
||||||
if from_key:
|
|
||||||
changed = stream_change_cache.get_all_entities_changed(from_key)
|
|
||||||
|
|
||||||
if changed is not None and len(changed) < 500:
|
# Check whether this user should see all user updates
|
||||||
assert isinstance(user_ids_changed, set)
|
|
||||||
|
|
||||||
# For small deltas, its quicker to get all changes and then
|
|
||||||
# work out if we share a room or they're in our presence list
|
|
||||||
get_updates_counter.labels("stream").inc()
|
|
||||||
for other_user_id in changed:
|
|
||||||
if other_user_id in users_interested_in:
|
|
||||||
user_ids_changed.add(other_user_id)
|
|
||||||
else:
|
|
||||||
# Too many possible updates. Find all users we can see and check
|
|
||||||
# if any of them have changed.
|
|
||||||
get_updates_counter.labels("full").inc()
|
|
||||||
|
|
||||||
|
if users_interested_in == "ALL":
|
||||||
if from_key:
|
if from_key:
|
||||||
user_ids_changed = stream_change_cache.get_entities_changed(
|
# We need to return all new presence updates to this user, regardless of whether
|
||||||
users_interested_in, from_key
|
# they share a room with that user
|
||||||
|
return await self._filter_all_presence_updates_for_user(
|
||||||
|
user_id, max_token, from_key, include_offline
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
user_ids_changed = users_interested_in
|
# This user should receive all user presence, and hasn't provided a from_key.
|
||||||
|
# Send all currently known user presence states.
|
||||||
|
users_to_state = await self.store.get_presence_for_all_users(
|
||||||
|
include_offline=include_offline
|
||||||
|
)
|
||||||
|
|
||||||
updates = await presence.current_state_for_users(user_ids_changed)
|
return list(users_to_state.values()), max_token
|
||||||
|
|
||||||
if include_offline:
|
# The set of users that we're interested in and that have had a presence update.
|
||||||
return (list(updates.values()), max_token)
|
# We'll actually pull the presence updates for these users at the end.
|
||||||
else:
|
interested_and_updated_users = (
|
||||||
return (
|
set()
|
||||||
[s for s in updates.values() if s.state != PresenceState.OFFLINE],
|
) # type: Union[Set[str], FrozenSet[str]]
|
||||||
max_token,
|
|
||||||
|
if from_key:
|
||||||
|
# First get all users that have had a presence update
|
||||||
|
updated_users = stream_change_cache.get_all_entities_changed(from_key)
|
||||||
|
|
||||||
|
# Cross-reference users we're interested in with those that have had updates.
|
||||||
|
# Use a slightly-optimised method for processing smaller sets of updates.
|
||||||
|
if updated_users is not None and len(updated_users) < 500:
|
||||||
|
# For small deltas, it's quicker to get all changes and then
|
||||||
|
# cross-reference with the users we're interested in
|
||||||
|
get_updates_counter.labels("stream").inc()
|
||||||
|
for other_user_id in updated_users:
|
||||||
|
if other_user_id in users_interested_in:
|
||||||
|
# mypy thinks this variable could be a FrozenSet as it's possibly set
|
||||||
|
# to one in the `get_entities_changed` call below, and `add()` is not
|
||||||
|
# method on a FrozenSet. That doesn't affect us here though, as
|
||||||
|
# `interested_and_updated_users` is clearly a set() above.
|
||||||
|
interested_and_updated_users.add(other_user_id) # type: ignore
|
||||||
|
else:
|
||||||
|
# Too many possible updates. Find all users we can see and check
|
||||||
|
# if any of them have changed.
|
||||||
|
get_updates_counter.labels("full").inc()
|
||||||
|
|
||||||
|
interested_and_updated_users = (
|
||||||
|
stream_change_cache.get_entities_changed(
|
||||||
|
users_interested_in, from_key
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No from_key has been specified. Return the presence for all users
|
||||||
|
# this user is interested in
|
||||||
|
interested_and_updated_users = users_interested_in
|
||||||
|
|
||||||
|
# Retrieve the current presence state for each user
|
||||||
|
users_to_state = await self.get_presence_handler().current_state_for_users(
|
||||||
|
interested_and_updated_users
|
||||||
)
|
)
|
||||||
|
presence_updates = list(users_to_state.values())
|
||||||
|
|
||||||
|
# Remove the user from the list of users to receive all presence
|
||||||
|
if user_id in self.get_module_api().send_full_presence_to_local_users:
|
||||||
|
self.get_module_api().send_full_presence_to_local_users.remove(user_id)
|
||||||
|
|
||||||
|
if not include_offline:
|
||||||
|
# Filter out offline presence states
|
||||||
|
presence_updates = self._filter_offline_presence_state(presence_updates)
|
||||||
|
|
||||||
|
return presence_updates, max_token
|
||||||
|
|
||||||
|
async def _filter_all_presence_updates_for_user(
|
||||||
|
self,
|
||||||
|
user_id: str,
|
||||||
|
max_token: int,
|
||||||
|
from_key: int,
|
||||||
|
include_offline: bool,
|
||||||
|
) -> Tuple[List[UserPresenceState], int]:
|
||||||
|
# Only return updates since the last sync
|
||||||
|
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
|
||||||
|
from_key
|
||||||
|
)
|
||||||
|
if not updated_users:
|
||||||
|
updated_users = []
|
||||||
|
|
||||||
|
# Get the actual presence update for each change
|
||||||
|
users_to_state = await self.get_presence_handler().current_state_for_users(
|
||||||
|
updated_users
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
|
||||||
|
# module for information on a number of users when we then only take the info
|
||||||
|
# for a single user
|
||||||
|
|
||||||
|
# Filter through the presence router
|
||||||
|
users_to_state_set = await self.get_presence_router().get_users_for_states(
|
||||||
|
users_to_state.values()
|
||||||
|
)
|
||||||
|
|
||||||
|
# We only want the mapping for the syncing user
|
||||||
|
presence_updates = list(users_to_state_set[user_id])
|
||||||
|
|
||||||
|
# Remove the user from the list of users to receive all presence
|
||||||
|
if user_id in self.get_module_api().send_full_presence_to_local_users:
|
||||||
|
self.get_module_api().send_full_presence_to_local_users.remove(user_id)
|
||||||
|
|
||||||
|
if not include_offline:
|
||||||
|
# Filter out offline states
|
||||||
|
presence_updates = self._filter_offline_presence_state(presence_updates)
|
||||||
|
|
||||||
|
# Return presence updates for all users since the last sync
|
||||||
|
return presence_updates, max_token
|
||||||
|
|
||||||
|
def _filter_offline_presence_state(
|
||||||
|
self, presence_updates: Iterable[UserPresenceState]
|
||||||
|
) -> List[UserPresenceState]:
|
||||||
|
"""Given an iterable containing user presence updates, return a list with any offline
|
||||||
|
presence states removed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
presence_updates: Presence states to filter
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A new list with any offline presence states removed.
|
||||||
|
"""
|
||||||
|
return [
|
||||||
|
update
|
||||||
|
for update in presence_updates
|
||||||
|
if update.state != PresenceState.OFFLINE
|
||||||
|
]
|
||||||
|
|
||||||
def get_current_key(self):
|
def get_current_key(self):
|
||||||
return self.store.get_current_presence_token()
|
return self.store.get_current_presence_token()
|
||||||
|
|
||||||
@cached(num_args=2, cache_context=True)
|
@cached(num_args=2, cache_context=True)
|
||||||
async def _get_interested_in(self, user, explicit_room_id, cache_context):
|
async def _get_interested_in(
|
||||||
|
self,
|
||||||
|
user: UserID,
|
||||||
|
explicit_room_id: Optional[str] = None,
|
||||||
|
cache_context: Optional[_CacheContext] = None,
|
||||||
|
) -> Union[Set[str], Literal["ALL"]]:
|
||||||
"""Returns the set of users that the given user should see presence
|
"""Returns the set of users that the given user should see presence
|
||||||
updates for
|
updates for.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user: The user to retrieve presence updates for.
|
||||||
|
explicit_room_id: A
|
||||||
"""
|
"""
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
users_interested_in = set()
|
users_interested_in = set()
|
||||||
users_interested_in.add(user_id) # So that we receive our own presence
|
users_interested_in.add(user_id) # So that we receive our own presence
|
||||||
|
|
||||||
|
# cache_context isn't likely to ever be None due to the @cached decorator,
|
||||||
|
# but we can't have a non-optional argument after the optional argument
|
||||||
|
# explicit_room_id either. Assert cache_context is not None so we can use it
|
||||||
|
# without mypy complaining.
|
||||||
|
assert cache_context
|
||||||
|
|
||||||
|
# Check with the presence router whether we should poll additional users for
|
||||||
|
# their presence information
|
||||||
|
additional_users = await self.get_presence_router().get_interested_users(
|
||||||
|
user.to_string()
|
||||||
|
)
|
||||||
|
if additional_users == "ALL":
|
||||||
|
# If the module requested that this user see the presence updates of *all*
|
||||||
|
# users, then simply return that instead of calculating what rooms this
|
||||||
|
# user shares
|
||||||
|
return "ALL"
|
||||||
|
|
||||||
|
# Add the additional users from the router
|
||||||
|
users_interested_in.update(additional_users)
|
||||||
|
|
||||||
|
# Find the users who share a room with this user
|
||||||
users_who_share_room = await self.store.get_users_who_share_room_with_user(
|
users_who_share_room = await self.store.get_users_who_share_room_with_user(
|
||||||
user_id, on_invalidate=cache_context.invalidate
|
user_id, on_invalidate=cache_context.invalidate
|
||||||
)
|
)
|
||||||
@@ -1306,14 +1460,15 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
|||||||
|
|
||||||
|
|
||||||
async def get_interested_parties(
|
async def get_interested_parties(
|
||||||
store: DataStore, states: List[UserPresenceState]
|
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
|
||||||
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
|
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
|
||||||
"""Given a list of states return which entities (rooms, users)
|
"""Given a list of states return which entities (rooms, users)
|
||||||
are interested in the given states.
|
are interested in the given states.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
store
|
store: The homeserver's data store.
|
||||||
states
|
presence_router: A module for augmenting the destinations for presence updates.
|
||||||
|
states: A list of incoming user presence updates.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A 2-tuple of `(room_ids_to_states, users_to_states)`,
|
A 2-tuple of `(room_ids_to_states, users_to_states)`,
|
||||||
@@ -1329,11 +1484,22 @@ async def get_interested_parties(
|
|||||||
# Always notify self
|
# Always notify self
|
||||||
users_to_states.setdefault(state.user_id, []).append(state)
|
users_to_states.setdefault(state.user_id, []).append(state)
|
||||||
|
|
||||||
|
# Ask a presence routing module for any additional parties if one
|
||||||
|
# is loaded.
|
||||||
|
router_users_to_states = await presence_router.get_users_for_states(states)
|
||||||
|
|
||||||
|
# Update the dictionaries with additional destinations and state to send
|
||||||
|
for user_id, user_states in router_users_to_states.items():
|
||||||
|
users_to_states.setdefault(user_id, []).extend(user_states)
|
||||||
|
|
||||||
return room_ids_to_states, users_to_states
|
return room_ids_to_states, users_to_states
|
||||||
|
|
||||||
|
|
||||||
async def get_interested_remotes(
|
async def get_interested_remotes(
|
||||||
store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
|
store: DataStore,
|
||||||
|
presence_router: PresenceRouter,
|
||||||
|
states: List[UserPresenceState],
|
||||||
|
state_handler: StateHandler,
|
||||||
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
|
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
|
||||||
"""Given a list of presence states figure out which remote servers
|
"""Given a list of presence states figure out which remote servers
|
||||||
should be sent which.
|
should be sent which.
|
||||||
@@ -1341,9 +1507,10 @@ async def get_interested_remotes(
|
|||||||
All the presence states should be for local users only.
|
All the presence states should be for local users only.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
store
|
store: The homeserver's data store.
|
||||||
states
|
presence_router: A module for augmenting the destinations for presence updates.
|
||||||
state_handler
|
states: A list of incoming user presence updates.
|
||||||
|
state_handler:
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A list of 2-tuples of destinations and states, where for
|
A list of 2-tuples of destinations and states, where for
|
||||||
@@ -1355,7 +1522,9 @@ async def get_interested_remotes(
|
|||||||
# First we look up the rooms each user is in (as well as any explicit
|
# First we look up the rooms each user is in (as well as any explicit
|
||||||
# subscriptions), then for each distinct room we look up the remote
|
# subscriptions), then for each distinct room we look up the remote
|
||||||
# hosts in those rooms.
|
# hosts in those rooms.
|
||||||
room_ids_to_states, users_to_states = await get_interested_parties(store, states)
|
room_ids_to_states, users_to_states = await get_interested_parties(
|
||||||
|
store, presence_router, states
|
||||||
|
)
|
||||||
|
|
||||||
for room_id, states in room_ids_to_states.items():
|
for room_id, states in room_ids_to_states.items():
|
||||||
hosts = await state_handler.get_current_hosts_in_room(room_id)
|
hosts = await state_handler.get_current_hosts_in_room(room_id)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Generator, Iterable, Optional, Tuple
|
|||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
|
from synapse.federation.sender import FederationSender
|
||||||
from synapse.http.client import SimpleHttpClient
|
from synapse.http.client import SimpleHttpClient
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
@@ -50,11 +51,17 @@ class ModuleApi:
|
|||||||
self._auth = hs.get_auth()
|
self._auth = hs.get_auth()
|
||||||
self._auth_handler = auth_handler
|
self._auth_handler = auth_handler
|
||||||
self._server_name = hs.hostname
|
self._server_name = hs.hostname
|
||||||
|
self._presence_stream = hs.get_event_sources().sources["presence"]
|
||||||
|
|
||||||
# We expose these as properties below in order to attach a helpful docstring.
|
# We expose these as properties below in order to attach a helpful docstring.
|
||||||
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
||||||
self._public_room_list_manager = PublicRoomListManager(hs)
|
self._public_room_list_manager = PublicRoomListManager(hs)
|
||||||
|
|
||||||
|
# The next time these users sync, they will receive the current presence
|
||||||
|
# state of all local users. Users are added by send_local_online_presence_to,
|
||||||
|
# and removed after a successful sync.
|
||||||
|
self.send_full_presence_to_local_users = set()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def http_client(self):
|
def http_client(self):
|
||||||
"""Allows making outbound HTTP requests to remote resources.
|
"""Allows making outbound HTTP requests to remote resources.
|
||||||
@@ -385,6 +392,34 @@ class ModuleApi:
|
|||||||
|
|
||||||
return event
|
return event
|
||||||
|
|
||||||
|
async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
|
||||||
|
"""
|
||||||
|
Forces the equivalent of a presence initial_sync for a set of local or remote
|
||||||
|
users. The users will receive presence for all currently online users that they
|
||||||
|
are considered interested in.
|
||||||
|
|
||||||
|
Updates to remote users will be sent immediately, whereas local users will receive
|
||||||
|
them on their next sync attempt.
|
||||||
|
"""
|
||||||
|
for user in users:
|
||||||
|
if self._hs.is_mine_id(user):
|
||||||
|
# Modify SyncHandler._generate_sync_entry_for_presence to call
|
||||||
|
# presence_source.get_new_events with an empty `from_key` if
|
||||||
|
# that user's ID were in a list modified by ModuleApi somewhere.
|
||||||
|
# That user would then get all presence state on next incremental sync.
|
||||||
|
|
||||||
|
# Force a presence initial_sync for this user next time
|
||||||
|
self.send_full_presence_to_local_users.add(user)
|
||||||
|
else:
|
||||||
|
# Retrieve presence state for currently online users that this user
|
||||||
|
# is considered interested in
|
||||||
|
presence_events = await self._presence_stream.get_new_events(
|
||||||
|
user, from_key=None, include_offline=False
|
||||||
|
)
|
||||||
|
await FederationSender.send_presence(
|
||||||
|
[ev.state for ev in presence_events]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class PublicRoomListManager:
|
class PublicRoomListManager:
|
||||||
"""Contains methods for adding to, removing from and querying whether a room
|
"""Contains methods for adding to, removing from and querying whether a room
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ from synapse.crypto import context_factory
|
|||||||
from synapse.crypto.context_factory import RegularPolicyForHTTPS
|
from synapse.crypto.context_factory import RegularPolicyForHTTPS
|
||||||
from synapse.crypto.keyring import Keyring
|
from synapse.crypto.keyring import Keyring
|
||||||
from synapse.events.builder import EventBuilderFactory
|
from synapse.events.builder import EventBuilderFactory
|
||||||
|
from synapse.events.presence_router import PresenceRouter
|
||||||
from synapse.events.spamcheck import SpamChecker
|
from synapse.events.spamcheck import SpamChecker
|
||||||
from synapse.events.third_party_rules import ThirdPartyEventRules
|
from synapse.events.third_party_rules import ThirdPartyEventRules
|
||||||
from synapse.events.utils import EventClientSerializer
|
from synapse.events.utils import EventClientSerializer
|
||||||
@@ -423,6 +424,10 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||||||
else:
|
else:
|
||||||
raise Exception("Workers cannot write typing")
|
raise Exception("Workers cannot write typing")
|
||||||
|
|
||||||
|
@cache_in_self
|
||||||
|
def get_presence_router(self) -> PresenceRouter:
|
||||||
|
return PresenceRouter(self)
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_typing_handler(self) -> FollowerTypingHandler:
|
def get_typing_handler(self) -> FollowerTypingHandler:
|
||||||
if self.config.worker.writers.typing == self.get_instance_name():
|
if self.config.worker.writers.typing == self.get_instance_name():
|
||||||
|
|||||||
@@ -1906,6 +1906,7 @@ class DatabasePool:
|
|||||||
retcols: Iterable[str],
|
retcols: Iterable[str],
|
||||||
filters: Optional[Dict[str, Any]] = None,
|
filters: Optional[Dict[str, Any]] = None,
|
||||||
keyvalues: Optional[Dict[str, Any]] = None,
|
keyvalues: Optional[Dict[str, Any]] = None,
|
||||||
|
exclude_keyvalues: Optional[Dict[str, Any]] = None,
|
||||||
order_direction: str = "ASC",
|
order_direction: str = "ASC",
|
||||||
) -> List[Dict[str, Any]]:
|
) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
@@ -1929,7 +1930,10 @@ class DatabasePool:
|
|||||||
apply a WHERE ? LIKE ? clause.
|
apply a WHERE ? LIKE ? clause.
|
||||||
keyvalues:
|
keyvalues:
|
||||||
column names and values to select the rows with, or None to not
|
column names and values to select the rows with, or None to not
|
||||||
apply a WHERE clause.
|
apply a WHERE key = value clause.
|
||||||
|
exclude_keyvalues:
|
||||||
|
column names and values to exclude rows with, or None to not
|
||||||
|
apply a WHERE key != value clause.
|
||||||
order_direction: Whether the results should be ordered "ASC" or "DESC".
|
order_direction: Whether the results should be ordered "ASC" or "DESC".
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -1938,7 +1942,7 @@ class DatabasePool:
|
|||||||
if order_direction not in ["ASC", "DESC"]:
|
if order_direction not in ["ASC", "DESC"]:
|
||||||
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
|
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
|
||||||
|
|
||||||
where_clause = "WHERE " if filters or keyvalues else ""
|
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
|
||||||
arg_list = [] # type: List[Any]
|
arg_list = [] # type: List[Any]
|
||||||
if filters:
|
if filters:
|
||||||
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
|
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
|
||||||
@@ -1947,6 +1951,9 @@ class DatabasePool:
|
|||||||
if keyvalues:
|
if keyvalues:
|
||||||
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
|
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
|
||||||
arg_list += list(keyvalues.values())
|
arg_list += list(keyvalues.values())
|
||||||
|
if exclude_keyvalues:
|
||||||
|
where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
|
||||||
|
arg_list += list(exclude_keyvalues.values())
|
||||||
|
|
||||||
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
|
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
|
||||||
", ".join(retcols),
|
", ".join(retcols),
|
||||||
|
|||||||
@@ -13,7 +13,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from typing import List, Tuple
|
from typing import Dict, List, Tuple
|
||||||
|
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||||
@@ -157,5 +157,64 @@ class PresenceStore(SQLBaseStore):
|
|||||||
|
|
||||||
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
||||||
|
|
||||||
|
async def get_presence_for_all_users(
|
||||||
|
self,
|
||||||
|
include_offline: bool = True,
|
||||||
|
) -> Dict[str, UserPresenceState]:
|
||||||
|
"""Retrieve the current presence state for all users.
|
||||||
|
|
||||||
|
Note that the presence_stream table is culled frequently, so it should only
|
||||||
|
contain the latest presence state for each user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
include_offline: Whether to include offline presence states
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dict of user IDs to their current UserPresenceState.
|
||||||
|
"""
|
||||||
|
users_to_state = {}
|
||||||
|
|
||||||
|
exclude_keyvalues = {}
|
||||||
|
if not include_offline:
|
||||||
|
# Exclude offline presence state
|
||||||
|
exclude_keyvalues = {"state": "offline"}
|
||||||
|
|
||||||
|
# This may be a very heavy database query.
|
||||||
|
# We paginate in order to not block a database connection.
|
||||||
|
limit = 100
|
||||||
|
offset = 0
|
||||||
|
while True:
|
||||||
|
rows = await self.db_pool.runInteraction(
|
||||||
|
"get_presence_for_all_users",
|
||||||
|
self.db_pool.simple_select_list_paginate_txn,
|
||||||
|
"presence_stream",
|
||||||
|
orderby="stream_id",
|
||||||
|
start=offset,
|
||||||
|
limit=limit,
|
||||||
|
keyvalues={},
|
||||||
|
exclude_keyvalues=exclude_keyvalues,
|
||||||
|
retcols=(
|
||||||
|
"user_id",
|
||||||
|
"state",
|
||||||
|
"last_active_ts",
|
||||||
|
"last_federation_update_ts",
|
||||||
|
"last_user_sync_ts",
|
||||||
|
"status_msg",
|
||||||
|
"currently_active",
|
||||||
|
),
|
||||||
|
order_direction="ASC",
|
||||||
|
)
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
users_to_state[row["user_id"]] = UserPresenceState(**row)
|
||||||
|
|
||||||
|
# We've ran out of updates to query
|
||||||
|
if len(rows) < limit:
|
||||||
|
break
|
||||||
|
|
||||||
|
offset += limit
|
||||||
|
|
||||||
|
return users_to_state
|
||||||
|
|
||||||
def get_current_presence_token(self):
|
def get_current_presence_token(self):
|
||||||
return self._presence_id_gen.get_current_token()
|
return self._presence_id_gen.get_current_token()
|
||||||
|
|||||||
@@ -0,0 +1,224 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
from typing import Dict, Iterable, List, Optional, Set, Union
|
||||||
|
|
||||||
|
import attr
|
||||||
|
from typing_extensions import Literal
|
||||||
|
|
||||||
|
from synapse.handlers.presence import UserPresenceState
|
||||||
|
from synapse.module_api import ModuleApi
|
||||||
|
from synapse.rest import admin
|
||||||
|
from synapse.rest.client.v1 import login, presence, room
|
||||||
|
from synapse.types import JsonDict, create_requester
|
||||||
|
|
||||||
|
from tests import unittest
|
||||||
|
from tests.handlers.test_sync import generate_sync_config
|
||||||
|
from tests.unittest import TestCase
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class PresenceRouterTestConfig:
|
||||||
|
users_who_should_receive_all_presence = attr.ib(type=List[str], default=[])
|
||||||
|
|
||||||
|
|
||||||
|
class PresenceRouterTestModule:
|
||||||
|
def __init__(self, config: PresenceRouterTestConfig, module_api: ModuleApi):
|
||||||
|
self._config = config
|
||||||
|
self._module_api = module_api
|
||||||
|
|
||||||
|
async def get_users_for_states(
|
||||||
|
self, state_updates: Iterable[UserPresenceState]
|
||||||
|
) -> Dict[str, Set[UserPresenceState]]:
|
||||||
|
users_to_state = {
|
||||||
|
user_id: set(state_updates)
|
||||||
|
for user_id in self._config.users_who_should_receive_all_presence
|
||||||
|
}
|
||||||
|
return users_to_state
|
||||||
|
|
||||||
|
async def get_interested_users(
|
||||||
|
self, user_id: str
|
||||||
|
) -> Union[Set[str], Literal["ALL"]]:
|
||||||
|
print()
|
||||||
|
if user_id in self._config.users_who_should_receive_all_presence:
|
||||||
|
return "ALL"
|
||||||
|
|
||||||
|
return set()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def parse_config(config_dict: dict) -> PresenceRouterTestConfig:
|
||||||
|
"""Parse a configuration dictionary from the homeserver config, do
|
||||||
|
some validation and return a typed PresenceRouterConfig.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config_dict: The configuration dictionary.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A validated config object.
|
||||||
|
"""
|
||||||
|
# Initialise a typed config object
|
||||||
|
config = PresenceRouterTestConfig()
|
||||||
|
|
||||||
|
config.users_who_should_receive_all_presence = config_dict.get(
|
||||||
|
"users_who_should_receive_all_presence"
|
||||||
|
)
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
class PresenceRouterTestCase(unittest.HomeserverTestCase):
|
||||||
|
servlets = [
|
||||||
|
admin.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
presence.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def default_config(self):
|
||||||
|
config = super().default_config()
|
||||||
|
config["presence"] = {
|
||||||
|
"presence_router": {
|
||||||
|
"module": __name__ + ".PresenceRouterTestModule",
|
||||||
|
"config": {
|
||||||
|
"users_who_should_receive_all_presence": ["@presence_gobbler:test"]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return config
|
||||||
|
|
||||||
|
def prepare(self, reactor, clock, homeserver):
|
||||||
|
self.sync_handler = self.hs.get_sync_handler()
|
||||||
|
|
||||||
|
# Create a user who should receive all presence of others
|
||||||
|
self.presence_receiving_user_id = self.register_user(
|
||||||
|
"presence_gobbler", "monkey"
|
||||||
|
)
|
||||||
|
self.presence_receiving_user_tok = self.login("presence_gobbler", "monkey")
|
||||||
|
|
||||||
|
# And two users who should not have any special routing
|
||||||
|
self.other_user_one_id = self.register_user("other_user_one", "monkey")
|
||||||
|
self.other_user_one_tok = self.login("other_user_one", "monkey")
|
||||||
|
self.other_user_two_id = self.register_user("other_user_two", "monkey")
|
||||||
|
self.other_user_two_tok = self.login("other_user_two", "monkey")
|
||||||
|
|
||||||
|
# Put the other two users in a room with each other
|
||||||
|
self.room_id = self.helper.create_room_as(
|
||||||
|
self.other_user_one_id, tok=self.other_user_one_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
self.helper.invite(
|
||||||
|
self.room_id,
|
||||||
|
self.other_user_one_id,
|
||||||
|
self.other_user_two_id,
|
||||||
|
tok=self.other_user_one_tok,
|
||||||
|
)
|
||||||
|
self.helper.join(
|
||||||
|
self.room_id, self.other_user_two_id, tok=self.other_user_two_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_receiving_all_presence(self):
|
||||||
|
"""Test that a user that does not share a room with another other can receive
|
||||||
|
presence for them, due to presence routing.
|
||||||
|
"""
|
||||||
|
# User one sends some presence
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.other_user_one_id,
|
||||||
|
self.other_user_one_tok,
|
||||||
|
"online",
|
||||||
|
"boop",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that the presence receiving user gets user one's presence when syncing
|
||||||
|
presence_updates = sync_presence(self, self.presence_receiving_user_id)
|
||||||
|
self.assertEqual(len(presence_updates), 1)
|
||||||
|
|
||||||
|
presence_update = presence_updates[0] # type: UserPresenceState
|
||||||
|
self.assertEqual(presence_update.user_id, self.other_user_one_id)
|
||||||
|
self.assertEqual(presence_update.state, "online")
|
||||||
|
self.assertEqual(presence_update.status_msg, "boop")
|
||||||
|
|
||||||
|
# Have all three users send presence
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.other_user_one_id,
|
||||||
|
self.other_user_one_tok,
|
||||||
|
"online",
|
||||||
|
"user_one",
|
||||||
|
)
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.other_user_two_id,
|
||||||
|
self.other_user_two_tok,
|
||||||
|
"online",
|
||||||
|
"user_two",
|
||||||
|
)
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.presence_receiving_user_id,
|
||||||
|
self.presence_receiving_user_tok,
|
||||||
|
"online",
|
||||||
|
"presence_gobbler",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that the presence receiving user gets everyone's presence
|
||||||
|
presence_updates = sync_presence(self, self.presence_receiving_user_id)
|
||||||
|
self.assertEqual(len(presence_updates), 3)
|
||||||
|
|
||||||
|
# But that User One only get itself and User Two's presence
|
||||||
|
presence_updates = sync_presence(self, self.other_user_one_id)
|
||||||
|
self.assertEqual(len(presence_updates), 2)
|
||||||
|
|
||||||
|
found = False
|
||||||
|
for update in presence_updates:
|
||||||
|
if update.user_id == self.other_user_two_id:
|
||||||
|
self.assertEqual(update.state, "online")
|
||||||
|
self.assertEqual(update.status_msg, "user_two")
|
||||||
|
found = True
|
||||||
|
|
||||||
|
self.assertTrue(found)
|
||||||
|
|
||||||
|
|
||||||
|
def send_presence_update(
|
||||||
|
testcase: TestCase,
|
||||||
|
user_id: str,
|
||||||
|
access_token: str,
|
||||||
|
presence_state: str,
|
||||||
|
status_message: Optional[str] = None,
|
||||||
|
) -> JsonDict:
|
||||||
|
# Build the presence body
|
||||||
|
body = {"presence": presence_state}
|
||||||
|
if status_message:
|
||||||
|
body["status_msg"] = status_message
|
||||||
|
|
||||||
|
# Update the user's presence state
|
||||||
|
channel = testcase.make_request(
|
||||||
|
"PUT", "/presence/%s/status" % (user_id,), body, access_token=access_token
|
||||||
|
)
|
||||||
|
testcase.assertEqual(channel.code, 200)
|
||||||
|
|
||||||
|
return channel.json_body
|
||||||
|
|
||||||
|
|
||||||
|
def sync_presence(
|
||||||
|
testcase: TestCase,
|
||||||
|
user_id: str,
|
||||||
|
) -> List[UserPresenceState]:
|
||||||
|
requester = create_requester(user_id)
|
||||||
|
sync_config = generate_sync_config(requester.user.to_string())
|
||||||
|
sync_result = testcase.get_success(
|
||||||
|
testcase.sync_handler.wait_for_sync_for_user(requester, sync_config)
|
||||||
|
)
|
||||||
|
|
||||||
|
return sync_result.presence
|
||||||
+11
-10
@@ -37,7 +37,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
|||||||
def test_wait_for_sync_for_user_auth_blocking(self):
|
def test_wait_for_sync_for_user_auth_blocking(self):
|
||||||
user_id1 = "@user1:test"
|
user_id1 = "@user1:test"
|
||||||
user_id2 = "@user2:test"
|
user_id2 = "@user2:test"
|
||||||
sync_config = self._generate_sync_config(user_id1)
|
sync_config = generate_sync_config(user_id1)
|
||||||
requester = create_requester(user_id1)
|
requester = create_requester(user_id1)
|
||||||
|
|
||||||
self.reactor.advance(100) # So we get not 0 time
|
self.reactor.advance(100) # So we get not 0 time
|
||||||
@@ -60,7 +60,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
|||||||
|
|
||||||
self.auth_blocking._hs_disabled = False
|
self.auth_blocking._hs_disabled = False
|
||||||
|
|
||||||
sync_config = self._generate_sync_config(user_id2)
|
sync_config = generate_sync_config(user_id2)
|
||||||
requester = create_requester(user_id2)
|
requester = create_requester(user_id2)
|
||||||
|
|
||||||
e = self.get_failure(
|
e = self.get_failure(
|
||||||
@@ -69,11 +69,12 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
|||||||
)
|
)
|
||||||
self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
|
self.assertEquals(e.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
|
||||||
|
|
||||||
def _generate_sync_config(self, user_id):
|
|
||||||
return SyncConfig(
|
def generate_sync_config(user_id: str) -> SyncConfig:
|
||||||
user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
|
return SyncConfig(
|
||||||
filter_collection=DEFAULT_FILTER_COLLECTION,
|
user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
|
||||||
is_guest=False,
|
filter_collection=DEFAULT_FILTER_COLLECTION,
|
||||||
request_key="request_key",
|
is_guest=False,
|
||||||
device_id="device_id",
|
request_key="request_key",
|
||||||
)
|
device_id="device_id",
|
||||||
|
)
|
||||||
|
|||||||
@@ -15,11 +15,17 @@
|
|||||||
from mock import Mock
|
from mock import Mock
|
||||||
|
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
|
from synapse.handlers.presence import UserPresenceState
|
||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client.v1 import login, room
|
from synapse.rest.client.v1 import login, presence, room
|
||||||
from synapse.types import create_requester
|
from synapse.types import create_requester
|
||||||
|
|
||||||
from tests.unittest import HomeserverTestCase
|
from tests.events.test_presence_router import (
|
||||||
|
PresenceRouterTestModule,
|
||||||
|
send_presence_update,
|
||||||
|
sync_presence,
|
||||||
|
)
|
||||||
|
from tests.unittest import HomeserverTestCase, override_config
|
||||||
|
|
||||||
|
|
||||||
class ModuleApiTestCase(HomeserverTestCase):
|
class ModuleApiTestCase(HomeserverTestCase):
|
||||||
@@ -27,6 +33,7 @@ class ModuleApiTestCase(HomeserverTestCase):
|
|||||||
admin.register_servlets,
|
admin.register_servlets,
|
||||||
login.register_servlets,
|
login.register_servlets,
|
||||||
room.register_servlets,
|
room.register_servlets,
|
||||||
|
presence.register_servlets,
|
||||||
]
|
]
|
||||||
|
|
||||||
def prepare(self, reactor, clock, homeserver):
|
def prepare(self, reactor, clock, homeserver):
|
||||||
@@ -205,3 +212,95 @@ class ModuleApiTestCase(HomeserverTestCase):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.assertFalse(is_in_public_rooms)
|
self.assertFalse(is_in_public_rooms)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{
|
||||||
|
"presence": {
|
||||||
|
"presence_router": {
|
||||||
|
"module": "%s.%s"
|
||||||
|
% (
|
||||||
|
PresenceRouterTestModule.__module__,
|
||||||
|
PresenceRouterTestModule.__name__,
|
||||||
|
),
|
||||||
|
"config": {
|
||||||
|
"users_who_should_receive_all_presence": [
|
||||||
|
"@presence_gobbler1:test",
|
||||||
|
"@presence_gobbler2:test",
|
||||||
|
]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
def test_send_local_online_presence_to(self):
|
||||||
|
"""Tests that send_local_presence_to_users sends local online presence to a set
|
||||||
|
of specified local and remote users.
|
||||||
|
"""
|
||||||
|
self.sync_handler = self.hs.get_sync_handler()
|
||||||
|
|
||||||
|
# Create a user who will send presence updates
|
||||||
|
self.other_user_id = self.register_user("other_user", "monkey")
|
||||||
|
self.other_user_tok = self.login("other_user", "monkey")
|
||||||
|
|
||||||
|
# And another two users that will also send out presence updates, as well as receive
|
||||||
|
# theirs and everyone else's
|
||||||
|
self.presence_receiving_user_one_id = self.register_user(
|
||||||
|
"presence_gobbler1", "monkey"
|
||||||
|
)
|
||||||
|
self.presence_receiving_user_one_tok = self.login("presence_gobbler1", "monkey")
|
||||||
|
self.presence_receiving_user_two_id = self.register_user(
|
||||||
|
"presence_gobbler2", "monkey"
|
||||||
|
)
|
||||||
|
self.presence_receiving_user_two_tok = self.login("presence_gobbler2", "monkey")
|
||||||
|
|
||||||
|
# Have all three users send some presence updates
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.other_user_id,
|
||||||
|
self.other_user_tok,
|
||||||
|
"online",
|
||||||
|
"I'm online!",
|
||||||
|
)
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.presence_receiving_user_one_id,
|
||||||
|
self.presence_receiving_user_one_tok,
|
||||||
|
"online",
|
||||||
|
"I'm also online!",
|
||||||
|
)
|
||||||
|
send_presence_update(
|
||||||
|
self,
|
||||||
|
self.presence_receiving_user_two_id,
|
||||||
|
self.presence_receiving_user_two_tok,
|
||||||
|
"unavailable",
|
||||||
|
"I'm in a meeting!",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mark each presence-receiving user for receiving all user presence
|
||||||
|
self.get_success(
|
||||||
|
self.module_api.send_local_online_presence_to(
|
||||||
|
[
|
||||||
|
self.presence_receiving_user_one_id,
|
||||||
|
self.presence_receiving_user_two_id,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Perform a sync for each user
|
||||||
|
|
||||||
|
# The other user should only receive their own presence
|
||||||
|
presence_updates = sync_presence(self, self.other_user_id)
|
||||||
|
self.assertEqual(len(presence_updates), 1)
|
||||||
|
|
||||||
|
presence_update = presence_updates[0] # type: UserPresenceState
|
||||||
|
self.assertEqual(presence_update.user_id, self.other_user_id)
|
||||||
|
self.assertEqual(presence_update.state, "online")
|
||||||
|
self.assertEqual(presence_update.status_msg, "I'm online!")
|
||||||
|
|
||||||
|
# Whereas both presence receiving users should receive everyone's presence updates
|
||||||
|
presence_updates = sync_presence(self, self.presence_receiving_user_one_id)
|
||||||
|
self.assertEqual(len(presence_updates), 3)
|
||||||
|
presence_updates = sync_presence(self, self.presence_receiving_user_two_id)
|
||||||
|
self.assertEqual(len(presence_updates), 3)
|
||||||
|
|
||||||
|
# TODO: Test sending to federated users
|
||||||
|
|||||||
Reference in New Issue
Block a user