
Add benchmark

Eric Eastwood
2022-08-19 01:30:23 -05:00
parent 7cb07d3a03
commit 2fdbca62e4
2 changed files with 58 additions and 14 deletions

View File

@@ -1455,7 +1455,7 @@ class EventsWorkerStore(SQLBaseStore):
@trace
@tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
self, room_id: str, event_ids: Collection[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.
@@ -1480,14 +1480,14 @@ class EventsWorkerStore(SQLBaseStore):
# the batches as big as possible.
remaining_event_ids: Set[str] = set()
for chunk in batch_iter(event_ids, 1000):
for chunk in batch_iter(event_ids, 500):
remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
remaining_event_ids.update(remaining_event_ids_from_chunk)
return remaining_event_ids
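The batch size passed to batch_iter drops from 1000 to 500 here. The exact helper lives in Synapse's utilities and is not shown in this diff, so as an assumption, a stand-in chunker with the same shape (yield tuples of at most size items) would look roughly like this:

import itertools
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def batch_iter_sketch(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield successive tuples of at most `size` items; the last chunk may be shorter.
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk

# With 1200 event IDs and a batch size of 500, this yields chunks of 500, 500, 200.
event_ids = [f"$event{i}" for i in range(1200)]
print([len(chunk) for chunk in batch_iter_sketch(event_ids, 500)])  # [500, 500, 200]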
@cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(self, event_ids: Iterable[str]) -> Set[str]:
# @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(self, event_ids: Collection[str]) -> set[str]:
"""Helper for have_seen_events
Returns:
@@ -1501,10 +1501,10 @@ class EventsWorkerStore(SQLBaseStore):
event_id for event_id in event_ids if event_id not in event_ids_in_cache
}
if not remaining_event_ids:
return []
return set()
def have_seen_events_txn(txn: LoggingTransaction) -> None:
global remaining_event_ids
nonlocal remaining_event_ids
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
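The global-to-nonlocal change matters because remaining_event_ids is a local of the enclosing coroutine, not a module-level name: global would point the rebinding at module scope, so the enclosing set would never be updated (and a read-before-assignment would raise NameError). A self-contained sketch of the pattern, with a plain function call standing in for the real database transaction runner:

def have_seen_events_outer(event_ids: set) -> set:
    # The set the inner callback is meant to narrow down.
    remaining_event_ids = set(event_ids)

    def have_seen_events_txn(found: set) -> None:
        # `nonlocal` rebinds the enclosing function's local;
        # `global` would instead target a module-level name of the same spelling.
        nonlocal remaining_event_ids
        remaining_event_ids = remaining_event_ids - found

    # Stand-in for the real transaction callback invocation.
    have_seen_events_txn({"$seen1"})
    return remaining_event_ids

print(have_seen_events_outer({"$seen1", "$unseen2"}))  # {'$unseen2'}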

View File

@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
from contextlib import contextmanager
from typing import Generator, List, Tuple
@@ -36,6 +37,8 @@ from synapse.util.async_helpers import yieldable_gather_results
from tests import unittest
logger = logging.getLogger(__name__)
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
@@ -91,15 +94,56 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
)
self.event_ids.append(event_id)
# def test_benchmark(self):
# with LoggingContext(name="test") as ctx:
# res = self.get_success(
# self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
# )
# self.assertEqual(res, {self.event_ids[0]})
def test_benchmark(self):
import time
# # that should result in a single db query
# self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
room_id = "room123"
event_ids = []
setup_start_time = time.time()
with LoggingContext(name="test-setup") as ctx:
for i in range(50000):
event_json = {"type": f"test {i}", "room_id": room_id}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id
event_ids.append(event_id)
self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": event_id,
"room_id": room_id,
"topological_ordering": i,
"stream_ordering": 123 + i,
"type": event.type,
"processed": True,
"outlier": False,
},
)
)
setup_end_time = time.time()
logger.info(
"Setup time: %s",
(setup_end_time - setup_start_time),
)
with LoggingContext(name="test") as ctx:
benchmark_start_time = time.time()
remaining_event_ids = self.get_success(
self.store.have_seen_events(room_id, event_ids)
)
benchmark_end_time = time.time()
logger.info("afewewf %s %s", benchmark_start_time, benchmark_end_time)
logger.info(
"Benchmark time: %s",
(benchmark_end_time - benchmark_start_time),
)
# self.assertEqual(remaining_event_ids, set())
# that should result in many db queries
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
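One editorial aside on the timing calls in this test: time.time() can jump if the wall clock is adjusted, so a monotonic clock is usually preferred for measuring elapsed intervals. A minimal sketch of the same bracketing pattern with time.monotonic(), using a hypothetical timed helper and a dummy workload in place of the 50000-row setup and the have_seen_events call:

import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def timed(label: str, fn):
    # Measure elapsed time for fn() with a monotonic clock and log it.
    start = time.monotonic()
    result = fn()
    logger.info("%s time: %s seconds", label, time.monotonic() - start)
    return result

# Usage sketch: wrap the expensive call the same way the test brackets
# have_seen_events with start/end timestamps.
timed("Benchmark", lambda: sum(range(1_000_000)))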
def test_simple(self):
with LoggingContext(name="test") as ctx: