Batch generation of stream_ids
This commit is contained in:
@@ -303,8 +303,8 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
events: list[EventBase],
|
||||
) -> None:
|
||||
now_ms = self.clock.time_msec()
|
||||
# event, expires_at, stream_id
|
||||
sticky_events: list[tuple[EventBase, int, int]] = []
|
||||
# event, expires_at
|
||||
sticky_events: list[tuple[EventBase, int]] = []
|
||||
for ev in events:
|
||||
# MSC: Note: policy servers and other similar antispam techniques still apply to these events.
|
||||
if ev.internal_metadata.policy_server_spammy:
|
||||
@@ -322,16 +322,23 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
expires_at = min(ev.origin_server_ts, now_ms) + sticky_duration
|
||||
# Filter out already expired sticky events
|
||||
if expires_at > now_ms:
|
||||
sticky_events.append(
|
||||
(ev, expires_at, self._sticky_events_id_gen.get_next_txn(txn))
|
||||
)
|
||||
sticky_events.append((ev, expires_at))
|
||||
if len(sticky_events) == 0:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"inserting %d sticky events in room %s",
|
||||
len(sticky_events),
|
||||
sticky_events[0][0].room_id,
|
||||
)
|
||||
|
||||
# Generate stream_ids in one go
|
||||
sticky_events_with_ids = zip(
|
||||
sticky_events,
|
||||
self._sticky_events_id_gen.get_next_mult_txn(txn, len(sticky_events)),
|
||||
strict=True,
|
||||
)
|
||||
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
"sticky_events",
|
||||
@@ -354,7 +361,7 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
expires_at,
|
||||
ev.internal_metadata.is_soft_failed(),
|
||||
)
|
||||
for (ev, expires_at, stream_id) in sticky_events
|
||||
for (ev, expires_at), stream_id in sticky_events_with_ids
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user