1
0

Work on background update for sliding_sync_membership_snapshots

This commit is contained in:
Eric Eastwood
2024-08-15 00:21:35 -05:00
parent d113e743ae
commit 4b42e44ef9
2 changed files with 288 additions and 142 deletions

View File

@@ -1243,8 +1243,10 @@ class PersistEventsStore:
str, Optional[Union[str, bool]]
] = {}
if current_state_map:
sliding_sync_membership_snapshots_insert_map = self._get_sliding_sync_insert_values_according_to_current_state_map_txn(
txn, current_state_map
sliding_sync_membership_snapshots_insert_map = (
self._get_sliding_sync_insert_values_from_current_state_map_txn(
txn, current_state_map
)
)
# We have current state to work from
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
@@ -1279,19 +1281,19 @@ class PersistEventsStore:
f"""
INSERT INTO sliding_sync_membership_snapshots
(room_id, user_id, membership_event_id, membership, event_stream_ordering
{"," + (", ".join(insert_keys)) if insert_keys else ""})
{("," + ", ".join(insert_keys)) if insert_keys else ""})
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
{"," + (", ".join("?" for _ in insert_values)) if insert_values else ""}
{("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
)
ON CONFLICT (room_id, user_id)
DO UPDATE SET
membership_event_id = EXCLUDED.membership_event_id,
membership = EXCLUDED.membership,
event_stream_ordering = EXCLUDED.event_stream_ordering
{"," + (", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""}
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""}
""",
[
[
@@ -1636,7 +1638,7 @@ class PersistEventsStore:
return current_state_map
@classmethod
def _get_sliding_sync_insert_values_according_to_current_state_map_txn(
def _get_sliding_sync_insert_values_from_current_state_map_txn(
cls, txn: LoggingTransaction, current_state_map: StateMap[str]
) -> Dict[str, Optional[Union[str, bool]]]:
"""
@@ -2448,91 +2450,27 @@ class PersistEventsStore:
"membership": event.membership,
"event_stream_ordering": event.internal_metadata.stream_ordering,
}
if raw_stripped_state_events is not None:
stripped_state_map: MutableStateMap[StrippedStateEvent] = {}
if isinstance(raw_stripped_state_events, list):
for raw_stripped_event in raw_stripped_state_events:
stripped_state_event = parse_stripped_state_event(
raw_stripped_event
)
if stripped_state_event is not None:
stripped_state_map[
(
stripped_state_event.type,
stripped_state_event.state_key,
)
] = stripped_state_event
# If there is some stripped state, we assume the remote server passed *all*
# of the potential stripped state events for the room.
create_stripped_event = stripped_state_map.get(
(EventTypes.Create, "")
if event.membership == Membership.LEAVE:
# Inherit the meta data from the remote invite/knock. When using
# sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
pass
elif event.membership in (Membership.INVITE, Membership.KNOCK):
extra_insert_values = (
self._get_sliding_sync_insert_values_from_stripped_state_txn(
txn, raw_stripped_state_events
)
)
# Sanity check that we at-least have the create event
if create_stripped_event is not None:
insert_values["has_known_state"] = True
# Find the room_type
insert_values["room_type"] = (
create_stripped_event.content.get(
EventContentFields.ROOM_TYPE
)
if create_stripped_event is not None
else None
)
# Find whether the room is_encrypted
encryption_stripped_event = stripped_state_map.get(
(EventTypes.RoomEncryption, "")
)
encryption = (
encryption_stripped_event.content.get(
EventContentFields.ENCRYPTION_ALGORITHM
)
if encryption_stripped_event is not None
else None
)
insert_values["is_encrypted"] = encryption is not None
# Find the room_name
room_name_stripped_event = stripped_state_map.get(
(EventTypes.Name, "")
)
insert_values["room_name"] = (
room_name_stripped_event.content.get(
EventContentFields.ROOM_NAME
)
if room_name_stripped_event is not None
else None
)
else:
# No strip state provided
insert_values["has_known_state"] = False
insert_values["room_type"] = None
insert_values["room_name"] = None
insert_values["is_encrypted"] = False
insert_values.update(extra_insert_values)
else:
if event.membership == Membership.LEAVE:
# Inherit the meta data from the remote invite/knock. When using
# sliding sync filters, this will prevent the room from
# disappearing/appearing just because you left the room.
pass
elif event.membership in (Membership.INVITE, Membership.KNOCK):
# No strip state provided
insert_values["has_known_state"] = False
insert_values["room_type"] = None
insert_values["room_name"] = None
insert_values["is_encrypted"] = False
else:
# We don't know how to handle this type of membership yet
#
# FIXME: We should use `assert_never` here but for some reason
# the exhaustive matching doesn't recognize the `Never` here.
# assert_never(event.membership)
raise AssertionError(
f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet"
)
# We don't know how to handle this type of membership yet
#
# FIXME: We should use `assert_never` here but for some reason
# the exhaustive matching doesn't recognize the `Never` here.
# assert_never(event.membership)
raise AssertionError(
f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet"
)
self.db_pool.simple_upsert_txn(
txn,
@@ -2544,6 +2482,85 @@ class PersistEventsStore:
values=insert_values,
)
@classmethod
def _get_sliding_sync_insert_values_from_stripped_state_txn(
cls, txn: LoggingTransaction, unsigned_stripped_state_events: Any
) -> Dict[str, Optional[Union[str, bool]]]:
"""
TODO
Returns:
Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant
state values needed to insert into the `sliding_sync_membership_snapshots` tables.
"""
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_insert_map: Dict[str, Optional[Union[str, bool]]] = {}
if unsigned_stripped_state_events is not None:
stripped_state_map: MutableStateMap[StrippedStateEvent] = {}
if isinstance(unsigned_stripped_state_events, list):
for raw_stripped_event in unsigned_stripped_state_events:
stripped_state_event = parse_stripped_state_event(
raw_stripped_event
)
if stripped_state_event is not None:
stripped_state_map[
(
stripped_state_event.type,
stripped_state_event.state_key,
)
] = stripped_state_event
# If there is some stripped state, we assume the remote server passed *all*
# of the potential stripped state events for the room.
create_stripped_event = stripped_state_map.get((EventTypes.Create, ""))
# Sanity check that we at-least have the create event
if create_stripped_event is not None:
sliding_sync_insert_map["has_known_state"] = True
# Find the room_type
sliding_sync_insert_map["room_type"] = (
create_stripped_event.content.get(EventContentFields.ROOM_TYPE)
if create_stripped_event is not None
else None
)
# Find whether the room is_encrypted
encryption_stripped_event = stripped_state_map.get(
(EventTypes.RoomEncryption, "")
)
encryption = (
encryption_stripped_event.content.get(
EventContentFields.ENCRYPTION_ALGORITHM
)
if encryption_stripped_event is not None
else None
)
sliding_sync_insert_map["is_encrypted"] = encryption is not None
# Find the room_name
room_name_stripped_event = stripped_state_map.get((EventTypes.Name, ""))
sliding_sync_insert_map["room_name"] = (
room_name_stripped_event.content.get(EventContentFields.ROOM_NAME)
if room_name_stripped_event is not None
else None
)
else:
# No strip state provided
sliding_sync_insert_map["has_known_state"] = False
sliding_sync_insert_map["room_type"] = None
sliding_sync_insert_map["room_name"] = None
sliding_sync_insert_map["is_encrypted"] = False
else:
# No strip state provided
sliding_sync_insert_map["has_known_state"] = False
sliding_sync_insert_map["room_type"] = None
sliding_sync_insert_map["room_name"] = None
sliding_sync_insert_map["is_encrypted"] = False
return sliding_sync_insert_map
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:

View File

@@ -20,11 +20,12 @@
#
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast
import attr
from typing_extensions import assert_never
from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.constants import EventContentFields, Membership, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -1598,7 +1599,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
return 0
for (room_id,) in rooms_to_update_rows:
logger.info("asdf Working on room %s", room_id)
current_state_map = PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn(
txn, room_id
)
@@ -1606,7 +1606,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
# so we should have some current state for each room
assert current_state_map
sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_according_to_current_state_map_txn(
sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_current_state_map_txn(
txn, current_state_map
)
# We should have some insert values for each room, even if they are `None`
@@ -1679,62 +1679,191 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
"""
Handles backfilling the `sliding_sync_membership_snapshots` table.
"""
# last_event_stream_ordering = progress.get("last_event_stream_ordering", "")
last_event_stream_ordering = progress.get(
"last_event_stream_ordering", -(1 << 31)
)
def _txn(txn: LoggingTransaction) -> int:
# # Fetch the set of event IDs that we want to update
# txn.execute(
# """
# SELECT room_id, user_id, event_id FROM local_current_membership
# WHERE event_stream_ordering > ?
# ORDER BY event_stream_ordering ASC
# LIMIT ?
# """,
# (last_event_stream_ordering, batch_size),
# )
# Fetch the set of event IDs that we want to update
txn.execute(
"""
SELECT
c.room_id,
c.user_id,
c.event_id,
c.membership,
c.event_stream_ordering,
e.outlier
FROM local_current_membership as c
INNER JOIN events AS e USING (event_id)
WHERE event_stream_ordering > ?
ORDER BY event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
)
# rows = txn.fetchall()
# if not rows:
# return 0
memberships_to_update_rows = txn.fetchall()
if not memberships_to_update_rows:
return 0
# # Update the redactions with the received_ts.
# #
# # Note: Not all events have an associated received_ts, so we
# # fallback to using origin_server_ts. If we for some reason don't
# # have an origin_server_ts, lets just use the current timestamp.
# #
# # We don't want to leave it null, as then we'll never try and
# # censor those redactions.
# txn.execute_batch(
# f"""
# INSERT INTO sliding_sync_membership_snapshots
# (room_id, user_id, membership_event_id, membership, event_stream_ordering
# {"," + (", ".join(insert_keys)) if insert_keys else ""})
# VALUES (
# ?, ?, ?,
# (SELECT membership FROM room_memberships WHERE event_id = ?),
# (SELECT stream_ordering FROM events WHERE event_id = ?)
# {"," + (", ".join("?" for _ in insert_values)) if insert_values else ""}
# )
# ON CONFLICT (room_id, user_id)
# DO UPDATE SET
# membership_event_id = EXCLUDED.membership_event_id,
# membership = EXCLUDED.membership,
# event_stream_ordering = EXCLUDED.event_stream_ordering
# {"," + (", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""}
# """,
# (TODO,),
# )
for (
room_id,
user_id,
membership_event_id,
membership,
_membership_event_stream_ordering,
is_outlier,
) in memberships_to_update_rows:
# We don't know how to handle `membership` values other than these. The
# code below would need to be updated.
assert membership in (
Membership.JOIN,
Membership.INVITE,
Membership.KNOCK,
Membership.LEAVE,
Membership.BAN,
)
# self.db_pool.updates._background_update_progress_txn(
# txn, "redactions_received_ts", {"last_event_id": upper_event_id}
# )
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_membership_snapshots_insert_map: Dict[
str, Optional[Union[str, bool]]
] = {}
if membership == Membership.JOIN:
# If we're still joined, we can pull from current state
current_state_map = PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn(
txn, room_id
)
# We're iterating over rooms that we are joined to so they should
# have `current_state_events` and we should have some current state
# for each room
assert current_state_map
# return len(rows)
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_current_state_map_txn(
txn, current_state_map
)
# We should have some insert values for each room, even if they are `None`
assert sliding_sync_membership_snapshots_insert_map
# TODO
# return len(rows)
return 0
# We have current state to work from
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
True
)
elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership == Membership.LEAVE and is_outlier
):
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
# If the event is an `out_of_band_membership` (special case of
# `outlier`), we never had historical state so we have to pull from
# the stripped state on the previous invite/knock event. This gives
# us a consistent view of the room state regardless of your
# membership (i.e. the room shouldn't disappear if your using the
# `is_encrypted` filter and you leave).
if membership == Membership.LEAVE and is_outlier:
# Find the previous invite/knock event before the leave event
txn.execute(
"""
SELECT event_id, membership
FROM room_memberships
WHERE
room_id = ?
AND user_id = ?
AND event_stream_ordering < ?
ORDER BY event_stream_ordering DESC
LIMIT 1
"""
)
row = txn.fetchone()
# We should see a corresponding previous invite/knock event
assert row is not None
invite_or_knock_event_id, invite_or_knock_membership = row
# Pull from the stripped state on the invite/knock event
txn.execute(
"""
SELECT json FROM event_json
WHERE event_id = ?
""",
(invite_or_knock_event_id),
)
row = txn.fetchone()
# We should find a corresponding event
assert row is not None
json = row[0]
event_json = db_to_json(json)
raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = event_json.get("unsigned").get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = event_json.get("unsigned").get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state_txn(
txn, raw_stripped_state_events
)
# We should have some insert values for each room, even if no
# stripped state is on the event because we still want to record
# that we have no known state
assert sliding_sync_membership_snapshots_insert_map
elif membership == Membership.BAN:
# Pull from historical state
# TODO
pass
else:
assert_never(membership)
# Pulling keys/values separately is safe and will produce congruent
# lists
insert_keys = sliding_sync_membership_snapshots_insert_map.keys()
insert_values = sliding_sync_membership_snapshots_insert_map.values()
# We don't need to do anything `ON CONFLICT` because we never partially
# insert/update the snapshots
txn.execute(
f"""
INSERT INTO sliding_sync_membership_snapshots
(room_id, user_id, membership_event_id, membership, event_stream_ordering
{("," + ", ".join(insert_keys)) if insert_keys else ""})
VALUES (
?, ?, ?, ?,
(SELECT stream_ordering FROM events WHERE event_id = ?)
{("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
)
ON CONFLICT (room_id, user_id)
DO NOTHING
""",
[
room_id,
user_id,
membership_event_id,
membership,
membership_event_id,
]
+ list(insert_values),
)
(
_room_id,
_user_id,
_membership_event_id,
_membership,
membership_event_stream_ordering,
_is_outlier,
) = memberships_to_update_rows[-1]
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BACKFILL,
{"last_event_stream_ordering": membership_event_stream_ordering},
)
return len(memberships_to_update_rows)
count = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_backfill", _txn