Add a thread ID to receipts.
This commit is contained in:
1
changelog.d/13202.feature
Normal file
1
changelog.d/13202.feature
Normal file
@@ -0,0 +1 @@
|
||||
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
|
||||
@@ -97,6 +97,7 @@ class ReceiptsHandler:
|
||||
receipt_type=receipt_type,
|
||||
user_id=user_id,
|
||||
event_ids=user_values["event_ids"],
|
||||
thread_id=None, # TODO
|
||||
data=user_values.get("data", {}),
|
||||
)
|
||||
)
|
||||
@@ -114,6 +115,7 @@ class ReceiptsHandler:
|
||||
receipt.receipt_type,
|
||||
receipt.user_id,
|
||||
receipt.event_ids,
|
||||
receipt.thread_id,
|
||||
receipt.data,
|
||||
)
|
||||
|
||||
@@ -146,7 +148,12 @@ class ReceiptsHandler:
|
||||
return True
|
||||
|
||||
async def received_client_receipt(
|
||||
self, room_id: str, receipt_type: str, user_id: str, event_id: str
|
||||
self,
|
||||
room_id: str,
|
||||
receipt_type: str,
|
||||
user_id: str,
|
||||
event_id: str,
|
||||
thread_id: Optional[str],
|
||||
) -> None:
|
||||
"""Called when a client tells us a local user has read up to the given
|
||||
event_id in the room.
|
||||
@@ -156,6 +163,7 @@ class ReceiptsHandler:
|
||||
receipt_type=receipt_type,
|
||||
user_id=user_id,
|
||||
event_ids=[event_id],
|
||||
thread_id=thread_id,
|
||||
data={"ts": int(self.clock.time_msec())},
|
||||
)
|
||||
|
||||
|
||||
@@ -423,7 +423,8 @@ class FederationSenderHandler:
|
||||
receipt.receipt_type,
|
||||
receipt.user_id,
|
||||
[receipt.event_id],
|
||||
receipt.data,
|
||||
thread_id=None, # TODO
|
||||
data=receipt.data,
|
||||
)
|
||||
await self.federation_sender.send_read_receipt(receipt_info)
|
||||
|
||||
|
||||
@@ -81,6 +81,7 @@ class ReadMarkerRestServlet(RestServlet):
|
||||
receipt_type,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=event_id,
|
||||
thread_id=None, # TODO
|
||||
)
|
||||
|
||||
return 200, {}
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Tuple
|
||||
from typing import TYPE_CHECKING, Optional, Tuple
|
||||
|
||||
from synapse.api.constants import ReceiptTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
@@ -34,7 +34,8 @@ class ReceiptRestServlet(RestServlet):
|
||||
PATTERNS = client_patterns(
|
||||
"/rooms/(?P<room_id>[^/]*)"
|
||||
"/receipt/(?P<receipt_type>[^/]*)"
|
||||
"/(?P<event_id>[^/]*)$"
|
||||
"/(?P<event_id>[^/]*)"
|
||||
"(/(?P<thread_id>[^/]*))?$"
|
||||
)
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -51,7 +52,12 @@ class ReceiptRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str
|
||||
self,
|
||||
request: SynapseRequest,
|
||||
room_id: str,
|
||||
receipt_type: str,
|
||||
event_id: str,
|
||||
thread_id: Optional[str],
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
@@ -77,6 +83,7 @@ class ReceiptRestServlet(RestServlet):
|
||||
receipt_type,
|
||||
user_id=requester.user.to_string(),
|
||||
event_id=event_id,
|
||||
thread_id=thread_id,
|
||||
)
|
||||
|
||||
return 200, {}
|
||||
|
||||
@@ -613,6 +613,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
receipt_type: str,
|
||||
user_id: str,
|
||||
event_id: str,
|
||||
thread_id: Optional[str],
|
||||
data: JsonDict,
|
||||
stream_id: int,
|
||||
) -> Optional[int]:
|
||||
@@ -636,15 +637,18 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
stream_ordering = int(res["stream_ordering"]) if res else None
|
||||
rx_ts = res["received_ts"] if res else 0
|
||||
|
||||
# Convert None to a blank string.
|
||||
thread_id = thread_id or ""
|
||||
|
||||
# We don't want to clobber receipts for more recent events, so we
|
||||
# have to compare orderings of existing receipts
|
||||
if stream_ordering is not None:
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id FROM events"
|
||||
" INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
|
||||
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
|
||||
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
|
||||
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND r.thread_id = ?"
|
||||
)
|
||||
txn.execute(sql, (room_id, receipt_type, user_id))
|
||||
txn.execute(sql, (room_id, receipt_type, user_id, thread_id))
|
||||
|
||||
for so, eid in txn:
|
||||
if int(so) >= stream_ordering:
|
||||
@@ -656,6 +660,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
)
|
||||
return None
|
||||
|
||||
# TODO
|
||||
txn.call_after(
|
||||
self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
|
||||
)
|
||||
@@ -671,6 +676,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
"room_id": room_id,
|
||||
"receipt_type": receipt_type,
|
||||
"user_id": user_id,
|
||||
"thread_id": thread_id,
|
||||
},
|
||||
values={
|
||||
"stream_id": stream_id,
|
||||
@@ -678,7 +684,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
"data": json_encoder.encode(data),
|
||||
},
|
||||
# receipts_linearized has a unique constraint on
|
||||
# (user_id, room_id, receipt_type), so no need to lock
|
||||
# (user_id, room_id, receipt_type, key), so no need to lock
|
||||
lock=False,
|
||||
)
|
||||
|
||||
@@ -728,6 +734,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
receipt_type: str,
|
||||
user_id: str,
|
||||
event_ids: List[str],
|
||||
thread_id: Optional[str],
|
||||
data: dict,
|
||||
) -> Optional[Tuple[int, int]]:
|
||||
"""Insert a receipt, either from local client or remote server.
|
||||
@@ -760,6 +767,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
receipt_type,
|
||||
user_id,
|
||||
linearized_event_id,
|
||||
thread_id,
|
||||
data,
|
||||
stream_id=stream_id,
|
||||
# Read committed is actually beneficial here because we check for a receipt with
|
||||
@@ -774,7 +782,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
|
||||
now = self._clock.time_msec()
|
||||
logger.debug(
|
||||
"RR for event %s in %s (%i ms old)",
|
||||
"Receipt %s for event %s in %s (%i ms old)",
|
||||
receipt_type,
|
||||
linearized_event_id,
|
||||
room_id,
|
||||
now - event_ts,
|
||||
@@ -787,6 +796,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
receipt_type,
|
||||
user_id,
|
||||
event_ids,
|
||||
thread_id,
|
||||
data,
|
||||
)
|
||||
|
||||
@@ -801,6 +811,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
receipt_type: str,
|
||||
user_id: str,
|
||||
event_ids: List[str],
|
||||
thread_id: Optional[str],
|
||||
data: JsonDict,
|
||||
) -> None:
|
||||
assert self._can_write_to_receipts
|
||||
@@ -812,6 +823,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
# FIXME: This shouldn't invalidate the whole cache
|
||||
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
|
||||
|
||||
# Convert None to a blank string.
|
||||
thread_id = thread_id or ""
|
||||
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="receipts_graph",
|
||||
@@ -819,6 +833,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
"room_id": room_id,
|
||||
"receipt_type": receipt_type,
|
||||
"user_id": user_id,
|
||||
"thread_id": thread_id,
|
||||
},
|
||||
)
|
||||
self.db_pool.simple_insert_txn(
|
||||
@@ -829,6 +844,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
"receipt_type": receipt_type,
|
||||
"user_id": user_id,
|
||||
"event_ids": json_encoder.encode(event_ids),
|
||||
"thread_id": thread_id,
|
||||
"data": json_encoder.encode(data),
|
||||
},
|
||||
)
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Allow multiple receipts per user per room via a nullable thread_id column.
|
||||
ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT NOT NULL DEFAULT '';
|
||||
ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT NOT NULL DEFAULT '';
|
||||
|
||||
-- Rebuild the unique constraint with the thread_id.
|
||||
ALTER TABLE receipts_linearized DROP CONSTRAINT IF EXISTS receipts_linearized_uniqueness;
|
||||
ALTER TABLE receipts_linearized ADD CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id);
|
||||
|
||||
ALTER TABLE receipts_graph DROP CONSTRAINT IF EXISTS receipts_graph_uniqueness;
|
||||
ALTER TABLE receipts_graph ADD CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id);
|
||||
@@ -0,0 +1,67 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Allow multiple receipts per user per room via a nullable thread_id column.
|
||||
--
|
||||
-- SQLite doesn't support modifying constraints to an existing table, so it must
|
||||
-- be recreated.
|
||||
|
||||
-- Create the new tables.
|
||||
CREATE TABLE receipts_graph_new (
|
||||
room_id TEXT NOT NULL,
|
||||
receipt_type TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
event_ids TEXT NOT NULL,
|
||||
thread_id TEXT NOT NULL DEFAULT '',
|
||||
data TEXT NOT NULL,
|
||||
CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id)
|
||||
);
|
||||
|
||||
CREATE TABLE receipts_linearized_new (
|
||||
stream_id BIGINT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
receipt_type TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
event_id TEXT NOT NULL,
|
||||
thread_id TEXT NOT NULL DEFAULT '',
|
||||
data TEXT NOT NULL,
|
||||
CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id, thread_id)
|
||||
);
|
||||
|
||||
-- Drop the old indexes.
|
||||
DROP INDEX IF EXISTS receipts_linearized_id;
|
||||
DROP INDEX IF EXISTS receipts_linearized_room_stream;
|
||||
DROP INDEX IF EXISTS receipts_linearized_user;
|
||||
|
||||
-- Copy the data.
|
||||
INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
|
||||
SELECT room_id, receipt_type, user_id, event_ids, data
|
||||
FROM receipts_graph;
|
||||
INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data)
|
||||
SELECT stream_id, room_id, receipt_type, user_id, event_id, data
|
||||
FROM receipts_linearized;
|
||||
|
||||
-- Drop the old tables.
|
||||
DROP TABLE receipts_graph;
|
||||
DROP TABLE receipts_linearized;
|
||||
|
||||
-- Rename the tables.
|
||||
ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
|
||||
ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
|
||||
|
||||
-- Create the indices.
|
||||
CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
|
||||
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
|
||||
CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
|
||||
@@ -830,6 +830,7 @@ class ReadReceipt:
|
||||
receipt_type: str
|
||||
user_id: str
|
||||
event_ids: List[str]
|
||||
thread_id: Optional[str]
|
||||
data: JsonDict
|
||||
|
||||
|
||||
|
||||
@@ -49,7 +49,12 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
receipt = ReadReceipt(
|
||||
"room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
|
||||
"room_id",
|
||||
"m.read",
|
||||
"user_id",
|
||||
["event_id"],
|
||||
thread_id=None,
|
||||
data={"ts": 1234},
|
||||
)
|
||||
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
|
||||
|
||||
@@ -89,7 +94,12 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
|
||||
|
||||
sender = self.hs.get_federation_sender()
|
||||
receipt = ReadReceipt(
|
||||
"room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
|
||||
"room_id",
|
||||
"m.read",
|
||||
"user_id",
|
||||
["event_id"],
|
||||
thread_id=None,
|
||||
data={"ts": 1234},
|
||||
)
|
||||
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
|
||||
|
||||
@@ -121,7 +131,12 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
|
||||
|
||||
# send the second RR
|
||||
receipt = ReadReceipt(
|
||||
"room_id", "m.read", "user_id", ["other_id"], {"ts": 1234}
|
||||
"room_id",
|
||||
"m.read",
|
||||
"user_id",
|
||||
["other_id"],
|
||||
thread_id=None,
|
||||
data={"ts": 1234},
|
||||
)
|
||||
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
|
||||
self.pump()
|
||||
|
||||
@@ -447,6 +447,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
|
||||
receipt_type="m.read",
|
||||
user_id=self.local_user,
|
||||
event_ids=[f"$eventid_{i}"],
|
||||
thread_id=None,
|
||||
data={},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -171,7 +171,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
|
||||
if send_receipt:
|
||||
self.get_success(
|
||||
self.master_store.insert_receipt(
|
||||
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {}
|
||||
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], None, {}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -33,7 +33,12 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
|
||||
# tell the master to send a new receipt
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.insert_receipt(
|
||||
"!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1}
|
||||
"!room:blue",
|
||||
"m.read",
|
||||
USER_ID,
|
||||
["$event:blue"],
|
||||
thread_id=None,
|
||||
data={"a": 1},
|
||||
)
|
||||
)
|
||||
self.replicate()
|
||||
@@ -57,7 +62,12 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.insert_receipt(
|
||||
"!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2}
|
||||
"!room2:blue",
|
||||
"m.read",
|
||||
USER_ID,
|
||||
["$event2:foo"],
|
||||
thread_id=None,
|
||||
data={"a": 2},
|
||||
)
|
||||
)
|
||||
self.replicate()
|
||||
|
||||
@@ -112,6 +112,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
|
||||
"m.read",
|
||||
user_id=user_id,
|
||||
event_ids=[event_id],
|
||||
thread_id=None,
|
||||
data={},
|
||||
)
|
||||
)
|
||||
@@ -258,6 +259,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
|
||||
"m.read",
|
||||
user_id=user_id,
|
||||
event_ids=[event_id],
|
||||
thread_id=None,
|
||||
data={},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -120,13 +120,18 @@ class ReceiptTestCase(HomeserverTestCase):
|
||||
# Send public read receipt for the first event
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {}
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {}
|
||||
)
|
||||
)
|
||||
# Send private read receipt for the second event
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {}
|
||||
self.room_id1,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
OUR_USER_ID,
|
||||
[event1_2_id],
|
||||
None,
|
||||
{},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -153,7 +158,7 @@ class ReceiptTestCase(HomeserverTestCase):
|
||||
# Test receipt updating
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {}
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {}
|
||||
)
|
||||
)
|
||||
res = self.get_success(
|
||||
@@ -169,7 +174,12 @@ class ReceiptTestCase(HomeserverTestCase):
|
||||
# Test new room is reflected in what the method returns
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {}
|
||||
self.room_id2,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
OUR_USER_ID,
|
||||
[event2_1_id],
|
||||
None,
|
||||
{},
|
||||
)
|
||||
)
|
||||
res = self.get_success(
|
||||
@@ -191,13 +201,18 @@ class ReceiptTestCase(HomeserverTestCase):
|
||||
# Send public read receipt for the first event
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {}
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {}
|
||||
)
|
||||
)
|
||||
# Send private read receipt for the second event
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {}
|
||||
self.room_id1,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
OUR_USER_ID,
|
||||
[event1_2_id],
|
||||
None,
|
||||
{},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -230,7 +245,7 @@ class ReceiptTestCase(HomeserverTestCase):
|
||||
# Test receipt updating
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {}
|
||||
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {}
|
||||
)
|
||||
)
|
||||
res = self.get_success(
|
||||
@@ -248,7 +263,12 @@ class ReceiptTestCase(HomeserverTestCase):
|
||||
# Test new room is reflected in what the method returns
|
||||
self.get_success(
|
||||
self.store.insert_receipt(
|
||||
self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {}
|
||||
self.room_id2,
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
OUR_USER_ID,
|
||||
[event2_1_id],
|
||||
None,
|
||||
{},
|
||||
)
|
||||
)
|
||||
res = self.get_success(
|
||||
|
||||
Reference in New Issue
Block a user