Compare commits

...

11 Commits

Author SHA1 Message Date
Andrew Morgan
6dc4fda9c7 Remove entries from 'account_data_undelivered_deletes' when account data is added/modified
This table should only be tracking deleted account data
entries.
2022-12-19 16:44:24 +00:00
Andrew Morgan
c8721fd25b Populate 'account_data_undelivered_deletes' table on delete 2022-12-19 16:44:24 +00:00
Andrew Morgan
fdf9e2f9d6 Add migration for table account_data_undelivered_deletes 2022-12-19 16:44:24 +00:00
Andrew Morgan
8e35bfc889 Prevent deleted account data items appearing in initial syncs 2022-12-19 16:44:24 +00:00
Andrew Morgan
47fe40b7ca Return a 404 if an account data type is empty 2022-12-19 16:44:24 +00:00
Andrew Morgan
5c7d2e05e8 Allow deleting account data by PUT'ing with empty content
MSC3391 specifies that for backwards compatibility purposes, setting an account data type's content to {}
should be equivalent to deleting that account data. That call should succeed regardless of whether the
account data existed previously or not.
2022-12-19 16:44:24 +00:00
Andrew Morgan
f112fd6c12 Add servlets, handler, storage functions for deleting user/room account data 2022-12-19 16:44:24 +00:00
Andrew Morgan
8c255cbb1d Set 'experimental_features.msc3391_enabled' config to true for complement tests 2022-12-19 16:44:24 +00:00
Andrew Morgan
f66d743eae Add replication methods for removing account data
Also rename existing account data methods, explicitly stating that they're for adding account data.
2022-12-19 16:44:24 +00:00
Andrew Morgan
16cccade57 Enable Complement tests for MSC3391 2022-12-19 16:44:24 +00:00
Andrew Morgan
c5765a480f Add experimental features flag 2022-12-19 16:44:24 +00:00
10 changed files with 708 additions and 19 deletions

View File

@@ -102,6 +102,8 @@ experimental_features:
{% endif %}
# Filtering /messages by relation type.
msc3874_enabled: true
# Enable removing account data support
msc3391_enabled: true
server_notices:
system_mxid_localpart: _server

View File

@@ -190,7 +190,7 @@ fi
extra_test_args=()
test_tags="synapse_blacklist,msc3787,msc3874"
test_tags="synapse_blacklist,msc3787,msc3874,msc3391"
# All environment variables starting with PASS_ will be shared.
# (The prefix is stripped off before reaching the container.)

View File

@@ -136,3 +136,6 @@ class ExperimentalConfig(Config):
# Enable room version (and thus applicable push rules from MSC3931/3932)
version_id = RoomVersions.MSC1767v10.identifier
KNOWN_ROOM_VERSIONS[version_id] = RoomVersions.MSC1767v10
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)

View File

