1
0

Compare commits

..

3 Commits

2 changed files with 74 additions and 57 deletions

View File

@@ -62,12 +62,13 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
Requester,
RoomAlias,
StateMap,
StreamToken,
UserID,
create_requester, PersistedEventPosition,
create_requester,
)
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results
@@ -1318,7 +1319,6 @@ class EventCreationHandler:
400, "Cannot start threads from an event with a relation"
)
async def handle_create_room_events(
self,
requester: Requester,
@@ -1794,6 +1794,55 @@ class EventCreationHandler:
requester, is_admin_redaction=is_admin_redaction
)
# run checks/actions on event based on type
await self._actions_by_event_type(event, context)
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self._storage_controllers.persistence.persist_event(
event, context=context, backfilled=backfilled
)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)
run_in_background(_notify)
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
return event
async def _actions_by_event_type(
self, event: EventBase, context: EventContext
) -> None:
"""
Helper function to execute actions/checks based on the event type
"""
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
@@ -1814,11 +1863,13 @@ class EventCreationHandler:
original_event_id = event.unsigned.get("replaces_state")
if original_event_id:
original_event = await self.store.get_event(original_event_id)
original_alias_event = await self.store.get_event(original_event_id)
if original_event:
original_alias = original_event.content.get("alias", None)
original_alt_aliases = original_event.content.get("alt_aliases", [])
if original_alias_event:
original_alias = original_alias_event.content.get("alias", None)
original_alt_aliases = original_alias_event.content.get(
"alt_aliases", []
)
# Check the alias is currently valid (if it has changed).
room_alias_str = event.content.get("alias", None)
@@ -1996,46 +2047,6 @@ class EventCreationHandler:
errcode=Codes.INVALID_PARAM,
)
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due transaction IDs.
(
event,
event_pos,
max_stream_token,
) = await self._storage_controllers.persistence.persist_event(
event, context=context, backfilled=backfilled
)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
async def _notify() -> None:
try:
await self.notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception(
"Error notifying about new room event %s",
event.event_id,
)
run_in_background(_notify)
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
return event
async def _maybe_kick_guest_users(
self, event: EventBase, context: EventContext
) -> None:

View File

@@ -1057,13 +1057,16 @@ class RoomCreationHandler:
creator_id = creator.user.to_string()
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
depth = 1
# the last event sent/persisted to the db
last_sent_event_id: Optional[str] = None
# the most recently created event
prev_event: List[str] = []
# a map of event types, state keys -> event_ids. We collect these mappings this as events are
# created (but not persisted to the db) to determine state for future created events
# (as this info can't be pulled from the db)
# a map of event types, state keys -> event_ids. We collect these mappings this
# as events are created (but not persisted to the db) to determine state for
# future created events (as this info can't be pulled from the db)
state_map: dict = {}
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
@@ -1148,11 +1151,12 @@ class RoomCreationHandler:
prev_event_ids=[last_sent_event_id],
depth=depth,
)
last_sent_event_id = member_event_id
# last_sent_event_id = member_event_id
prev_event = [member_event_id]
# update the depth and state map here as these are otherwise updated in 'create_event'
# the membership event has been created through a different code path
# update the depth and state map here as these are otherwise updated in
# 'create_event' the membership event has been created through a different code
# path
depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
@@ -1163,7 +1167,7 @@ class RoomCreationHandler:
power_event = await create_event(EventTypes.PowerLevels, pl_content)
power_context = await self.state.compute_event_context(power_event)
current_state_group = power_context._state_group
last_sent_stream_id = await send(power_event, power_context, creator)
await send(power_event, power_context, creator)
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
@@ -1212,7 +1216,7 @@ class RoomCreationHandler:
)
pl_context = await self.state.compute_event_context(pl_event)
current_state_group = pl_context._state_group
last_sent_stream_id = await send(pl_event, pl_context, creator)
await send(pl_event, pl_context, creator)
events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
@@ -1287,9 +1291,11 @@ class RoomCreationHandler:
)
events_to_send.append((encryption_event, encryption_context))
for event, context in events_to_send:
last_sent_stream_id = await send(event, context, creator)
return last_sent_stream_id, last_sent_event_id, depth
last_event = await self.event_creation_handler.handle_create_room_events(
creator, events_to_send
)
assert last_event.internal_metadata.stream_ordering is not None
return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
def _generate_room_id(self) -> str:
"""Generates a random room ID.