Fixes #18356
This commit is contained in:
1
changelog.d/18372.misc
Normal file
1
changelog.d/18372.misc
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Add a column `participant` to `room_memberships` table.
|
||||||
@@ -1693,93 +1693,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
|||||||
columns=["user_id", "room_id"],
|
columns=["user_id", "room_id"],
|
||||||
)
|
)
|
||||||
|
|
||||||
self.db_pool.updates.register_background_update_handler(
|
|
||||||
"populate_participant_bg_update", self._populate_participant
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int:
|
|
||||||
"""
|
|
||||||
Background update to populate column `participant` on `room_memberships` table
|
|
||||||
|
|
||||||
A 'participant' is someone who is currently joined to a room and has sent at least
|
|
||||||
one `m.room.message` or `m.room.encrypted` event.
|
|
||||||
|
|
||||||
This background update will set the `participant` column across all rows in
|
|
||||||
`room_memberships` based on the user's *current* join status, and if
|
|
||||||
they've *ever* sent a message or encrypted event. Therefore one should
|
|
||||||
never assume the `participant` column's value is based solely on whether
|
|
||||||
the user participated in a previous "session" (where a "session" is defined
|
|
||||||
as a period between the user joining and leaving). See
|
|
||||||
https://github.com/element-hq/synapse/pull/18068#discussion_r1931070291
|
|
||||||
for further detail.
|
|
||||||
"""
|
|
||||||
stream_token = progress.get("last_stream_token", None)
|
|
||||||
|
|
||||||
def _get_max_stream_token_txn(txn: LoggingTransaction) -> int:
|
|
||||||
sql = """
|
|
||||||
SELECT event_stream_ordering from room_memberships
|
|
||||||
ORDER BY event_stream_ordering DESC
|
|
||||||
LIMIT 1;
|
|
||||||
"""
|
|
||||||
txn.execute(sql)
|
|
||||||
res = txn.fetchone()
|
|
||||||
if not res or not res[0]:
|
|
||||||
return 0
|
|
||||||
return res[0]
|
|
||||||
|
|
||||||
def _background_populate_participant_txn(
|
|
||||||
txn: LoggingTransaction, stream_token: str
|
|
||||||
) -> None:
|
|
||||||
sql = """
|
|
||||||
UPDATE room_memberships
|
|
||||||
SET participant = True
|
|
||||||
FROM (
|
|
||||||
SELECT DISTINCT c.state_key, e.room_id
|
|
||||||
FROM current_state_events AS c
|
|
||||||
INNER JOIN events AS e ON c.room_id = e.room_id
|
|
||||||
WHERE c.membership = 'join'
|
|
||||||
AND c.state_key = e.sender
|
|
||||||
AND (
|
|
||||||
e.type = 'm.room.message'
|
|
||||||
OR e.type = 'm.room.encrypted'
|
|
||||||
)
|
|
||||||
) AS subquery
|
|
||||||
WHERE room_memberships.user_id = subquery.state_key
|
|
||||||
AND room_memberships.room_id = subquery.room_id
|
|
||||||
AND room_memberships.event_stream_ordering <= ?
|
|
||||||
AND room_memberships.event_stream_ordering > ?;
|
|
||||||
"""
|
|
||||||
batch = int(stream_token) - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
|
|
||||||
txn.execute(sql, (stream_token, batch))
|
|
||||||
|
|
||||||
if stream_token is None:
|
|
||||||
stream_token = await self.db_pool.runInteraction(
|
|
||||||
"_get_max_stream_token", _get_max_stream_token_txn
|
|
||||||
)
|
|
||||||
|
|
||||||
if stream_token < 0:
|
|
||||||
await self.db_pool.updates._end_background_update(
|
|
||||||
"populate_participant_bg_update"
|
|
||||||
)
|
|
||||||
return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
|
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"_background_populate_participant_txn",
|
|
||||||
_background_populate_participant_txn,
|
|
||||||
stream_token,
|
|
||||||
)
|
|
||||||
|
|
||||||
progress["last_stream_token"] = (
|
|
||||||
stream_token - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
|
|
||||||
)
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"populate_participant_bg_update",
|
|
||||||
self.db_pool.updates._background_update_progress_txn,
|
|
||||||
"populate_participant_bg_update",
|
|
||||||
progress,
|
|
||||||
)
|
|
||||||
return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
|
|
||||||
|
|
||||||
async def _background_add_membership_profile(
|
async def _background_add_membership_profile(
|
||||||
self, progress: JsonDict, batch_size: int
|
self, progress: JsonDict, batch_size: int
|
||||||
) -> int:
|
) -> int:
|
||||||
|
|||||||
@@ -13,8 +13,4 @@
|
|||||||
|
|
||||||
-- Add a column `participant` to `room_memberships` table to track whether a room member has sent
|
-- Add a column `participant` to `room_memberships` table to track whether a room member has sent
|
||||||
-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of
|
-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of
|
||||||
ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE;
|
ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE;
|
||||||
|
|
||||||
-- Add a background update to populate `participant` column
|
|
||||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
|
||||||
(9001, 'populate_participant_bg_update', '{}');
|
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
--
|
||||||
|
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
--
|
||||||
|
-- Copyright (C) 2025 New Vector, Ltd
|
||||||
|
--
|
||||||
|
-- This program is free software: you can redistribute it and/or modify
|
||||||
|
-- it under the terms of the GNU Affero General Public License as
|
||||||
|
-- published by the Free Software Foundation, either version 3 of the
|
||||||
|
-- License, or (at your option) any later version.
|
||||||
|
--
|
||||||
|
-- See the GNU Affero General Public License for more details:
|
||||||
|
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
|
||||||
|
-- Remove the background update if it was scheduled, as it is not rollback-safe
|
||||||
|
-- See https://github.com/element-hq/synapse/issues/18356 for context
|
||||||
|
DELETE FROM background_updates
|
||||||
|
WHERE update_name = 'populate_participant_bg_update';
|
||||||
Reference in New Issue
Block a user