1
0

Compare commits

...

13 Commits

Author SHA1 Message Date
Andrew Morgan
f42fae4fb3 wip 2021-05-12 13:41:16 +01:00
Andrew Morgan
a9687fb14c Merge branch 'develop' of github.com:matrix-org/synapse into anoa/module_api_full_presence_fix_wip 2021-05-11 18:28:06 +01:00
Andrew Morgan
bdf5ec745a force_notify nonsense 2021-05-11 18:26:25 +01:00
Andrew Morgan
60dc246634 Fix stream token multiple devices 2021-05-11 17:50:13 +01:00
Andrew Morgan
e083f0fd89 wip tests 2021-05-11 17:50:10 +01:00
Andrew Morgan
46eccf1826 Add presence_stream_id column to users_to_send_full_presence_to table
We now return full presence in a /sync response based on the provided sync token,
rather than if the user has received the response already. This is necessary in
circumstances where users have more than one device - which they often do!

This comment also modifies the spot in PresenceHandler where we check if
the user should receive all presence. We only need to make the check if
from_key is not None. If from_key is None, the user will be receiving
all presence states anyways.
2021-05-05 16:02:38 +01:00
Andrew Morgan
caee7dae44 Remove users_to_send_full_presence_to table culling; add fk for user_id 2021-05-05 14:43:17 +01:00
Andrew Morgan
b8d85c9b77 Merge branch 'develop' of github.com:matrix-org/synapse into anoa/module_api_full_presence_fix 2021-05-04 11:49:14 +01:00
Andrew Morgan
fc341f8b11 Changelog 2021-04-16 18:13:35 +01:00
Andrew Morgan
16b3789033 Modify SyncHandler to pull from the new table 2021-04-16 18:13:35 +01:00
Andrew Morgan
2ea5dbdf41 Modify ModuleApi to upsert entries into our new table 2021-04-16 18:13:35 +01:00
Andrew Morgan
9c243b7312 Add migration and storage methods for users_to_send_full_presence_to table 2021-04-16 18:13:33 +01:00
Andrew Morgan
d725923068 Add a util file for non API-specific constants, and move ONE_HOUR to it
(This constant is used in the next commit.)

So there's not much here right now, and perhaps this could fit better in
another file. Though I didn't see anything quite appropriate.
2021-04-15 16:57:15 +01:00
12 changed files with 390 additions and 150 deletions

1
changelog.d/9823.misc Normal file
View File