@@ -17,10 +17,12 @@ import random
from typing import TYPE_CHECKING, Awaitable, Callable, Collection, List, Optional, Tuple
from synapse.replication.http.account_data import (
ReplicationAddRoomAccountDataRestServlet,
ReplicationAddTagRestServlet,
ReplicationAddUserAccountDataRestServlet,
ReplicationRemoveRoomAccountDataRestServlet,
ReplicationRemoveTagRestServlet,
ReplicationRoomAccountDataRestServlet,
ReplicationUserAccountDataRestServlet,
ReplicationRemoveUserAccountDataRestServlet,
)
from synapse.streams import EventSource
from synapse.types import JsonDict, StreamKeyType, UserID
@@ -41,8 +43,18 @@ class AccountDataHandler:
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
self._user_data_client = ReplicationUserAccountDataRestServlet.make_client(hs)
self._room_data_client = ReplicationRoomAccountDataRestServlet.make_client(hs)
self._add_user_data_client = (
ReplicationAddUserAccountDataRestServlet.make_client(hs)
)
self._remove_user_data_client = (
ReplicationRemoveUserAccountDataRestServlet.make_client(hs)
)
self._add_room_data_client = (
ReplicationAddRoomAccountDataRestServlet.make_client(hs)
)
self._remove_room_data_client = (
ReplicationRemoveRoomAccountDataRestServlet.make_client(hs)
)
self._add_tag_client = ReplicationAddTagRestServlet.make_client(hs)
self._remove_tag_client = ReplicationRemoveTagRestServlet.make_client(hs)
self._account_data_writers = hs.config.worker.writers.account_data
@@ -112,7 +124,7 @@ class AccountDataHandler:
return max_stream_id
else:
response = await self._room_data_client(
response = await self._add_room_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
@@ -121,15 +133,59 @@ class AccountDataHandler:
)
return response["max_stream_id"]
async def remove_account_data_for_room(
self, user_id: str, room_id: str, account_data_type: str
) -> Optional[int]:
"""
Deletes the room account data for the given user and account data type.
"Deleting" account data merely means setting the content of the account data
to an empty JSON object: {}.
Args:
user_id: The user ID to remove room account data for.
room_id: The room ID to target.
account_data_type: The account data type to remove.
Returns:
The maximum stream ID, or None if the room account data item did not exist.
"""
# Only instances configured as account data writers may touch the database
# directly; any other worker forwards the request over replication.
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.remove_account_data_for_room(
user_id, room_id, account_data_type
)
if max_stream_id is None:
# The referenced account data did not exist, so no delete occurred.
return None
self._notifier.on_new_event(
StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
# Notify Synapse modules that the content of the type has changed to an
# empty dictionary.
await self._notify_modules(user_id, room_id, account_data_type, {})
return max_stream_id
else:
# Ask an arbitrary writer instance to perform the delete on our behalf.
response = await self._remove_room_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
account_data_type=account_data_type,
content={},
)
return response["max_stream_id"]
async def add_account_data_for_user(
self, user_id: str, account_data_type: str, content: JsonDict
) -> int:
"""Add some global account_data for a user.
Args:
user_id: The user to add a tag for.
user_id: The user to add some account data for.
account_data_type: The type of account_data to add.
content: A json object to associate with the tag.
content: The content json dictionary.
Returns:
The maximum stream ID.
@@ -148,7 +204,7 @@ class AccountDataHandler:
return max_stream_id
else:
response = await self._user_data_client(
response = await self._add_user_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
account_data_type=account_data_type,
@@ -156,6 +212,45 @@ class AccountDataHandler:
)
return response["max_stream_id"]
async def remove_account_data_for_user(
self, user_id: str, account_data_type: str
) -> Optional[int]:
"""Removes a piece of global account_data for a user.
Args:
user_id: The user to remove account data for.
account_data_type: The type of account_data to remove.
Returns:
The maximum stream ID, or None if the account data item did not exist.
"""
# Only instances configured as account data writers may touch the database
# directly; any other worker forwards the request over replication.
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.remove_account_data_for_user(
user_id, account_data_type
)
if max_stream_id is None:
# The referenced account data did not exist, so no delete occurred.
return None
self._notifier.on_new_event(
StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id]
)
# Notify Synapse modules that the content of the type has changed to an
# empty dictionary. room_id is None as this is global account data.
await self._notify_modules(user_id, None, account_data_type, {})
return max_stream_id
else:
# Ask an arbitrary writer instance to perform the delete on our behalf.
response = await self._remove_user_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
account_data_type=account_data_type,
content={},
)
return response["max_stream_id"]
async def add_tag_to_room(
self, user_id: str, room_id: str, tag: str, content: JsonDict
) -> int:

View File

@@ -28,7 +28,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
class ReplicationAddUserAccountDataRestServlet(ReplicationEndpoint):
"""Add user account data on the appropriate account data worker.
Request format:
@@ -73,7 +73,46 @@ class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
return 200, {"max_stream_id": max_stream_id}
class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
class ReplicationRemoveUserAccountDataRestServlet(ReplicationEndpoint):
    """Remove user account data on the appropriate account data worker.

    Request format:

        POST /_synapse/replication/remove_user_account_data/:user_id/:type

        {}
    """

    NAME = "remove_user_account_data"
    PATH_ARGS = ("user_id", "account_data_type")
    CACHE = False

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)

        self.handler = hs.get_account_data_handler()
        self.clock = hs.get_clock()

    @staticmethod
    async def _serialize_payload(  # type: ignore[override]
        user_id: str, account_data_type: str, content: JsonDict
    ) -> JsonDict:
        # The caller (AccountDataHandler) passes content={}, mirroring the
        # room variant of this servlet; it must be accepted here or the
        # replication client would raise a TypeError. Everything needed is
        # carried in the path arguments, so the payload stays empty.
        return {}

    async def _handle_request(  # type: ignore[override]
        self, request: Request, user_id: str, account_data_type: str
    ) -> Tuple[int, JsonDict]:
        max_stream_id = await self.handler.remove_account_data_for_user(
            user_id, account_data_type
        )

        return 200, {"max_stream_id": max_stream_id}
