Add test_gaps_going_backwards
This commit is contained in:
@@ -229,19 +229,23 @@ class EventGapEntry:
|
||||
From MSC3871: Gappy timeline
|
||||
"""
|
||||
|
||||
prev_token: RoomStreamToken
|
||||
"""
|
||||
The token position before the target `event_id`
|
||||
"""
|
||||
|
||||
event_id: str
|
||||
"""
|
||||
The target event ID which we see a gap before or after.
|
||||
"""
|
||||
|
||||
prev_token: RoomStreamToken
|
||||
"""
|
||||
The token position before the target `event_id`
|
||||
|
||||
Remember: tokens are positions between events
|
||||
"""
|
||||
|
||||
next_token: RoomStreamToken
|
||||
"""
|
||||
The token position after the target `event_id`
|
||||
|
||||
Remember: tokens are positions between events
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -48,13 +48,13 @@ from synapse.storage.database import (
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.storage.databases.main.stream import (
|
||||
generate_next_token,
|
||||
generate_pagination_bounds,
|
||||
generate_pagination_where_clause,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.tokens import generate_next_token
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
@@ -604,7 +604,7 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
|
||||
return self.instance_map.get(instance_name, self.stream)
|
||||
|
||||
def is_before_or_eq(self, other_token: Self) -> bool:
|
||||
"""Wether this token is before the other token, i.e. every constituent
|
||||
"""Whether this token is before the other token, i.e. every constituent
|
||||
part is before the other.
|
||||
|
||||
Essentially it is `self <= other`.
|
||||
@@ -694,7 +694,7 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
|
||||
|
||||
---
|
||||
|
||||
Historic tokens start with a "t" followed by the `depth`
|
||||
Historical tokens start with a "t" followed by the `depth`
|
||||
(`topological_ordering` in the event graph) of the event that comes before
|
||||
the position of the token, followed by "-", followed by the
|
||||
`stream_ordering` of the event that comes before the position of the token.
|
||||
@@ -827,17 +827,15 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
|
||||
|
||||
return self.topological, self.stream
|
||||
|
||||
def get_stream_pos_for_instance(self, instance_name: str) -> int:
|
||||
"""Get the stream position that the given writer was at at this token.
|
||||
def is_before_or_eq(self, other_token: Self) -> bool:
|
||||
is_before_or_eq_stream_ordering = super().is_before_or_eq(other_token)
|
||||
if not is_before_or_eq_stream_ordering:
|
||||
return False
|
||||
|
||||
This only makes sense for "live" tokens that may have a vector clock
|
||||
component, and so asserts that this is a "live" token.
|
||||
"""
|
||||
assert self.topological is None
|
||||
if self.topological is not None and other_token.topological is not None:
|
||||
return self.topological <= other_token.topological
|
||||
|
||||
# If we don't have an entry for the instance we can assume that it was
|
||||
# at `self.stream`.
|
||||
return self.instance_map.get(instance_name, self.stream)
|
||||
return True
|
||||
|
||||
async def to_string(self, store: "DataStore") -> str:
|
||||
"""See class level docstring for information about the format."""
|
||||
|
||||
@@ -39,4 +39,9 @@ def generate_next_token(
|
||||
# when we are going backwards so we subtract one from the
|
||||
# stream part.
|
||||
last_stream_ordering -= 1
|
||||
|
||||
# TODO: Is this okay to do? Kinda seems more correct
|
||||
if last_topo_ordering is not None:
|
||||
last_topo_ordering -= 1
|
||||
|
||||
return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering)
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
"""Tests REST events for /rooms paths."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
|
||||
from unittest.mock import AsyncMock, Mock, call, patch
|
||||
@@ -59,7 +60,14 @@ from synapse.rest.client import (
|
||||
sync,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict, RoomAlias, UserID, create_requester
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
RoomAlias,
|
||||
StreamKeyType,
|
||||
StreamToken,
|
||||
UserID,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
@@ -70,6 +78,8 @@ from tests.test_utils.event_injection import create_event
|
||||
from tests.unittest import override_config
|
||||
from tests.utils import default_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PATH_PREFIX = b"/_matrix/client/api/v1"
|
||||
|
||||
|
||||
@@ -2242,6 +2252,11 @@ class RoomMessageListTestCase(RoomBase):
|
||||
user_id = "@sid1:red"
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = self.hs.get_datastores().main
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
self.room_id = self.helper.create_room_as(self.user_id)
|
||||
|
||||
def test_topo_token_is_accepted(self) -> None:
|
||||
@@ -2371,6 +2386,195 @@ class RoomMessageListTestCase(RoomBase):
|
||||
channel.json_body["errcode"], Codes.NOT_JSON, channel.json_body
|
||||
)
|
||||
|
||||
def _setup_gappy_timeline(self) -> Tuple[Dict[str, str], Dict[str, str]]:
|
||||
"""
|
||||
Set up a gappy timeline for testing.
|
||||
|
||||
We create a chain of events but only persist every other event so we have gaps
|
||||
everywhere.
|
||||
|
||||
(`p` means the event was persisted and known to this local server)
|
||||
```
|
||||
p p p p p
|
||||
old history <- foo -> bar <- baz -> qux <- corge <- grault <- garply <- waldo <- fred
|
||||
```
|
||||
|
||||
We also have some <primordial events from room creation> that are persisted at
|
||||
the beginning of the room but that's just a quirk of how we set this test
|
||||
fixture up. The "old history" is supposed to represent the point that we've
|
||||
actually back-paginated so far from our server.
|
||||
|
||||
Returns:
|
||||
Tuple of:
|
||||
1. Mapping from message to event IDs.
|
||||
2. Mapping from event IDs to messages.
|
||||
"""
|
||||
|
||||
message_list = [
|
||||
"old history",
|
||||
"foo",
|
||||
"bar",
|
||||
"baz",
|
||||
"qux",
|
||||
"corge",
|
||||
"grault",
|
||||
"garply",
|
||||
"waldo",
|
||||
"fred",
|
||||
]
|
||||
message_to_event_id_map = {}
|
||||
event_id_to_message_map = {}
|
||||
|
||||
# Make a straight line of events where only every other is persisted
|
||||
forward_extremity_event_ids = list(
|
||||
self.get_success(
|
||||
self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
)
|
||||
previous_depth = 0
|
||||
for message_index, message_text in enumerate(message_list):
|
||||
event, event_context = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=forward_extremity_event_ids,
|
||||
type=EventTypes.Message,
|
||||
content={"body": message_text, "msgtype": "m.text"},
|
||||
sender=self.user_id,
|
||||
room_id=self.room_id,
|
||||
room_version=self.get_success(
|
||||
self.store.get_room_version_id(self.room_id)
|
||||
),
|
||||
)
|
||||
)
|
||||
message_to_event_id_map[message_text] = event.event_id
|
||||
event_id_to_message_map[event.event_id] = message_text
|
||||
# Update the forward extremity to the new event
|
||||
forward_extremity_event_ids = [
|
||||
event.event_id,
|
||||
# Because we only persist every other event, if we just give Synapse a
|
||||
# unknown event ID as a `prev_event_id`, it wont' be able to calculate
|
||||
# `depth` in the DAG and will just default it to a `depth` of 1.
|
||||
#
|
||||
# Let's just connect it to one of the previous-previous events so that
|
||||
# Synapse has some known `prev_event_id` to calculate the `depth` from.
|
||||
forward_extremity_event_ids[0],
|
||||
]
|
||||
|
||||
# Persist every other event (do the odds, so we start with *not* persisting
|
||||
# the event representing the "old history")
|
||||
if message_index % 2 == 1:
|
||||
event, _, _ = self.get_success(
|
||||
self.persistence.persist_event(event, event_context)
|
||||
)
|
||||
# For sanity sake because `/messages` uses topological ordering, let's
|
||||
# assert that the `depth` is increasing.
|
||||
self.assertGreater(
|
||||
event.depth,
|
||||
previous_depth,
|
||||
"Expected event depth to increase as we persist events",
|
||||
)
|
||||
previous_depth = event.depth
|
||||
|
||||
return message_to_event_id_map, event_id_to_message_map
|
||||
|
||||
def test_gaps_going_backwards(self) -> None:
|
||||
message_to_event_id_map, event_id_to_message_map = self._setup_gappy_timeline()
|
||||
|
||||
# Craft a token the represents the position just after the "corge" event.
|
||||
# When looking backwards, we should see the "corge" event.
|
||||
corge_room_stream_token = self.get_success(
|
||||
self.store.get_topological_token_for_event(message_to_event_id_map["corge"])
|
||||
)
|
||||
current_token = self.hs.get_event_sources().get_current_token()
|
||||
corge_token = self.get_success(
|
||||
current_token.copy_and_replace(
|
||||
StreamKeyType.ROOM,
|
||||
corge_room_stream_token,
|
||||
).to_string(self.store)
|
||||
)
|
||||
|
||||
messages_type_filter = '{"types": ["m.room.message"]}'
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/rooms/%s/messages?dir=b&from=%s&filter=%s"
|
||||
% (self.room_id, corge_token, messages_type_filter),
|
||||
)
|
||||
self.assertEqual(HTTPStatus.OK, channel.code)
|
||||
logger.info("asdf %s", channel.json_body)
|
||||
|
||||
# Make sure the timeline includes everything from "corge" backwards (inclusive)
|
||||
#
|
||||
actual_messages = [
|
||||
event_id_to_message_map.get(event["event_id"], event["event_id"])
|
||||
for event in channel.json_body["chunk"]
|
||||
]
|
||||
expected_messages = [
|
||||
"corge",
|
||||
# "qux",
|
||||
"baz",
|
||||
# "bar",
|
||||
"foo",
|
||||
# "old history",
|
||||
]
|
||||
# Because the `assertEquals` assertion to assert exact order gives horrible diff
|
||||
# output when it fails, let's use `assertIncludes` as a first step to sanity
|
||||
# check everything is there before we assert the exact order.
|
||||
self.assertIncludes(
|
||||
set(actual_messages),
|
||||
set(expected_messages),
|
||||
exact=True,
|
||||
)
|
||||
# Asser the actual order
|
||||
self.assertEqual(actual_messages, expected_messages)
|
||||
|
||||
# Make sure the gaps are correct
|
||||
actual_gaps = [
|
||||
event_id_to_message_map.get(gap["event_id"], gap["event_id"])
|
||||
for gap in channel.json_body["gaps"]
|
||||
]
|
||||
expected_gaps = expected_messages
|
||||
# We only need to assert gaps are in the list (the order doesn't matter)
|
||||
self.assertIncludes(
|
||||
set(actual_gaps),
|
||||
set(expected_gaps),
|
||||
exact=True,
|
||||
)
|
||||
# Ensure that the tokens point to the correct positions
|
||||
for gap in channel.json_body["gaps"]:
|
||||
event_room_stream_token = self.get_success(
|
||||
self.store.get_topological_token_for_event(gap["event_id"])
|
||||
)
|
||||
|
||||
# Make sure that the `prev_pagination_token` points to the position before
|
||||
# the event
|
||||
prev_pagination_token = self.get_success(
|
||||
StreamToken.from_string(self.store, gap["prev_pagination_token"])
|
||||
)
|
||||
assert prev_pagination_token.room_key.topological is not None, (
|
||||
"expected `gap.prev_pagination_token` to be a topological token since it was returned from `/messages`"
|
||||
)
|
||||
assert prev_pagination_token.room_key.is_before_or_eq(
|
||||
event_room_stream_token
|
||||
), (
|
||||
"expected the `gap.prev_pagination_token` to point to the position before the event"
|
||||
)
|
||||
|
||||
# Make sure that the `next_pagination_token` points to the position after
|
||||
# the event
|
||||
next_pagination_token = self.get_success(
|
||||
StreamToken.from_string(self.store, gap["next_pagination_token"])
|
||||
)
|
||||
assert next_pagination_token.room_key.topological is not None, (
|
||||
"expected `gap.next_pagination_token` to be a topological token since it was returned from `/messages`"
|
||||
)
|
||||
assert not event_room_stream_token.is_before_or_eq(
|
||||
prev_pagination_token.room_key
|
||||
), (
|
||||
"expected the `gap.next_pagination_token` to point to the position after the event"
|
||||
)
|
||||
|
||||
# TODO: `test_gaps_going_forwards`
|
||||
|
||||
|
||||
class RoomMessageFilterTestCase(RoomBase):
|
||||
"""Tests /rooms/$room_id/messages REST events."""
|
||||
|
||||
Reference in New Issue
Block a user