Compare commits
11 Commits
devon/acl-
...
anoa/test_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
866a592a6f | ||
|
|
e893697a94 | ||
|
|
42f14104e3 | ||
|
|
52387d783c | ||
|
|
a9709f0782 | ||
|
|
74812f9e18 | ||
|
|
1d24bd394d | ||
|
|
4d80032518 | ||
|
|
222d270e8a | ||
|
|
46f5f30c15 | ||
|
|
fc2286e9d4 |
1
changelog.d/14161.bugfix
Normal file
1
changelog.d/14161.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
|
||||
1
changelog.d/14164.bugfix
Normal file
1
changelog.d/14164.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
|
||||
@@ -798,9 +798,42 @@ class FederationEventHandler:
|
||||
],
|
||||
)
|
||||
|
||||
# Check if we already have any of these events.
|
||||
# Note: we currently make a lookup in the database directly here rather than
|
||||
# checking the event cache, due to:
|
||||
# https://github.com/matrix-org/synapse/issues/13476
|
||||
existing_events_map = await self._store._get_events_from_db(
|
||||
[event.event_id for event in events]
|
||||
)
|
||||
|
||||
new_events = []
|
||||
for event in events:
|
||||
event_id = event.event_id
|
||||
|
||||
# If we've already seen this event ID...
|
||||
if event_id in existing_events_map:
|
||||
existing_event = existing_events_map[event_id]
|
||||
|
||||
# ...and the event itself was not previously stored as an outlier...
|
||||
if not existing_event.event.internal_metadata.is_outlier():
|
||||
# ...then there's no need to persist it. We have it already.
|
||||
logger.info(
|
||||
"_process_pulled_event: Ignoring received event %s which we "
|
||||
"have already seen",
|
||||
event.event_id,
|
||||
)
|
||||
continue
|
||||
|
||||
# While we have seen this event before, it was stored as an outlier.
|
||||
# We'll now persist it as a non-outlier.
|
||||
logger.info("De-outliering event %s", event_id)
|
||||
|
||||
# Continue on with the events that are new to us.
|
||||
new_events.append(event)
|
||||
|
||||
# We want to sort these by depth so we process them and
|
||||
# tell clients about them in order.
|
||||
sorted_events = sorted(events, key=lambda x: x.depth)
|
||||
sorted_events = sorted(new_events, key=lambda x: x.depth)
|
||||
for ev in sorted_events:
|
||||
with nested_logging_context(ev.event_id):
|
||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
@@ -852,18 +885,6 @@ class FederationEventHandler:
|
||||
|
||||
event_id = event.event_id
|
||||
|
||||
existing = await self._store.get_event(
|
||||
event_id, allow_none=True, allow_rejected=True
|
||||
)
|
||||
if existing:
|
||||
if not existing.internal_metadata.is_outlier():
|
||||
logger.info(
|
||||
"_process_pulled_event: Ignoring received event %s which we have already seen",
|
||||
event_id,
|
||||
)
|
||||
return
|
||||
logger.info("De-outliering event %s", event_id)
|
||||
|
||||
try:
|
||||
self._sanity_check_event(event)
|
||||
except SynapseError as err:
|
||||
|
||||
@@ -374,7 +374,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
If there is a mismatch, behave as per allow_none.
|
||||
|
||||
Returns:
|
||||
The event, or None if the event was not found.
|
||||
The event, or None if the event was not found and allow_none is `True`.
|
||||
"""
|
||||
if not isinstance(event_id, str):
|
||||
raise TypeError("Invalid event event_id %r" % (event_id,))
|
||||
@@ -1502,21 +1502,15 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
Returns:
|
||||
a dict {event_id -> bool}
|
||||
"""
|
||||
# if the event cache contains the event, obviously we've seen it.
|
||||
# TODO: We used to query the _get_event_cache here as a fast-path before
|
||||
# hitting the database. Because if an event were in the cache, we've presumably
|
||||
# seen it before.
|
||||
#
|
||||
# But this is currently an invalid assumption due to the _get_event_cache
|
||||
# not being invalidated when purging events from a room. The optimisation can
|
||||
# be re-added after https://github.com/matrix-org/synapse/issues/13476
|
||||
|
||||
cache_results = {
|
||||
event_id
|
||||
for event_id in event_ids
|
||||
if await self._get_event_cache.contains((event_id,))
|
||||
}
|
||||
results = dict.fromkeys(cache_results, True)
|
||||
remaining = [
|
||||
event_id for event_id in event_ids if event_id not in cache_results
|
||||
]
|
||||
if not remaining:
|
||||
return results
|
||||
|
||||
def have_seen_events_txn(txn: LoggingTransaction) -> None:
|
||||
def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]:
|
||||
# we deliberately do *not* query the database for room_id, to make the
|
||||
# query an index-only lookup on `events_event_id_key`.
|
||||
#
|
||||
@@ -1524,16 +1518,17 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
sql = "SELECT event_id FROM events AS e WHERE "
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "e.event_id", remaining
|
||||
txn.database_engine, "e.event_id", event_ids
|
||||
)
|
||||
txn.execute(sql + clause, args)
|
||||
found_events = {eid for eid, in txn}
|
||||
|
||||
# ... and then we can update the results for each key
|
||||
results.update({eid: (eid in found_events) for eid in remaining})
|
||||
return {eid: (eid in found_events) for eid in event_ids}
|
||||
|
||||
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
|
||||
return results
|
||||
return await self.db_pool.runInteraction(
|
||||
"have_seen_events", have_seen_events_txn
|
||||
)
|
||||
|
||||
@cached(max_entries=100000, tree=True)
|
||||
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
|
||||
|
||||
@@ -19,7 +19,13 @@ from unittest.mock import Mock, patch
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
LimitExceededError,
|
||||
NotFoundError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.federation.federation_base import event_from_pdu_json
|
||||
@@ -28,6 +34,7 @@ from synapse.logging.context import LoggingContext, run_in_background
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.events_worker import EventCacheEntry
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
@@ -322,6 +329,102 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
|
||||
)
|
||||
self.get_success(d)
|
||||
|
||||
def test_backfill_ignores_known_events(self) -> None:
|
||||
"""
|
||||
Tests that events that we already know about are ignored when backfilling.
|
||||
"""
|
||||
# Set up users
|
||||
user_id = self.register_user("kermit", "test")
|
||||
tok = self.login("kermit", "test")
|
||||
|
||||
other_server = "otherserver"
|
||||
other_user = "@otheruser:" + other_server
|
||||
|
||||
# Create a room to backfill events into
|
||||
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||
room_version = self.get_success(self.store.get_room_version(room_id))
|
||||
|
||||
# Build an event to backfill
|
||||
event = event_from_pdu_json(
|
||||
{
|
||||
"type": EventTypes.Message,
|
||||
"content": {"body": "hello world", "msgtype": "m.text"},
|
||||
"room_id": room_id,
|
||||
"sender": other_user,
|
||||
"depth": 32,
|
||||
"prev_events": [],
|
||||
"auth_events": [],
|
||||
"origin_server_ts": self.clock.time_msec(),
|
||||
},
|
||||
room_version,
|
||||
)
|
||||
|
||||
# Ensure the event is not already in the DB
|
||||
self.get_failure(
|
||||
self.store.get_event(event.event_id),
|
||||
NotFoundError,
|
||||
)
|
||||
|
||||
# Backfill the event and check that it has entered the DB.
|
||||
|
||||
# We mock out the FederationClient.backfill method, to pretend that a remote
|
||||
# server has returned our fake event.
|
||||
federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
|
||||
self.hs.get_federation_client().backfill = federation_client_backfill_mock
|
||||
|
||||
# We also mock the persist method with a side effect of itself. This allows us
|
||||
# to track when it has been called while preserving its function.
|
||||
persist_events_and_notify_mock = Mock(
|
||||
side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
|
||||
)
|
||||
self.hs.get_federation_event_handler().persist_events_and_notify = (
|
||||
persist_events_and_notify_mock
|
||||
)
|
||||
|
||||
# Small side-tangent. We populate the event cache with the event, even though
|
||||
# it is not yet in the DB. This is an invalid scenario that can currently occur
|
||||
# due to not properly invalidating the event cache.
|
||||
# See https://github.com/matrix-org/synapse/issues/13476.
|
||||
#
|
||||
# As a result, backfill should not rely on the event cache to check whether
|
||||
# we already have an event in the DB.
|
||||
# TODO: Remove this bit when the event cache is properly invalidated.
|
||||
cache_entry = EventCacheEntry(
|
||||
event=event,
|
||||
redacted_event=None,
|
||||
)
|
||||
self.store._get_event_cache.set_local((event.event_id,), cache_entry)
|
||||
|
||||
# We now call FederationEventHandler.backfill (a separate method) to trigger
|
||||
# a backfill request. It should receive the fake event.
|
||||
self.get_success(
|
||||
self.hs.get_federation_event_handler().backfill(
|
||||
other_user,
|
||||
room_id,
|
||||
limit=10,
|
||||
extremities=[],
|
||||
)
|
||||
)
|
||||
|
||||
# Check that our fake event was persisted.
|
||||
persist_events_and_notify_mock.assert_called_once()
|
||||
persist_events_and_notify_mock.reset_mock()
|
||||
|
||||
# Now we repeat the backfill, having the homeserver receive the fake event
|
||||
# again.
|
||||
self.get_success(
|
||||
self.hs.get_federation_event_handler().backfill(
|
||||
other_user,
|
||||
room_id,
|
||||
limit=10,
|
||||
extremities=[],
|
||||
),
|
||||
)
|
||||
|
||||
# This time, we expect no event persistence to have occurred, as we already
|
||||
# have this event.
|
||||
persist_events_and_notify_mock.assert_not_called()
|
||||
|
||||
@unittest.override_config(
|
||||
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
|
||||
)
|
||||
|
||||
@@ -90,18 +90,6 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(res, {self.event_ids[0]})
|
||||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
||||
|
||||
def test_query_via_event_cache(self):
|
||||
# fetch an event into the event cache
|
||||
self.get_success(self.store.get_event(self.event_ids[0]))
|
||||
|
||||
# looking it up should now cause no db hits
|
||||
with LoggingContext(name="test") as ctx:
|
||||
res = self.get_success(
|
||||
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
|
||||
)
|
||||
self.assertEqual(res, {self.event_ids[0]})
|
||||
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
|
||||
|
||||
def test_persisting_event_invalidates_cache(self):
|
||||
"""
|
||||
Test to make sure that the `have_seen_event` cache
|
||||
|
||||
Reference in New Issue
Block a user