1
0

change action for event by user to take a list of events, context and pass batch of events to _get_power_levels_and_sender_level

This commit is contained in:
H. Shay
2022-10-06 11:28:56 -07:00
parent c7f677fd1d
commit dcc096b435
+118 -102
View File
@@ -261,125 +261,141 @@ class BulkPushRuleEvaluator:
@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
self, event: EventBase, context: EventContext
self, events_and_context: List[Tuple[EventBase, EventContext]]
) -> None:
"""Given an event and context, evaluate the push rules, check if the message
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""Given a list of events and their associated contexts, evaluate the push rules
for each event, check if the message should increment the unread count, and
insert the results into the event_push_actions_staging table.
"""
if not event.internal_metadata.is_notifiable():
# Push rules for events that aren't notifiable can't be processed by this
return
for event, context in events_and_context:
if not event.internal_metadata.is_notifiable():
# Push rules for events that aren't notifiable can't be processed by this
return
# Skip push notification actions for historical messages
# because we don't want to notify people about old history back in time.
# The historical messages also do not have the proper `context.current_state_ids`
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if event.internal_metadata.is_historical():
return
# Disable counting as unread unless the experimental configuration is
# enabled, as it can cause additional (unwanted) rows to be added to the
# event_push_actions table.
count_as_unread = False
if self.hs.config.experimental.msc2654_enabled:
count_as_unread = _should_count_as_unread(event, context)
# Disable counting as unread unless the experimental configuration is
# enabled, as it can cause additional (unwanted) rows to be added to the
# event_push_actions table.
count_as_unread = False
if self.hs.config.experimental.msc2654_enabled:
count_as_unread = _should_count_as_unread(event, context)
rules_by_user = await self._get_rules_for_event(event)
actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
rules_by_user = await self._get_rules_for_event(event)
actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
room_member_count = await self.store.get_number_joined_users_in_room(
event.room_id
)
(
power_levels,
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)
relation = relation_from_event(event)
# If the event does not have a relation, then cannot have any mutual
# relations or thread ID.
relations = {}
thread_id = MAIN_TIMELINE
if relation:
relations = await self._get_mutual_relations(
relation.parent_id,
itertools.chain(*(r.rules() for r in rules_by_user.values())),
room_member_count = await self.store.get_number_joined_users_in_room(
event.room_id
)
# Recursively attempt to find the thread this event relates to.
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
else:
# Since the event has not yet been persisted we check whether
# the parent is part of a thread.
thread_id = await self.store.get_thread_id(relation.parent_id) or "main"
evaluator = PushRuleEvaluator(
_flatten_dict(event),
room_member_count,
sender_power_level,
power_levels.get("notifications", {}),
relations,
self._relations_match_enabled,
)
# For batched events the power level events may not have been persisted yet,
# so we pass in the batched events. Thus if the event cannot be found in the
# database we can check in the batch.
event_id_to_event = {e.event_id: e for e, _ in events_and_context}
(
power_levels,
sender_power_level,
) = await self._get_power_levels_and_sender_level(
event, context, event_id_to_event
)
users = rules_by_user.keys()
profiles = await self.store.get_subset_users_in_room_with_profiles(
event.room_id, users
)
relation = relation_from_event(event)
# If the event does not have a relation, then cannot have any mutual
# relations or thread ID.
relations = {}
thread_id = MAIN_TIMELINE
if relation:
relations = await self._get_mutual_relations(
relation.parent_id,
itertools.chain(*(r.rules() for r in rules_by_user.values())),
)
# Recursively attempt to find the thread this event relates to.
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id
else:
# Since the event has not yet been persisted we check whether
# the parent is part of a thread.
thread_id = (
await self.store.get_thread_id(relation.parent_id) or "main"
)
for uid, rules in rules_by_user.items():
if event.sender == uid:
continue
evaluator = PushRuleEvaluator(
_flatten_dict(event),
room_member_count,
sender_power_level,
power_levels.get("notifications", {}),
relations,
self._relations_match_enabled,
)
display_name = None
profile = profiles.get(uid)
if profile:
display_name = profile.display_name
users = rules_by_user.keys()
profiles = await self.store.get_subset_users_in_room_with_profiles(
event.room_id, users
)
if not display_name:
# Handle the case where we are pushing a membership event to
# that user, as they might not be already joined.
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None
for uid, rules in rules_by_user.items():
if event.sender == uid:
continue
if count_as_unread:
# Add an element for the current user if the event needs to be marked as
# unread, so that add_push_actions_to_staging iterates over it.
# If the event shouldn't be marked as unread but should notify the
# current user, it'll be added to the dict later.
actions_by_user[uid] = []
display_name = None
profile = profiles.get(uid)
if profile:
display_name = profile.display_name
actions = evaluator.run(rules, uid, display_name)
if "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions
if not display_name:
# Handle the case where we are pushing a membership event to
# that user, as they might not be already joined.
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
if not isinstance(display_name, str):
display_name = None
# If there aren't any actions then we can skip the rest of the
# processing.
if not actions_by_user:
return
if count_as_unread:
# Add an element for the current user if the event needs to be marked as
# unread, so that add_push_actions_to_staging iterates over it.
# If the event shouldn't be marked as unread but should notify the
# current user, it'll be added to the dict later.
actions_by_user[uid] = []
# This is a check for the case where user joins a room without being
# allowed to see history, and then the server receives a delayed event
# from before the user joined, which they should not be pushed for
#
# We do this *after* calculating the push actions as a) its unlikely
# that we'll filter anyone out and b) for large rooms its likely that
# most users will have push disabled and so the set of users to check is
# much smaller.
uids_with_visibility = await filter_event_for_clients_with_state(
self.store, actions_by_user.keys(), event, context
)
actions = evaluator.run(rules, uid, display_name)
if "notify" in actions:
# Push rules say we should notify the user of this event
actions_by_user[uid] = actions
for user_id in set(actions_by_user).difference(uids_with_visibility):
actions_by_user.pop(user_id, None)
# If there aren't any actions then we can skip the rest of the
# processing.
if not actions_by_user:
return
# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)
# This is a check for the case where user joins a room without being
# allowed to see history, and then the server receives a delayed event
# from before the user joined, which they should not be pushed for
#
# We do this *after* calculating the push actions as a) its unlikely
# that we'll filter anyone out and b) for large rooms its likely that
# most users will have push disabled and so the set of users to check is
# much smaller.
uids_with_visibility = await filter_event_for_clients_with_state(
self.store, actions_by_user.keys(), event, context
)
for user_id in set(actions_by_user).difference(uids_with_visibility):
actions_by_user.pop(user_id, None)
# Mark in the DB staging area the push actions for users who should be
# notified for this event. (This will then get handled when we persist
# the event)
await self.store.add_push_actions_to_staging(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)
MemberMap = Dict[str, Optional[EventIdMembership]]