Modify ModuleApi to upsert entries into our new table
This commit is contained in:
@@ -55,14 +55,6 @@ class ModuleApi:
|
||||
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
||||
self._public_room_list_manager = PublicRoomListManager(hs)
|
||||
|
||||
# The next time these users sync, they will receive the current presence
|
||||
# state of all local users. Users are added by send_local_online_presence_to,
|
||||
# and removed after a successful sync.
|
||||
#
|
||||
# We make this a private variable to deter modules from accessing it directly,
|
||||
# though other classes in Synapse will still do so.
|
||||
self._send_full_presence_to_local_users = set()
|
||||
|
||||
@property
|
||||
def http_client(self):
|
||||
"""Allows making outbound HTTP requests to remote resources.
|
||||
@@ -413,28 +405,31 @@ class ModuleApi:
|
||||
"on processes that send federation",
|
||||
)
|
||||
|
||||
local_users = set()
|
||||
remote_users = set()
|
||||
for user in users:
|
||||
if self._hs.is_mine_id(user):
|
||||
# Modify SyncHandler._generate_sync_entry_for_presence to call
|
||||
# presence_source.get_new_events with an empty `from_key` if
|
||||
# that user's ID were in a list modified by ModuleApi somewhere.
|
||||
# That user would then get all presence state on next incremental sync.
|
||||
|
||||
# Force a presence initial_sync for this user next time
|
||||
self._send_full_presence_to_local_users.add(user)
|
||||
local_users.add(user)
|
||||
else:
|
||||
# Retrieve presence state for currently online users that this user
|
||||
# is considered interested in
|
||||
presence_events, _ = await self._presence_stream.get_new_events(
|
||||
UserID.from_string(user), from_key=None, include_offline=False
|
||||
)
|
||||
remote_users.add(user)
|
||||
|
||||
# Send to remote destinations
|
||||
await make_deferred_yieldable(
|
||||
# We pull the federation sender here as we can only do so on workers
|
||||
# that support sending presence
|
||||
self._hs.get_federation_sender().send_presence(presence_events)
|
||||
)
|
||||
if local_users:
|
||||
# Force a presence initial_sync for these users next time they sync.
|
||||
await self._store.add_users_to_send_full_presence_to(local_users)
|
||||
|
||||
for user in remote_users:
|
||||
# Retrieve presence state for currently online users that this user
|
||||
# is considered interested in
|
||||
presence_events, _ = await self._presence_stream.get_new_events(
|
||||
UserID.from_string(user), from_key=None, include_offline=False
|
||||
)
|
||||
|
||||
# Send to remote destinations
|
||||
await make_deferred_yieldable(
|
||||
# We pull the federation sender here as we can only do so on workers
|
||||
# that support sending presence
|
||||
self._hs.get_federation_sender().send_presence(presence_events)
|
||||
)
|
||||
|
||||
|
||||
class PublicRoomListManager:
|
||||
|
||||
@@ -212,7 +212,7 @@ class PresenceStore(SQLBaseStore):
|
||||
WHERE added_ms < ?
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (time_now - USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS)
|
||||
sql, (time_now - USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS,)
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
|
||||
Reference in New Issue
Block a user