class ReplicationAddRoomAccountDataRestServlet(ReplicationEndpoint):
"""Add room account data on the appropriate account data worker.
Request format:
@@ -118,6 +157,45 @@ class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
return 200, {"max_stream_id": max_stream_id}
class ReplicationRemoveRoomAccountDataRestServlet(ReplicationEndpoint):
"""Remove room account data on the appropriate account data worker.
Request format:
POST /_synapse/replication/remove_room_account_data/:user_id/:room_id/:account_data_type
{
"content": { ... },
}
"""
NAME = "remove_room_account_data"
PATH_ARGS = ("user_id", "room_id", "account_data_type")
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()
@staticmethod
async def _serialize_payload(  # type: ignore[override]
user_id: str, room_id: str, account_data_type: str, content: JsonDict
) -> JsonDict:
# The content argument (expected to be {}) is accepted for caller
# convenience but deliberately not serialized: the path arguments carry
# everything the handling worker needs.
return {}
async def _handle_request(  # type: ignore[override]
self, request: Request, user_id: str, room_id: str, account_data_type: str
) -> Tuple[int, JsonDict]:
max_stream_id = await self.handler.remove_account_data_for_room(
user_id, room_id, account_data_type
)
return 200, {"max_stream_id": max_stream_id}
class ReplicationAddTagRestServlet(ReplicationEndpoint):
"""Add tag on the appropriate account data worker.
@@ -206,7 +284,9 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserAccountDataRestServlet(hs).register(http_server)
ReplicationRoomAccountDataRestServlet(hs).register(http_server)
ReplicationAddUserAccountDataRestServlet(hs).register(http_server)
ReplicationRemoveUserAccountDataRestServlet(hs).register(http_server)
ReplicationAddRoomAccountDataRestServlet(hs).register(http_server)
ReplicationRemoveRoomAccountDataRestServlet(hs).register(http_server)
ReplicationAddTagRestServlet(hs).register(http_server)
ReplicationRemoveTagRestServlet(hs).register(http_server)

View File

