1
0

port BackgroundUpdateTestCase to HomeserverTestCase (#6653)

* commit 'd20c34654':
  port BackgroundUpdateTestCase to HomeserverTestCase (#6653)
  changelog
  Fix exceptions in log when rejected event is replicated
  async/await for SyncReplicationHandler.process_and_notify
  Clarify documentation on get_event* methods
This commit is contained in:
Andrew Morgan
2020-03-20 16:59:35 +00:00
5 changed files with 86 additions and 50 deletions

1
changelog.d/6645.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix exceptions in the synchrotron worker log when events are rejected.

1
changelog.d/6653.misc Normal file
View File

@@ -0,0 +1 @@
Port core background update routines to async/await.

View File

@@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users()
@defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows):
async def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
@@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
event = yield self.store.get_event(row.data.event_id)
assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
@@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
room_ids = await self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]

View File

@@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def get_event(
self,
event_id: List[str],
event_id: str,
redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
get_prev_content: bool = False,
allow_rejected: bool = False,
@@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_id: The event_id of the event to fetch
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events
* DISALLOW - Do not return redacted events (behave as per allow_none
if the event is redacted)
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected: If True return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
behave as per allow_none.
allow_none: If True, return None if no event found, if
False throw a NotFoundError
check_room_id: if not None, check the room of the found event.
If there is a mismatch, behave as per allow_none.
@@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_ids: The event_ids of the events to fetch
redact_behaviour: Determine what to do with a redacted event. Possible
values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events
* DISALLOW - Do not return redacted events (omit them from the response)
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected: If True return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
omits rejeted events from the response.
Returns:
Deferred : Dict from event_id to event.
@@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
"""Get events from the database and return in a list in the same order
as given by `event_ids` arg.
Unknown events will be omitted from the response.
Args:
event_ids: The event_ids of the events to fetch
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
* DISALLOW - Do not return redacted events
* DISALLOW - Do not return redacted events (omit them from the response)
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected: If True, return rejected events.
allow_rejected: If True, return rejected events. Otherwise,
omits rejected events from the response.
Returns:
Deferred[list[EventBase]]: List of events fetched from the database. The
@@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
If events are pulled from the database, they will be cached for future lookups.
Unknown events are omitted from the response.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
@@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
Returned events will be added to the cache for future lookups.
Unknown events are omitted from the response.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:

View File

@@ -2,44 +2,37 @@ from mock import Mock
from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdater
from tests import unittest
from tests.utils import setup_test_homeserver
class BackgroundUpdateTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield setup_test_homeserver(self.addCleanup)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
# the base test class should have run the real bg updates for us
self.assertTrue(self.updates.has_completed_background_updates())
self.update_handler = Mock()
yield self.store.db.updates.register_background_update_handler(
self.updates.register_background_update_handler(
"test_update", self.update_handler
)
# run the real background updates, to get them out the way
# (perhaps we should run them as part of the test HS setup, since we
# run all of the other schema setup stuff there?)
while True:
res = yield self.store.db.updates.do_next_background_update(1000)
if res is None:
break
@defer.inlineCallbacks
def test_do_background_update(self):
desired_count = 1000
# the time we claim each update takes
duration_ms = 42
# the target runtime for each bg update
target_background_update_duration_ms = 50000
# first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
self.clock.advance_time_msec(count * duration_ms)
yield self.clock.sleep((count * duration_ms) / 1000)
progress = {"my_key": progress["my_key"] + 1}
yield self.store.db.runInteraction(
yield self.hs.get_datastore().db.runInteraction(
"update_progress",
self.store.db.updates._background_update_progress_txn,
self.updates._background_update_progress_txn,
"test_update",
progress,
)
@@ -47,37 +40,46 @@ class BackgroundUpdateTestCase(unittest.TestCase):
self.update_handler.side_effect = update
yield self.store.db.updates.start_background_update(
"test_update", {"my_key": 1}
self.get_success(
self.updates.start_background_update("test_update", {"my_key": 1})
)
self.update_handler.reset_mock()
result = yield self.store.db.updates.do_next_background_update(
duration_ms * desired_count
res = self.get_success(
self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
)
self.assertIsNotNone(result)
self.assertIsNotNone(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
{"my_key": 1}, self.store.db.updates.DEFAULT_BACKGROUND_BATCH_SIZE
{"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE
)
# second step: complete the update
# we should now get run with a much bigger number of items to update
@defer.inlineCallbacks
def update(progress, count):
yield self.store.db.updates._end_background_update("test_update")
self.assertEqual(progress, {"my_key": 2})
self.assertAlmostEqual(
count, target_background_update_duration_ms / duration_ms, places=0,
)
yield self.updates._end_background_update("test_update")
return count
self.update_handler.side_effect = update
self.update_handler.reset_mock()
result = yield self.store.db.updates.do_next_background_update(
duration_ms * desired_count
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNotNone(result)
self.update_handler.assert_called_once_with({"my_key": 2}, desired_count)
self.update_handler.assert_called_once()
# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = yield self.store.db.updates.do_next_background_update(
duration_ms * desired_count
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNone(result)
self.assertFalse(self.update_handler.called)