Compare commits
11 Commits
develop
...
anoa/test_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
866a592a6f | ||
|
|
e893697a94 | ||
|
|
42f14104e3 | ||
|
|
52387d783c | ||
|
|
a9709f0782 | ||
|
|
74812f9e18 | ||
|
|
1d24bd394d | ||
|
|
4d80032518 | ||
|
|
222d270e8a | ||
|
|
46f5f30c15 | ||
|
|
fc2286e9d4 |
1
changelog.d/14161.bugfix
Normal file
1
changelog.d/14161.bugfix
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
|
||||||
1
changelog.d/14164.bugfix
Normal file
1
changelog.d/14164.bugfix
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
|
||||||
@@ -798,9 +798,42 @@ class FederationEventHandler:
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Check if we already any of these have these events.
|
||||||
|
# Note: we currently make a lookup in the database directly here rather than
|
||||||
|
# checking the event cache, due to:
|
||||||
|
# https://github.com/matrix-org/synapse/issues/13476
|
||||||
|
existing_events_map = await self._store._get_events_from_db(
|
||||||
|
[event.event_id for event in events]
|
||||||
|
)
|
||||||
|
|
||||||
|
new_events = []
|
||||||
|
for event in events:
|
||||||
|
event_id = event.event_id
|
||||||
|
|
||||||
|
# If we've already seen this event ID...
|
||||||
|
if event_id in existing_events_map:
|
||||||
|
existing_event = existing_events_map[event_id]
|
||||||
|
|
||||||
|
# ...and the event itself was not previously stored as an outlier...
|
||||||
|
if not existing_event.event.internal_metadata.is_outlier():
|
||||||
|
# ...then there's no need to persist it. We have it already.
|
||||||
|
logger.info(
|
||||||
|
"_process_pulled_event: Ignoring received event %s which we "
|
||||||
|
"have already seen",
|
||||||
|
event.event_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# While we have seen this event before, it was stored as an outlier.
|
||||||
|
# We'll now persist it as a non-outlier.
|
||||||
|
logger.info("De-outliering event %s", event_id)
|
||||||
|
|
||||||
|
# Continue on with the events that are new to us.
|
||||||
|
new_events.append(event)
|
||||||
|
|
||||||
# We want to sort these by depth so we process them and
|
# We want to sort these by depth so we process them and
|
||||||
# tell clients about them in order.
|
# tell clients about them in order.
|
||||||
sorted_events = sorted(events, key=lambda x: x.depth)
|
sorted_events = sorted(new_events, key=lambda x: x.depth)
|
||||||
for ev in sorted_events:
|
for ev in sorted_events:
|
||||||
with nested_logging_context(ev.event_id):
|
with nested_logging_context(ev.event_id):
|
||||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||||
@@ -852,18 +885,6 @@ class FederationEventHandler:
|
|||||||
|
|
||||||
event_id = event.event_id
|
event_id = event.event_id
|
||||||
|
|
||||||
existing = await self._store.get_event(
|
|
||||||
event_id, allow_none=True, allow_rejected=True
|
|
||||||
)
|
|
||||||
if existing:
|
|
||||||
if not existing.internal_metadata.is_outlier():
|
|
||||||
logger.info(
|
|
||||||
"_process_pulled_event: Ignoring received event %s which we have already seen",
|
|
||||||
event_id,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
logger.info("De-outliering event %s", event_id)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._sanity_check_event(event)
|
self._sanity_check_event(event)
|
||||||
except SynapseError as err:
|
except SynapseError as err:
|
||||||
|
|||||||
@@ -374,7 +374,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
If there is a mismatch, behave as per allow_none.
|
If there is a mismatch, behave as per allow_none.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The event, or None if the event was not found.
|
The event, or None if the event was not found and allow_none is `True`.
|
||||||
"""
|
"""
|
||||||
if not isinstance(event_id, str):
|
if not isinstance(event_id, str):
|
||||||
raise TypeError("Invalid event event_id %r" % (event_id,))
|
raise TypeError("Invalid event event_id %r" % (event_id,))
|
||||||
@@ -1502,21 +1502,15 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
Returns:
|
Returns:
|
||||||
a dict {event_id -> bool}
|
a dict {event_id -> bool}
|
||||||
"""
|
"""
|
||||||
# if the event cache contains the event, obviously we've seen it.
|
# TODO: We used to query the _get_event_cache here as a fast-path before
|
||||||
|
# hitting the database. For if an event were in the cache, we've presumably
|
||||||
|
# seen it before.
|
||||||
|
#
|
||||||
|
# But this is currently an invalid assumption due to the _get_event_cache
|
||||||
|
# not being invalidated when purging events from a room. The optimisation can
|
||||||
|
# be re-added after https://github.com/matrix-org/synapse/issues/13476
|
||||||
|
|
||||||
cache_results = {
|
def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]:
|
||||||
event_id
|
|
||||||
for event_id in event_ids
|
|
||||||
if await self._get_event_cache.contains((event_id,))
|
|
||||||
}
|
|
||||||
results = dict.fromkeys(cache_results, True)
|
|
||||||
remaining = [
|
|
||||||
event_id for event_id in event_ids if event_id not in cache_results
|
|
||||||
]
|
|
||||||
if not remaining:
|
|
||||||
return results
|
|
||||||
|
|
||||||
def have_seen_events_txn(txn: LoggingTransaction) -> None:
|
|
||||||
# we deliberately do *not* query the database for room_id, to make the
|
# we deliberately do *not* query the database for room_id, to make the
|
||||||
# query an index-only lookup on `events_event_id_key`.
|
# query an index-only lookup on `events_event_id_key`.
|
||||||
#
|
#
|
||||||
@@ -1524,16 +1518,17 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
|
|
||||||
sql = "SELECT event_id FROM events AS e WHERE "
|
sql = "SELECT event_id FROM events AS e WHERE "
|
||||||
clause, args = make_in_list_sql_clause(
|
clause, args = make_in_list_sql_clause(
|
||||||
txn.database_engine, "e.event_id", remaining
|
txn.database_engine, "e.event_id", event_ids
|
||||||
)
|
)
|
||||||
txn.execute(sql + clause, args)
|
txn.execute(sql + clause, args)
|
||||||
found_events = {eid for eid, in txn}
|
found_events = {eid for eid, in txn}
|
||||||
|
|
||||||
# ... and then we can update the results for each key
|
# ... and then we can update the results for each key
|
||||||
results.update({eid: (eid in found_events) for eid in remaining})
|
return {eid: (eid in found_events) for eid in event_ids}
|
||||||
|
|
||||||
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
|
return await self.db_pool.runInteraction(
|
||||||
return results
|
"have_seen_events", have_seen_events_txn
|
||||||
|
)
|
||||||
|
|
||||||
@cached(max_entries=100000, tree=True)
|
@cached(max_entries=100000, tree=True)
|
||||||
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
|
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
|
||||||
|
|||||||
@@ -19,7 +19,13 @@ from unittest.mock import Mock, patch
|
|||||||
from twisted.test.proto_helpers import MemoryReactor
|
from twisted.test.proto_helpers import MemoryReactor
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
|
from synapse.api.errors import (
|
||||||
|
AuthError,
|
||||||
|
Codes,
|
||||||
|
LimitExceededError,
|
||||||
|
NotFoundError,
|
||||||
|
SynapseError,
|
||||||
|
)
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.events import EventBase, make_event_from_dict
|
from synapse.events import EventBase, make_event_from_dict
|
||||||
from synapse.federation.federation_base import event_from_pdu_json
|
from synapse.federation.federation_base import event_from_pdu_json
|
||||||
@@ -28,6 +34,7 @@ from synapse.logging.context import LoggingContext, run_in_background
|
|||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client import login, room
|
from synapse.rest.client import login, room
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
from synapse.storage.databases.main.events_worker import EventCacheEntry
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
|
||||||
@@ -322,6 +329,102 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
|
|||||||
)
|
)
|
||||||
self.get_success(d)
|
self.get_success(d)
|
||||||
|
|
||||||
|
def test_backfill_ignores_known_events(self) -> None:
|
||||||
|
"""
|
||||||
|
Tests that events that we already know about are ignored when backfilling.
|
||||||
|
"""
|
||||||
|
# Set up users
|
||||||
|
user_id = self.register_user("kermit", "test")
|
||||||
|
tok = self.login("kermit", "test")
|
||||||
|
|
||||||
|
other_server = "otherserver"
|
||||||
|
other_user = "@otheruser:" + other_server
|
||||||
|
|
||||||
|
# Create a room to backfill events into
|
||||||
|
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||||
|
room_version = self.get_success(self.store.get_room_version(room_id))
|
||||||
|
|
||||||
|
# Build an event to backfill
|
||||||
|
event = event_from_pdu_json(
|
||||||
|
{
|
||||||
|
"type": EventTypes.Message,
|
||||||
|
"content": {"body": "hello world", "msgtype": "m.text"},
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": other_user,
|
||||||
|
"depth": 32,
|
||||||
|
"prev_events": [],
|
||||||
|
"auth_events": [],
|
||||||
|
"origin_server_ts": self.clock.time_msec(),
|
||||||
|
},
|
||||||
|
room_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ensure the event is not already in the DB
|
||||||
|
self.get_failure(
|
||||||
|
self.store.get_event(event.event_id),
|
||||||
|
NotFoundError,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Backfill the event and check that it has entered the DB.
|
||||||
|
|
||||||
|
# We mock out the FederationClient.backfill method, to pretend that a remote
|
||||||
|
# server has returned our fake event.
|
||||||
|
federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
|
||||||
|
self.hs.get_federation_client().backfill = federation_client_backfill_mock
|
||||||
|
|
||||||
|
# We also mock the persist method with a side effect of itself. This allows us
|
||||||
|
# to track when it has been called while preserving its function.
|
||||||
|
persist_events_and_notify_mock = Mock(
|
||||||
|
side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
|
||||||
|
)
|
||||||
|
self.hs.get_federation_event_handler().persist_events_and_notify = (
|
||||||
|
persist_events_and_notify_mock
|
||||||
|
)
|
||||||
|
|
||||||
|
# Small side-tangent. We populate the event cache with the event, even though
|
||||||
|
# it is not yet in the DB. This is an invalid scenario that can currently occur
|
||||||
|
# due to not properly invalidating the event cache.
|
||||||
|
# See https://github.com/matrix-org/synapse/issues/13476.
|
||||||
|
#
|
||||||
|
# As a result, backfill should not rely on the event cache to check whether
|
||||||
|
# we already have an event in the DB.
|
||||||
|
# TODO: Remove this bit when the event cache is properly invalidated.
|
||||||
|
cache_entry = EventCacheEntry(
|
||||||
|
event=event,
|
||||||
|
redacted_event=None,
|
||||||
|
)
|
||||||
|
self.store._get_event_cache.set_local((event.event_id,), cache_entry)
|
||||||
|
|
||||||
|
# We now call FederationEventHandler.backfill (a separate method) to trigger
|
||||||
|
# a backfill request. It should receive the fake event.
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler().backfill(
|
||||||
|
other_user,
|
||||||
|
room_id,
|
||||||
|
limit=10,
|
||||||
|
extremities=[],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that our fake event was persisted.
|
||||||
|
persist_events_and_notify_mock.assert_called_once()
|
||||||
|
persist_events_and_notify_mock.reset_mock()
|
||||||
|
|
||||||
|
# Now we repeat the backfill, having the homeserver receive the fake event
|
||||||
|
# again.
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler().backfill(
|
||||||
|
other_user,
|
||||||
|
room_id,
|
||||||
|
limit=10,
|
||||||
|
extremities=[],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# This time, we expect no event persistence to have occurred, as we already
|
||||||
|
# have this event.
|
||||||
|
persist_events_and_notify_mock.assert_not_called()
|
||||||
|
|
||||||
@unittest.override_config(
|
@unittest.override_config(
|
||||||
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
|
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -90,18 +90,6 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
|
|||||||
self.assertEqual(res, {self.event_ids[0]})
|
self.assertEqual(res, {self.event_ids[0]})
|
||||||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
||||||
|
|
||||||
def test_query_via_event_cache(self):
|
|
||||||
# fetch an event into the event cache
|
|
||||||
self.get_success(self.store.get_event(self.event_ids[0]))
|
|
||||||
|
|
||||||
# looking it up should now cause no db hits
|
|
||||||
with LoggingContext(name="test") as ctx:
|
|
||||||
res = self.get_success(
|
|
||||||
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
|
|
||||||
)
|
|
||||||
self.assertEqual(res, {self.event_ids[0]})
|
|
||||||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
|
||||||
|
|
||||||
def test_persisting_event_invalidates_cache(self):
|
def test_persisting_event_invalidates_cache(self):
|
||||||
"""
|
"""
|
||||||
Test to make sure that the `have_seen_event` cache
|
Test to make sure that the `have_seen_event` cache
|
||||||
|
|||||||
Reference in New Issue
Block a user