@@ -41,6 +41,7 @@ class AccountDataServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.handler = hs.get_account_data_handler()
@@ -54,6 +55,16 @@ class AccountDataServlet(RestServlet):
body = parse_json_object_from_request(request)
# If experimental support for MSC3391 is enabled, then providing an empty dict
# as the value for an account data type should be functionally equivalent to
# calling the DELETE method on the same type.
if self._hs.config.experimental.msc3391_enabled:
if body == {}:
await self.handler.remove_account_data_for_user(
user_id, account_data_type
)
return 200, {}
await self.handler.add_account_data_for_user(user_id, account_data_type, body)
return 200, {}
@@ -72,9 +83,49 @@ class AccountDataServlet(RestServlet):
if event is None:
raise NotFoundError("Account data not found")
# If experimental support for MSC3391 is enabled, then this endpoint should
# return a 404 if the content for an account data type is an empty dict.
if self._hs.config.experimental.msc3391_enabled and event == {}:
raise NotFoundError("Account data not found")
return 200, event
class UnstableAccountDataServlet(RestServlet):
"""
Contains an unstable endpoint for removing user account data, as specified by
MSC3391. If that MSC is accepted, this code should have unstable prefixes removed
and become incorporated into AccountDataServlet above.
"""
PATTERNS = client_patterns(
"/org.matrix.msc3391/user/(?P<user_id>[^/]*)"
"/account_data/(?P<account_data_type>[^/]*)",
unstable=True,
releases=(),
)
def __init__(self, hs: "HomeServer"):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.handler = hs.get_account_data_handler()
# DELETE /.../user/{user_id}/account_data/{account_data_type}
async def on_DELETE(
self,
request: SynapseRequest,
user_id: str,
account_data_type: str,
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
# Users may only delete their own account data.
if user_id != requester.user.to_string():
raise AuthError(403, "Cannot delete account data for other users.")
await self.handler.remove_account_data_for_user(user_id, account_data_type)
return 200, {}
class RoomAccountDataServlet(RestServlet):
"""
PUT /user/{user_id}/rooms/{room_id}/account_data/{account_dataType} HTTP/1.1
@@ -89,6 +140,7 @@ class RoomAccountDataServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.handler = hs.get_account_data_handler()
@@ -121,6 +173,16 @@ class RoomAccountDataServlet(RestServlet):
Codes.BAD_JSON,
)
# If experimental support for MSC3391 is enabled, then providing an empty dict
# as the value for an account data type should be functionally equivalent to
# calling the DELETE method on the same type.
if self._hs.config.experimental.msc3391_enabled:
if body == {}:
await self.handler.remove_account_data_for_room(
user_id, room_id, account_data_type
)
return 200, {}
await self.handler.add_account_data_to_room(
user_id, room_id, account_data_type, body
)
@@ -152,9 +214,64 @@ class RoomAccountDataServlet(RestServlet):
if event is None:
raise NotFoundError("Room account data not found")
# If experimental support for MSC3391 is enabled, then this endpoint should
# return a 404 if the content for an account data type is an empty dict.
if self._hs.config.experimental.msc3391_enabled and event == {}:
raise NotFoundError("Room account data not found")
return 200, event
class UnstableRoomAccountDataServlet(RestServlet):
"""
Contains an unstable endpoint for removing room account data, as specified by
MSC3391. If that MSC is accepted, this code should have unstable prefixes removed
and become incorporated into RoomAccountDataServlet above.
"""
PATTERNS = client_patterns(
"/org.matrix.msc3391/user/(?P<user_id>[^/]*)"
"/rooms/(?P<room_id>[^/]*)"
"/account_data/(?P<account_data_type>[^/]*)",
unstable=True,
releases=(),
)
def __init__(self, hs: "HomeServer"):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.handler = hs.get_account_data_handler()
# DELETE /.../user/{user_id}/rooms/{room_id}/account_data/{account_data_type}
async def on_DELETE(
self,
request: SynapseRequest,
user_id: str,
room_id: str,
account_data_type: str,
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
# Users may only delete their own account data.
if user_id != requester.user.to_string():
raise AuthError(403, "Cannot delete account data for other users.")
# Reject syntactically invalid room IDs before touching the database.
if not RoomID.is_valid(room_id):
raise SynapseError(
400,
f"{room_id} is not a valid room ID",
Codes.INVALID_PARAM,
)
await self.handler.remove_account_data_for_room(
user_id, room_id, account_data_type
)
return 200, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
AccountDataServlet(hs).register(http_server)
RoomAccountDataServlet(hs).register(http_server)
if hs.config.experimental.msc3391_enabled:
UnstableAccountDataServlet(hs).register(http_server)
UnstableRoomAccountDataServlet(hs).register(http_server)

View File

@@ -1898,6 +1898,19 @@ class DatabasePool:
updatevalues: Dict[str, Any],
desc: str,
) -> int:
"""
Update rows in the given database table.
If the given keyvalues don't match anything, nothing will be updated.
Args:
table: The database table to update.
keyvalues: A mapping of column name to value to match rows on.
updatevalues: A mapping of column name to value to replace in any matched rows.
desc: description of the transaction, for logging and metrics.
Returns:
The number of rows that were updated. Will be 0 if no matching rows were found.
"""
return await self.runInteraction(
desc, self.simple_update_txn, table, keyvalues, updatevalues
)
@@ -1909,6 +1922,19 @@ class DatabasePool:
keyvalues: Dict[str, Any],
updatevalues: Dict[str, Any],
) -> int:
"""
Update rows in the given database table.
If the given keyvalues don't match anything, nothing will be updated.
Args:
txn: The database transaction object.
table: The database table to update.
keyvalues: A mapping of column name to value to match rows on.
updatevalues: A mapping of column name to value to replace in any matched rows.
Returns:
The number of rows that were updated. Will be 0 if no matching rows were found.
"""
if keyvalues:
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
else:

View File

@@ -35,6 +35,7 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.devices import DeviceWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
@@ -123,7 +124,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
async def get_account_data_for_user(
self, user_id: str
) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]:
"""Get all the client account_data for a user.
"""
Get all the client account_data for a user.
If experimental MSC3391 support is enabled, any entries with an empty
content body are excluded; as this means they have been deleted.
Args:
user_id: The user to get the account_data for.
@@ -142,6 +147,12 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
["account_data_type", "content"],
)
# If experimental MSC3391 support is enabled, then account data entries
# with an empty content are considered "deleted". So skip adding them to
# the results.
if self.hs.config.experimental.msc3391_enabled:
rows = [row for row in rows if row["content"] != "{}"]
global_account_data = {
row["account_data_type"]: db_to_json(row["content"]) for row in rows
}
@@ -156,6 +167,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
by_room: Dict[str, Dict[str, JsonDict]] = {}
for row in rows:
room_data = by_room.setdefault(row["room_id"], {})
# If experimental MSC3391 support is enabled, then account data entries
# with an empty content are considered "deleted". So skip adding them to
# the results.
if (
self.hs.config.experimental.msc3391_enabled
and row["content"] == "{}"
):
continue
room_data[row["account_data_type"]] = db_to_json(row["content"])
return global_account_data, by_room
@@ -448,9 +469,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
content_json = json_encoder.encode(content)
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.simple_upsert(
desc="add_room_account_data",
def _add_account_data_to_room(txn: LoggingTransaction, next_id: int) -> None:
self.db_pool.simple_upsert_txn(
txn,
table="room_account_data",
keyvalues={
"user_id": user_id,
@@ -460,6 +481,18 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
values={"stream_id": next_id, "content": content_json},
)
# Clear any previous record that this user account data type was deleted.
self._remove_entries_from_account_data_undelivered_deletes_for_type_txn(
txn, account_data_type, room_id, user_id
)
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"add_account_data_to_room",
_add_account_data_to_room,
next_id,
)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id))
@@ -469,6 +502,85 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
return self._account_data_id_gen.get_current_token()
async def remove_account_data_for_room(
self, user_id: str, room_id: str, account_data_type: str
) -> Optional[int]:
"""Delete the room account data for the user of a given type.
Args:
user_id: The user to remove account_data for.
room_id: The room ID to scope the request to.
account_data_type: The account data type to delete.
Returns:
The maximum stream position, or None if there was no matching room account
data to delete.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)
def _remove_account_data_for_room_txn(
txn: LoggingTransaction, next_id: int
) -> bool:
"""
Args:
txn: The transaction object.
next_id: The stream_id to update any existing rows to.
Returns:
True if an entry in room_account_data had its content set to '{}',
otherwise False. This informs callers of whether there actually was an
existing room account data entry to delete, or if the call was a no-op.
"""
# The "content != '{}'" guard makes deleting already-deleted (or absent)
# account data a no-op: rowcount stays 0 and no notifications are sent.
sql = """
UPDATE room_account_data
SET stream_id = ?, content = '{}'
WHERE user_id = ?
AND room_id = ?
AND account_data_type = ?
AND content != '{}'
"""
txn.execute(
sql,
(next_id, user_id, room_id, account_data_type),
)
if txn.rowcount == 0:
# We didn't update any rows. This means that there was no matching room
# account data entry to delete in the first place.
return False
# Record that this account data was deleted along with the devices that
# have yet to see it. Once all devices have later seen the delete, we can
# fully purge the row from `room_account_data`.
self._add_entries_to_account_data_undelivered_deletes_txn(
txn,
stream_id=next_id,
account_data_type=account_data_type,
room_id=room_id,
user_id=user_id,
)
return True
async with self._account_data_id_gen.get_next() as next_id:
row_updated = await self.db_pool.runInteraction(
"remove_account_data_for_room",
_remove_account_data_for_room_txn,
next_id,
)
if not row_updated:
return None
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id))
# The stored content is now known to be {}, so prefill the per-type cache
# with that value rather than merely invalidating it.
self.get_account_data_for_room_and_type.prefill(
(user_id, room_id, account_data_type), {}
)
return self._account_data_id_gen.get_current_token()
async def add_account_data_for_user(
self, user_id: str, account_data_type: str, content: JsonDict
) -> int:
@@ -520,6 +632,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
values={"stream_id": next_id, "content": content_json},
)
# Clear any previous record that this user account data type was deleted.
self._remove_entries_from_account_data_undelivered_deletes_for_type_txn(
txn, account_data_type, room_id=None, user_id=user_id
)
# Ignored users get denormalized into a separate table as an optimisation.
if account_data_type != AccountDataTypes.IGNORED_USER_LIST:
return
@@ -569,6 +686,192 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
async def remove_account_data_for_user(
self,
user_id: str,
account_data_type: str,
) -> Optional[int]:
"""
Delete a single piece of user account data by type.
A "delete" is performed by updating a potentially existing row in the
"account_data" database table for (user_id, account_data_type) and
setting its content to "{}".
Args:
user_id: The user ID to modify the account data of.
account_data_type: The type to remove.
Returns:
The maximum stream position, or None if there was no matching account data
to delete.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)
def _remove_account_data_for_user_txn(
txn: LoggingTransaction, next_id: int
) -> bool:
"""
Args:
txn: The transaction object.
next_id: The stream_id to update any existing rows to.
Returns:
True if an entry in account_data had its content set to '{}', otherwise
False. This informs callers of whether there actually was an existing
account data entry to delete, or if the call was a no-op.
"""
# The "content != '{}'" guard makes deleting already-deleted (or absent)
# account data a no-op: rowcount stays 0 and no notifications are sent.
sql = """
UPDATE account_data
SET stream_id = ?, content = '{}'
WHERE user_id = ?
AND account_data_type = ?
AND content != '{}'
"""
txn.execute(sql, (next_id, user_id, account_data_type))
if txn.rowcount == 0:
# We didn't update any rows. This means that there was no matching
# account data entry to delete in the first place.
return False
# Record that this account data was deleted along with the devices that
# have yet to see it. Once all devices have later seen the delete, we can
# fully purge the row from `account_data`.
self._add_entries_to_account_data_undelivered_deletes_txn(
txn,
stream_id=next_id,
account_data_type=account_data_type,
room_id=None,
user_id=user_id,
)
# Ignored users get denormalized into a separate table as an optimisation.
if account_data_type == AccountDataTypes.IGNORED_USER_LIST:
# If this method was called with the ignored users account data type, we
# simply delete all ignored users.
# First pull all the users that this user ignores.
previously_ignored_users = set(
self.db_pool.simple_select_onecol_txn(
txn,
table="ignored_users",
keyvalues={"ignorer_user_id": user_id},
retcol="ignored_user_id",
)
)
# Then delete them from the database.
self.db_pool.simple_delete_txn(
txn,
table="ignored_users",
keyvalues={"ignorer_user_id": user_id},
)
# Invalidate the cache for ignored users which were removed.
for ignored_user_id in previously_ignored_users:
self._invalidate_cache_and_stream(
txn, self.ignored_by, (ignored_user_id,)
)
# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
return True
async with self._account_data_id_gen.get_next() as next_id:
row_updated = await self.db_pool.runInteraction(
"remove_account_data_for_user",
_remove_account_data_for_user_txn,
next_id,
)
if not row_updated:
return None
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
# The stored content is now known to be {}, so prefill the per-type cache
# with that value rather than merely invalidating it.
self.get_global_account_data_by_type_for_user.prefill(
(user_id, account_data_type), {}
)
return self._account_data_id_gen.get_current_token()
def _add_entries_to_account_data_undelivered_deletes_txn(
self,
txn: LoggingTransaction,
stream_id: int,
account_data_type: str,
room_id: Optional[str],
user_id: str,
) -> None:
"""
Adds an entry per device of the given user to the
'account_data_undelivered_deletes' table, which tracks the devices that have
yet to be informed of a deleted entry in either the 'account_data' or
'room_account_data' tables.
Entries for hidden devices will not be created.
Args:
txn: The transaction that is handling the delete from (room)_account_data.
stream_id: The stream_id of the delete entry in the (room)_account_data table.
account_data_type: The type of {room,user} account data that was deleted.
room_id: The ID of the room if this refers to room account data, otherwise
this should be None.
user_id: The ID of the user this account data is related to.
"""
# Reuse DeviceWorkerStore's static transaction helper rather than duplicating
# the device query here. That helper filters on hidden=False, which is what
# gives us the "no entries for hidden devices" behaviour documented above.
user_device_dicts = DeviceWorkerStore.get_devices_by_user_txn(
txn, self.db_pool, user_id
)
# Create an entry in the deleted account data table for each device ID.
self.db_pool.simple_insert_many_txn(
txn,
table="account_data_undelivered_deletes",
keys=("stream_id", "type", "room_id", "user_id", "device_id"),
values=(
(stream_id, account_data_type, room_id, user_id, device_id)
for device_id in user_device_dicts.keys()
),
)
def _remove_entries_from_account_data_undelivered_deletes_for_type_txn(
self,
txn: LoggingTransaction,
account_data_type: str,
room_id: Optional[str],
user_id: str,
) -> None:
"""
Removes all entries from the 'account_data_undelivered_deletes' table for a given
{user,room} account data entry.
This should be called when adding/updating an account data entry, as the entry
will no longer be in a deleted state.
Args:
txn: The transaction that is handling the addition/modification to the
relevant account data type.
account_data_type: The type of {room,user} account data that was modified.
room_id: The ID of the room if this refers to room account data, otherwise
this should be None.
user_id: The ID of the user this account data is related to.
"""
# Remove all entries pertaining to this account data item, since it has been
# (re-)created and is no longer in a deleted state.
# NOTE(review): when room_id is None, this relies on simple_delete_txn
# translating a None keyvalue into "room_id IS NULL"; a plain "room_id = ?"
# with NULL matches no rows in SQL, which would leave user-level entries
# behind forever — confirm against simple_delete_txn's implementation.
self.db_pool.simple_delete_txn(
txn,
table="account_data_undelivered_deletes",
keyvalues={
"type": account_data_type,
"room_id": room_id,
"user_id": user_id,
},
)
async def purge_account_data_for_user(self, user_id: str) -> None:
"""
Removes ALL the account data for a user.