@@ -0,0 +1 @@
Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`.

View File

@@ -24,6 +24,7 @@ from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.constants import HOUR_IN_MS
if TYPE_CHECKING:
from synapse.appservice import ApplicationService
@@ -46,8 +47,6 @@ sent_events_counter = Counter(
"synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
)
HOUR_IN_MS = 60 * 60 * 1000
APP_SERVICE_PREFIX = "/_matrix/app/unstable"

View File

@@ -222,7 +222,7 @@ class BasePresenceHandler(abc.ABC):
@abc.abstractmethod
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False
) -> None:
"""Set the presence state of the user. """
@@ -296,6 +296,30 @@ class BasePresenceHandler(abc.ABC):
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)
async def resend_current_presence_for_users(self, user_ids: Iterable[str]):
"""
Grabs the current presence state for a given set of users and adds it
to the top of the presence stream.
Args:
user_ids: The IDs of the users to use.
"""
# Get the current presence state for each user (defaults to offline if not found)
current_presence_for_users = await self.current_state_for_users(user_ids)
for user_id, current_presence_state in current_presence_for_users.items():
# Convert the UserPresenceState object into a serializable dict
state = {
"presence": current_presence_state.state,
"status_message": current_presence_state.status_msg,
}
# Copy the presence state to the tip of the presence stream
print(f"Adding a presence update for {user_id}: {state}")
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
print("bla")
class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
@@ -480,6 +504,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> None:
"""Set the presence state of the user."""
presence = state["presence"]
@@ -508,6 +533,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
)
async def bump_presence_active_time(self, user: UserID) -> None:
@@ -677,7 +703,7 @@ class PresenceHandler(BasePresenceHandler):
[self.user_to_current_state[user_id] for user_id in unpersisted]
)
async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
async def _update_states(self, new_states: Iterable[UserPresenceState], force_notify: bool = False) -> None:
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
should be sent to clients/servers.
@@ -720,6 +746,9 @@ class PresenceHandler(BasePresenceHandler):
now=now,
)
if force_notify:
should_notify = True
self.user_to_current_state[user_id] = new_state
if should_notify:
@@ -1058,7 +1087,7 @@ class PresenceHandler(BasePresenceHandler):
await self._update_states(updates)
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False
) -> None:
"""Set the presence state of the user."""
status_msg = state.get("status_msg", None)
@@ -1091,7 +1120,7 @@ class PresenceHandler(BasePresenceHandler):
):
new_fields["last_active_ts"] = self.clock.time_msec()
await self._update_states([prev_state.copy_and_replace(**new_fields)])
await self._update_states([prev_state.copy_and_replace(**new_fields)], force_notify=force_notify)
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
"""Returns whether a user can see another user's presence."""
@@ -1389,11 +1418,10 @@ class PresenceEventSource:
#
# Presence -> Notifier -> PresenceEventSource -> Presence
#
# Same with get_module_api, get_presence_router
# Same with get_presence_router:
#
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.get_presence_handler = hs.get_presence_handler
self.get_module_api = hs.get_module_api
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -1424,16 +1452,21 @@ class PresenceEventSource:
stream_change_cache = self.store.presence_stream_cache
with Measure(self.clock, "presence.get_new_events"):
if user_id in self.get_module_api()._send_full_presence_to_local_users:
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False
if from_key is not None:
from_key = int(from_key)
# Check if this user should receive all current, online user presence. We only
# bother to do this if from_key is set, as otherwise the user will receive all
# user presence anyways.
if await self.store.should_user_receive_full_presence_with_token(
user_id, from_key
):
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False
max_token = self.store.get_current_presence_token()
if from_key == max_token:
# This is necessary as due to the way stream ID generators work
@@ -1467,12 +1500,6 @@ class PresenceEventSource:
user_id, include_offline, from_key
)
# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(
user_id
)
return presence_updates, max_token
# Make mypy happy. users_interested_in should now be a set
@@ -1522,10 +1549,6 @@ class PresenceEventSource:
)
presence_updates = list(users_to_state.values())
# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
if not include_offline:
# Filter out offline presence states
presence_updates = self._filter_offline_presence_state(presence_updates)

View File

@@ -56,14 +56,6 @@ class ModuleApi:
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)
# The next time these users sync, they will receive the current presence
# state of all local users. Users are added by send_local_online_presence_to,
# and removed after a successful sync.
#
# We make this a private variable to deter modules from accessing it directly,
# though other classes in Synapse will still do so.
self._send_full_presence_to_local_users = set()
@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
@@ -408,36 +400,39 @@ class ModuleApi:
Note that this method can only be run on the main or federation_sender worker
processes.
"""
if not self._hs.should_send_federation():
# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()
local_users = set()
remote_users = set()
for user in users:
if self._hs.is_mine_id(user):
local_users.add(user)
else:
remote_users.add(user)
if remote_users and not self._hs.should_send_federation():
raise Exception(
"send_local_online_presence_to can only be run "
"send_local_online_presence_to can only be called with remote users "
"on processes that send federation",
)
for user in users:
if self._hs.is_mine_id(user):
# Modify SyncHandler._generate_sync_entry_for_presence to call
# presence_source.get_new_events with an empty `from_key` if
# that user's ID were in a list modified by ModuleApi somewhere.
# That user would then get all presence state on next incremental sync.
if local_users:
# Force a presence initial_sync for these users next time they sync.
await self._store.add_users_to_send_full_presence_to(local_users)
# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
else:
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)
for user in remote_users:
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)
# Send to remote destinations.
# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()
await presence_handler.maybe_send_presence_to_interested_destinations(
presence_events
)
# Send to remote destinations.
await presence_handler.maybe_send_presence_to_interested_destinations(
presence_events
)
class PublicRoomListManager:

View File

@@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
{
"state": { ... },
"ignore_status_msg": false,
"force_notify": false
}
200 OK
@@ -91,10 +92,11 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
self._presence_handler = hs.get_presence_handler()
@staticmethod
async def _serialize_payload(user_id, state, ignore_status_msg=False):
async def _serialize_payload(user_id, state, ignore_status_msg=False, force_notify=False):
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
}
async def _handle_request(self, request, user_id):

View File

@@ -54,7 +54,6 @@ class SendServerNoticeServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.txns = HttpTransactionCache(hs)
self.snm = hs.get_server_notices_manager()
def register(self, json_resource: HttpServer):
PATTERN = "/send_server_notice"
@@ -77,7 +76,7 @@ class SendServerNoticeServlet(RestServlet):
event_type = body.get("type", EventTypes.Message)
state_key = body.get("state_key")
if not self.snm.is_enabled():
if not self.hs.get_server_notices_manager().is_enabled():
raise SynapseError(400, "Server notices are not enabled on this server")
user_id = body["user_id"]
@@ -85,7 +84,7 @@ class SendServerNoticeServlet(RestServlet):
if not self.hs.is_mine_id(user_id):
raise SynapseError(400, "Server notices can only be sent to local users")
event = await self.snm.send_notice(
event = await self.hs.get_server_notices_manager().send_notice(
user_id=body["user_id"],
type=event_type,
state_key=state_key,

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Dict, List, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
db_conn, "presence_stream", "stream_id"
)
self.hs = hs
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
@@ -210,6 +211,70 @@ class PresenceStore(SQLBaseStore):
return {row["user_id"]: UserPresenceState(**row) for row in rows}
async def should_user_receive_full_presence_with_token(
self,
user_id: str,
from_token: int,
) -> bool:
"""Check whether the given user should receive full presence using the stream token
they're updating from.
Args:
user_id: The ID of the user to check.
from_token: The stream token included in their /sync token.
Returns:
True if the user should have full presence sent to them, False otherwise.
"""
def _should_user_receive_full_presence_with_token_txn(txn):
sql = """
SELECT 1 FROM users_to_send_full_presence_to
WHERE user_id = ?
AND presence_stream_id >= ?
"""
txn.execute(sql, (user_id, from_token))
return bool(txn.fetchone())
return await self.db_pool.runInteraction(
"should_user_receive_full_presence_with_token",
_should_user_receive_full_presence_with_token_txn,
)
async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
"""Adds to the list of users who should receive a full snapshot of presence
upon their next sync.
Args:
user_ids: An iterable of user IDs.
"""
# Add user entries to the table, updating the presence_stream_id column if the user already
# exists in the table.
await self.db_pool.simple_upsert_many(
table="users_to_send_full_presence_to",
key_names=("user_id",),
key_values=[(user_id,) for user_id in user_ids],
value_names=("presence_stream_id",),
# We save the current presence stream ID token along with the user ID entry so
# that when a user /sync's, even if they syncing multiple times across separate
# devices at different times, each device will receive full presence once - when
# the presence stream ID in their sync token is less than the one in the table
# for their user ID.
value_values=(
(self._presence_id_gen.get_current_token(),) for _ in user_ids
),
desc="add_users_to_send_full_presence_to",
)
# Add a new entry to the presence stream. Since we use stream tokens to determine whether a
# local user should receive a full snapshot presence when they sync, we need to bump the
# presence stream so that subsequent syncs with no presence activity in between won't result
# in the client receiving multiple full snapshots of presence.
# If we bump the stream ID, then the user will get a higher stream token next sync, and thus
# won't receive another snapshot.
await self.hs.get_presence_handler().resend_current_presence_for_users(user_ids)
async def get_presence_for_all_users(
self,
include_offline: bool = True,

View File

@@ -0,0 +1,34 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a table that keeps track of a list of users who should, upon their next
-- sync request, receive presence for all currently online users that they are
-- "interested" in.
-- The motivation for a DB table over an in-memory list is so that this list
-- can be added to and retrieved from by any worker. Specifically, we don't
-- want to duplicate work across multiple sync workers.
CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to(
-- The user ID to send full presence to.
user_id TEXT PRIMARY KEY,
-- A presence stream ID token - the current presence stream token when the row was last upserted.
-- If a user calls /sync and this token is part of the update they're to receive, we also include
-- full user presence in the response.
-- This allows multiple devices for a user to receive full presence whenever they next call /sync.
presence_stream_id BIGINT,
FOREIGN KEY (user_id)
REFERENCES users (name)
);

15
synapse/util/constants.py Normal file
View File

@@ -0,0 +1,15 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
HOUR_IN_MS = 60 * 60 * 1000

View File

@@ -19,14 +19,16 @@ from synapse.federation.units import Transaction
from synapse.handlers.presence import UserPresenceState
from synapse.rest import admin
from synapse.rest.client.v1 import login, presence, room
from synapse.types import create_requester
from synapse.types import create_requester, StreamToken
from tests.events.test_presence_router import send_presence_update, sync_presence
from tests.test_utils.event_injection import inject_member_event
from tests.unittest import FederatingHomeserverTestCase, override_config
from tests.unittest import HomeserverTestCase, override_config
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import USE_POSTGRES_FOR_TESTS
class ModuleApiTestCase(FederatingHomeserverTestCase):
class ModuleApiTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
@@ -217,90 +219,9 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
)
self.assertFalse(is_in_public_rooms)
# The ability to send federation is required by send_local_online_presence_to.
@override_config({"send_federation": True})
def test_send_local_online_presence_to(self):
"""Tests that send_local_presence_to_users sends local online presence to local users."""
# Create a user who will send presence updates
self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
self.presence_receiver_tok = self.login("presence_receiver", "monkey")
# And another user that will send presence updates out
self.presence_sender_id = self.register_user("presence_sender", "monkey")
self.presence_sender_tok = self.login("presence_sender", "monkey")
# Put them in a room together so they will receive each other's presence updates
room_id = self.helper.create_room_as(
self.presence_receiver_id,
tok=self.presence_receiver_tok,
)
self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
# Presence sender comes online
send_presence_update(
self,
self.presence_sender_id,
self.presence_sender_tok,
"online",
"I'm online!",
)
# Presence receiver should have received it
presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
self.assertEqual(len(presence_updates), 1)
presence_update = presence_updates[0] # type: UserPresenceState
self.assertEqual(presence_update.user_id, self.presence_sender_id)
self.assertEqual(presence_update.state, "online")
# Syncing again should result in no presence updates
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
self.assertEqual(len(presence_updates), 0)
# Trigger sending local online presence
self.get_success(
self.module_api.send_local_online_presence_to(
[
self.presence_receiver_id,
]
)
)
# Presence receiver should have received online presence again
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
self.assertEqual(len(presence_updates), 1)
presence_update = presence_updates[0] # type: UserPresenceState
self.assertEqual(presence_update.user_id, self.presence_sender_id)
self.assertEqual(presence_update.state, "online")
# Presence sender goes offline
send_presence_update(
self,
self.presence_sender_id,
self.presence_sender_tok,
"offline",
"I slink back into the darkness.",
)
# Trigger sending local online presence
self.get_success(
self.module_api.send_local_online_presence_to(
[
self.presence_receiver_id,
]
)
)
# Presence receiver should *not* have received offline state
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
self.assertEqual(len(presence_updates), 0)
# Test sending local online presence to users from the main process
_test_sending_local_online_presence_to_local_user(self, test_with_workers=False)
@override_config({"send_federation": True})
def test_send_local_online_presence_to_federation(self):
@@ -374,3 +295,189 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
found_update = True
self.assertTrue(found_update)
class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
"""For testing ModuleApi functionality in a multi-worker setup"""
# Testing stream ID replication from the main to worker processes requires postgres
# (due to needing `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"
servlets = [
admin.register_servlets,
login.register_servlets,
room.register_servlets,
presence.register_servlets,
]
def prepare(self, reactor, clock, homeserver):
self.module_api = homeserver.get_module_api()
self.sync_handler = homeserver.get_sync_handler()
def make_homeserver(self, reactor, clock):
config = self.default_config()
# This isn't a real configuration option but is used to provide the main
# homeserver and worker homeserver different options.
main_replication_secret = config.pop("main_replication_secret", None)
if main_replication_secret:
config["worker_replication_secret"] = main_replication_secret
return self.setup_test_homeserver(config=config)
def _get_worker_hs_config(self) -> dict:
config = self.default_config()
config["worker_app"] = "synapse.app.generic_worker"
config["worker_replication_host"] = "testserv"
config["worker_replication_http_port"] = "8765"
return config
def test_send_local_online_presence_to_workers(self):
# Test sending local online presence to users from a worker process
_test_sending_local_online_presence_to_local_user(self, test_with_workers=True)
def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, test_with_workers: bool = False):
"""Tests that send_local_presence_to_users sends local online presence to local users.
Args:
test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a
worker process. The test users will still sync with the main process. The purpose of testing
with a worker is to check whether a Synapse module running on a worker can inform other workers/
the main process that they should include additional presence when a user next syncs.
"""
if test_with_workers:
# Create a worker process to make module_api calls against
worker_hs = self.make_worker_hs("synapse.app.generic_worker")
# Create a user who will send presence updates
self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
self.presence_receiver_tok = self.login("presence_receiver", "monkey")
# And another user that will send presence updates out
self.presence_sender_id = self.register_user("presence_sender", "monkey")
self.presence_sender_tok = self.login("presence_sender", "monkey")
# Put them in a room together so they will receive each other's presence updates
room_id = self.helper.create_room_as(
self.presence_receiver_id,
tok=self.presence_receiver_tok,
)
self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
# Presence sender comes online
send_presence_update(
self,
self.presence_sender_id,
self.presence_sender_tok,
"online",
"I'm online!",
)
# Presence receiver should have received it
presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
self.assertEqual(len(presence_updates), 1)
presence_update = presence_updates[0] # type: UserPresenceState
self.assertEqual(presence_update.user_id, self.presence_sender_id)
self.assertEqual(presence_update.state, "online")
if test_with_workers:
# Replicate the current sync presence token from the main process to the worker process.
# We need to do this so that the worker process knows the current presence stream ID to
# insert into the database when we call ModuleApi.send_local_online_presence_to.
self.replicate()
# Syncing again should result in no presence updates
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
self.assertEqual(len(presence_updates), 0)
# We do an (initial) sync with a second "device" now, getting a new sync token.
# We'll use this in a moment.
_, sync_token_second_device = sync_presence(self, self.presence_receiver_id)
# Determine on which worker to call ModuleApi.send_local_online_presence_to on
if test_with_workers:
module_api_to_use = worker_hs.get_module_api()
else:
module_api_to_use = self.module_api
# Trigger sending local online presence on the worker process. We expect this information
# to be saved to the database where all other workers can access it.
d = module_api_to_use.send_local_online_presence_to(
[
self.presence_receiver_id,
]
)
if test_with_workers:
self.replicate()
print(d)
self.get_success(d)
# The presence receiver should have received online presence again.
print("Sync token initially:", sync_token)
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
self.assertEqual(len(presence_updates), 1)
print("Sync token after a sync:", sync_token)
presence_update = presence_updates[0] # type: UserPresenceState
self.assertEqual(presence_update.user_id, self.presence_sender_id)
self.assertEqual(presence_update.state, "online")
# We attempt to sync with the second sync token we received above - just to check that
# multiple syncing devices will each receive the necessary online presence.
presence_updates, sync_token_second_device = sync_presence(
self, self.presence_receiver_id, sync_token_second_device
)
self.assertEqual(len(presence_updates), 1)
presence_update = presence_updates[0] # type: UserPresenceState
self.assertEqual(presence_update.user_id, self.presence_sender_id)
self.assertEqual(presence_update.state, "online")
# However, if we now sync with either "device", we won't receive another burst of online presence
# until the API is called again sometime in the future
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
print("Sync token after the second sync:", sync_token)
print(presence_updates)
# Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to.
# Presence sender goes offline
send_presence_update(
self,
self.presence_sender_id,
self.presence_sender_tok,
"offline",
"I slink back into the darkness.",
)
# Presence receiver should have received the updated, offline state
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
print("Sync token after the third sync:", sync_token)
self.assertEqual(len(presence_updates), 1)
# Now trigger sending local online presence.
self.get_success(
self.module_api.send_local_online_presence_to(
[
self.presence_receiver_id,
]
)
)
# Presence receiver should *not* have received offline state
presence_updates, sync_token = sync_presence(
self, self.presence_receiver_id, sync_token
)
self.assertEqual(len(presence_updates), 0)

View File

@@ -30,7 +30,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
"""Checks event persisting sharding works"""
# Event persister sharding requires postgres (due to needing
# `MutliWriterIdGenerator`).
# `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"

View File

@@ -236,7 +236,7 @@ def setup_test_homeserver(
else:
database_config = {
"name": "sqlite3",
"args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
"args": {"database": "test.db", "cp_min": 1, "cp_max": 1},
}
database = DatabaseConnectionConfig("master", database_config)