1
0

Change to updating the latest membership in the room

This commit is contained in:
Eric Eastwood
2024-08-07 16:37:17 -05:00
parent cb335805d4
commit 87d95615d4
2 changed files with 116 additions and 92 deletions

View File

@@ -42,7 +42,12 @@ import attr
from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.constants import (
EventContentFields,
EventTypes,
RelationTypes,
Membership,
)
from synapse.api.errors import PartialStateConflictError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
@@ -1392,102 +1397,119 @@ class PersistEventsStore:
# We now update `sliding_sync_non_join_memberships`. We do this regardless of
# whether the server is still in the room or not because we still want a row if
# a local user was just left/kicked or got banned from the room.
if to_delete:
txn.execute_batch(
"DELETE FROM sliding_sync_non_join_memberships"
" WHERE room_id = ? AND user_id = ?",
(
(room_id, state_key)
for event_type, state_key in to_delete
if event_type == EventTypes.Member and self.is_mine_id(state_key)
),
)
if to_insert:
membership_event_id_to_user_id_map: Dict[str, str] = {}
for state_key, event_id in to_insert.items():
if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
membership_event_id_to_user_id_map[event_id] = state_key[1]
# Fetch the events from the database
#
# TODO: We should gather this data before we delete the
# `current_state_events` in a `no_longer_in_room` situation.
(
event_type_and_state_key_in_list_clause,
event_type_and_state_key_args,
) = make_tuple_in_list_sql_clause(
self.database_engine,
("type", "state_key"),
[
(EventTypes.Create, ""),
(EventTypes.RoomEncryption, ""),
(EventTypes.Name, ""),
],
)
txn.execute(
f"""
SELECT c.event_id, c.type, c.state_key, j.json
FROM current_state_events AS c
INNER JOIN event_json AS j USING (event_id)
WHERE
c.room_id = ?
AND {event_type_and_state_key_in_list_clause}
""",
[room_id] + event_type_and_state_key_args,
)
# Parse the raw event JSON
sliding_sync_non_joined_rooms_insert_map: Dict[
str, Optional[Union[str, bool]]
] = {}
for row in txn:
event_id, event_type, state_key, json = row
event_json = db_to_json(json)
if event_type == EventTypes.Create:
room_type = event_json.get("content", {}).get(
EventContentFields.ROOM_TYPE
)
sliding_sync_non_joined_rooms_insert_map["room_type"] = room_type
elif event_type == EventTypes.RoomEncryption:
is_encrypted = event_json.get("content", {}).get(
EventContentFields.ENCRYPTION_ALGORITHM
)
sliding_sync_non_joined_rooms_insert_map["is_encrypted"] = (
is_encrypted
)
elif event_type == EventTypes.Name:
room_name = event_json.get("content", {}).get(
EventContentFields.ROOM_NAME
)
sliding_sync_non_joined_rooms_insert_map["room_name"] = room_name
else:
raise AssertionError(
f"Unexpected event (we should not be fetching extra events): ({event_type}, {state_key})"
)
# Update the `sliding_sync_non_join_memberships` table
insert_keys = sliding_sync_non_joined_rooms_insert_map.keys()
insert_values = sliding_sync_non_joined_rooms_insert_map.values()
# We `DO NOTHING` on conflict because if the row is already in the database,
# we just assume that it was already processed (values should be the same anyways).
#
# TODO: Only do this for non-join membership
txn.execute_batch(
f"""
INSERT INTO sliding_sync_non_join_memberships
(room_id, membership_event_id, user_id, membership, event_stream_ordering, {", ".join(insert_keys)})
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?),
{", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id, membership_event_id)
DO NOTHING
""",
[
if len(membership_event_id_to_user_id_map) > 0:
# Fetch the events from the database
#
# TODO: We should gather this data before we delete the
# `current_state_events` in a `no_longer_in_room` situation.
(
event_type_and_state_key_in_list_clause,
event_type_and_state_key_args,
) = make_tuple_in_list_sql_clause(
self.database_engine,
("type", "state_key"),
[
room_id,
membership_event_id,
user_id,
membership_event_id,
membership_event_id,
]
+ list(insert_values)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
],
)
(EventTypes.Create, ""),
(EventTypes.RoomEncryption, ""),
(EventTypes.Name, ""),
],
)
txn.execute(
f"""
SELECT c.event_id, c.type, c.state_key, j.json
FROM current_state_events AS c
INNER JOIN event_json AS j USING (event_id)
WHERE
c.room_id = ?
AND {event_type_and_state_key_in_list_clause}
""",
[room_id] + event_type_and_state_key_args,
)
# Parse the raw event JSON
sliding_sync_non_joined_rooms_insert_map: Dict[
str, Optional[Union[str, bool]]
] = {}
for row in txn:
event_id, event_type, state_key, json = row
event_json = db_to_json(json)
if event_type == EventTypes.Create:
room_type = event_json.get("content", {}).get(
EventContentFields.ROOM_TYPE
)
sliding_sync_non_joined_rooms_insert_map["room_type"] = (
room_type
)
elif event_type == EventTypes.RoomEncryption:
is_encrypted = event_json.get("content", {}).get(
EventContentFields.ENCRYPTION_ALGORITHM
)
sliding_sync_non_joined_rooms_insert_map["is_encrypted"] = (
is_encrypted
)
elif event_type == EventTypes.Name:
room_name = event_json.get("content", {}).get(
EventContentFields.ROOM_NAME
)
sliding_sync_non_joined_rooms_insert_map["room_name"] = (
room_name
)
else:
raise AssertionError(
f"Unexpected event (we should not be fetching extra events): ({event_type}, {state_key})"
)
# Update the `sliding_sync_non_join_memberships` table
insert_keys = sliding_sync_non_joined_rooms_insert_map.keys()
insert_values = sliding_sync_non_joined_rooms_insert_map.values()
# TODO: Only do this for non-join membership
txn.execute_batch(
f"""
INSERT INTO sliding_sync_non_join_memberships
(room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)})
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?),
{", ".join("?" for _ in insert_values)}
)
ON CONFLICT (room_id, user_id)
DO UPDATE SET
membership_event_id = EXCLUDED.membership_event_id,
membership = EXCLUDED.membership,
event_stream_ordering = EXCLUDED.event_stream_ordering,
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
""",
[
[
room_id,
user_id,
membership_event_id,
membership_event_id,
membership_event_id,
]
+ list(insert_values)
for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
],
)
txn.call_after(
self.store._curr_state_delta_stream_cache.entity_has_changed,

View File

@@ -36,8 +36,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
-- the membership event itself as the `bump_stamp`.
CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
room_id TEXT NOT NULL REFERENCES rooms(room_id),
membership_event_id TEXT NOT NULL REFERENCES events(event_id),
user_id TEXT NOT NULL,
membership_event_id TEXT NOT NULL REFERENCES events(event_id),
membership TEXT NOT NULL,
-- `stream_ordering` of the `membership_event_id`
event_stream_ordering BIGINT REFERENCES events(stream_ordering),
@@ -50,6 +50,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
-- `m.room.encryption` -> `content.algorithm` (according to the current state at the
-- time of the membership)
is_encrypted BOOLEAN,
PRIMARY KEY (room_id, membership_event_id)
PRIMARY KEY (room_id, user_id)
);
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_event_stream_ordering ON sliding_sync_non_join_memberships(event_stream_ordering);
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_membership_event_id ON sliding_sync_non_join_memberships(membership_event_id);