View File

@@ -276,11 +276,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
A mapping from device_id to a dict containing "device_id", "user_id"
and "display_name" for each device.
"""
devices = await self.db_pool.simple_select_list(
return await self.db_pool.runInteraction(
"get_devices_by_user",
self.get_devices_by_user_txn,
self.db_pool,
user_id,
)
@staticmethod
def get_devices_by_user_txn(
    txn: LoggingTransaction, db_pool: DatabasePool, user_id: str
) -> Dict[str, Dict[str, str]]:
    """Fetch all non-hidden devices belonging to a user, in a transaction.

    Static so that other stores (e.g. the account data store) can reuse this
    query without holding a DeviceWorkerStore instance.

    Args:
        txn: The database transaction to run the query in.
        db_pool: The database pool to query against.
        user_id: The user to fetch devices for.

    Returns:
        A mapping from device_id to a dict containing "device_id", "user_id"
        and "display_name" for each device.
    """
    # Note: no `desc=` argument here — that belongs to the async
    # `simple_select_list` wrapper, not to the `_txn` variant; passing it
    # through would raise a TypeError.
    devices = db_pool.simple_select_list_txn(
        txn,
        table="devices",
        keyvalues={"user_id": user_id, "hidden": False},
        retcols=("user_id", "device_id", "display_name"),
    )

    return {d["device_id"]: d for d in devices}

View File

@@ -0,0 +1,52 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-- Postgres truncates index names to 64 characters. Otherwise this table
-- would have a better name.
CREATE TABLE IF NOT EXISTS account_data_undelivered_deletes (
-- The stream_id of the delete in `account_data` or `room_account_data`.
-- Note that this value is unique across both `account_data` and
-- `room_account_data` tables.
stream_id BIGINT NOT NULL,
-- The account data type identifier.
type TEXT NOT NULL,
-- The room ID, if this is referring to `room_account_data`.
-- NULL for user-level (global) account data.
room_id TEXT,
-- The ID of the user that the deleted account data belongs to.
user_id TEXT NOT NULL,
-- A device ID that has not yet seen this delete.
device_id TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(name),
FOREIGN KEY (room_id) REFERENCES rooms(room_id),
FOREIGN KEY (user_id, device_id) REFERENCES devices(user_id, device_id)
);
-- Ensure there is only one entry per (stream_id, user_id, device_id) tuple.
CREATE UNIQUE INDEX IF NOT EXISTS
account_data_undelivered_deletes_stream_id_user_id_device_id
ON account_data_undelivered_deletes(stream_id, user_id, device_id);
-- This is used to delete any rows for a given
-- (account_data_type, room_id, user_id, device_id) tuple when an account data entry
-- is added again.
-- NOTE(review): room_id is nullable, and standard SQL treats NULLs as distinct
-- in unique indexes, so this index does NOT deduplicate rows for user-level
-- (room_id IS NULL) account data — confirm that duplicates are acceptable there.
CREATE UNIQUE INDEX IF NOT EXISTS
account_data_undelivered_deletes_type_room_id_user_id_device_id
ON account_data_undelivered_deletes(type, room_id, user_id, device_id);
-- This is used to delete all rows for a given (user_id, device_id) pair
-- when a device is deleted.
CREATE INDEX IF NOT EXISTS
account_data_undelivered_deletes_user_id_device_id
ON account_data_undelivered_deletes(user_id, device_id);