1
0

More debugging

This commit is contained in:
Eric Eastwood
2022-09-24 04:16:56 -05:00
parent 33d12a516f
commit 78b44340d6
2 changed files with 142 additions and 8 deletions

View File

@@ -178,6 +178,7 @@ class FederationEventHandler:
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@trace
async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
"""Process a PDU received via a federation /send/ transaction
@@ -663,11 +664,12 @@ class FederationEventHandler:
logger.info(
"backfill assumed reverse_chronological_events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
"event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in reverse_chronological_events
@@ -677,11 +679,12 @@ class FederationEventHandler:
# logger.info(
# "backfill chronological_events=%s",
# [
# "event_id=%s,depth=%d,body=%s,prevs=%s\n"
# "event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
# % (
# event.event_id,
# event.depth,
# event.content.get("body", event.type),
# getattr(event, "state_key", None),
# event.prev_event_ids(),
# )
# for event in chronological_events
@@ -712,8 +715,8 @@ class FederationEventHandler:
# Expecting to persist in chronological order here (oldest ->
# newest) so that events are persisted before they're referenced
# as a `prev_event`.
# chronological_events,
reverse_chronological_events,
chronological_events,
# reverse_chronological_events,
backfilled=True,
)
@@ -848,11 +851,12 @@ class FederationEventHandler:
"processing pulled backfilled=%s events=%s",
backfilled,
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
"event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in events
@@ -866,11 +870,12 @@ class FederationEventHandler:
logger.info(
"backfill sorted_events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
"event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in sorted_events
@@ -1872,6 +1877,12 @@ class FederationEventHandler:
# already have checked we have all the auth events, in
# _load_or_fetch_auth_events_for_event above)
if context.partial_state:
logger.info(
"_check_event_auth(event=%s) with partial_state - %s (%s)",
event.event_id,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
room_version = await self._store.get_room_version_id(event.room_id)
local_state_id_map = await context.get_prev_state_ids()
@@ -1889,15 +1900,38 @@ class FederationEventHandler:
)
)
else:
logger.info(
"_check_event_auth(event=%s) with full state - %s (%s)",
event.event_id,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
event_types = event_auth.auth_types_for_event(event.room_version, event)
state_for_auth_id_map = await context.get_prev_state_ids(
StateFilter.from_types(event_types)
)
logger.info(
"_check_event_auth(event=%s) state_for_auth_id_map=%s - %s (%s)",
event.event_id,
state_for_auth_id_map,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
calculated_auth_event_ids = self._event_auth_handler.compute_auth_events(
event, state_for_auth_id_map, for_verification=True
)
logger.info(
"_check_event_auth(event=%s) claimed_auth_events=%s calculated_auth_event_ids=%s - %s (%s)",
event.event_id,
event.auth_event_ids(),
calculated_auth_event_ids,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
# if those are the same, we're done here.
if collections.Counter(event.auth_event_ids()) == collections.Counter(
calculated_auth_event_ids

View File

@@ -36,11 +36,13 @@ import attr
from frozendict import frozendict
from prometheus_client import Counter, Histogram
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.tracing import SynapseTags, log_kv, trace, tag_args, set_attribute
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
@@ -250,6 +252,8 @@ class StateHandler:
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
return await self.store.get_joined_hosts(room_id, state, entry)
@trace
@tag_args
async def compute_event_context(
self,
event: EventBase,
@@ -282,6 +286,14 @@ class StateHandler:
RuntimeError if `state_ids_before_event` is not provided and one or more
prev events are missing or outliers.
"""
set_attribute(
SynapseTags.RESULT_PREFIX + "event_type_and_state",
f"{event.type} - {getattr(event, 'state_key', None)}",
)
set_attribute(
SynapseTags.RESULT_PREFIX + "event_body",
event.content.get("body", None),
)
assert not event.internal_metadata.is_outlier()
@@ -289,6 +301,14 @@ class StateHandler:
# first of all, figure out the state before the event, unless we
# already have it.
#
logger.info(
"compute_event_context(event=%s, state_ids_before_event=%s) - %s (%s)",
event.event_id,
state_ids_before_event,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
if state_ids_before_event:
# if we're given the state before the event, then we use that
state_group_before_event_prev_group = None
@@ -304,6 +324,12 @@ class StateHandler:
current_state_ids=state_ids_before_event,
)
)
log_kv(
{
"message": "Using state before event because `state_ids_before_event` was given:",
"state_group_before_event": state_group_before_event,
}
)
# the partial_state flag must be provided
assert partial_state is not None
@@ -324,7 +350,7 @@ class StateHandler:
)
partial_state = any(incomplete_prev_events.values())
if partial_state:
logger.debug(
logger.info(
"New/incoming event %s refers to prev_events %s with partial state",
event.event_id,
[k for (k, v) in incomplete_prev_events.items() if v],
@@ -343,6 +369,24 @@ class StateHandler:
deltas_to_state_group_before_event = entry.delta_ids
state_ids_before_event = None
logger.info(
"compute_event_context(event=%s) entry.state_group=%s state_group_before_event_prev_group=%s deltas_to_state_group_before_event=%s - %s (%s)",
event.event_id,
entry.state_group,
state_group_before_event_prev_group,
deltas_to_state_group_before_event,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
log_kv(
{
"message": "resolve_state_groups_for_events",
"entry.state_group": entry.state_group,
"state_group_before_event_prev_group": state_group_before_event_prev_group,
"deltas_to_state_group_before_event": deltas_to_state_group_before_event,
}
)
# We make sure that we have a state group assigned to the state.
if entry.state_group is None:
# store_state_group requires us to have either a previous state group
@@ -352,6 +396,12 @@ class StateHandler:
state_ids_before_event = await entry.get_state(
self._state_storage_controller, StateFilter.all()
)
log_kv(
{
"message": "state_group_before_event_prev_group was None so get state_ids_before_event",
"state_ids_before_event": state_ids_before_event,
}
)
state_group_before_event = (
await self._state_storage_controller.store_state_group(
@@ -363,15 +413,27 @@ class StateHandler:
)
)
entry.set_state_group(state_group_before_event)
log_kv(
{
"message": "entry.set_state_group(state_group_before_event)",
"state_group_before_event": state_group_before_event,
}
)
else:
state_group_before_event = entry.state_group
log_kv(
{
"message": "Entry already has a state_group",
"state_group_before_event": state_group_before_event,
}
)
#
# now if it's not a state event, we're done
#
if not event.is_state():
return EventContext.with_state(
event_context = EventContext.with_state(
storage=self._storage_controllers,
state_group_before_event=state_group_before_event,
state_group=state_group_before_event,
@@ -381,6 +443,22 @@ class StateHandler:
partial_state=partial_state,
)
state_for_auth_id_map = await event_context.get_prev_state_ids(
StateFilter.from_types(
event_auth.auth_types_for_event(event.room_version, event)
)
)
log_kv(
{
"message": "Done creating context for non-state event",
"state_for_auth_id_map from event_context": str(
state_for_auth_id_map
),
}
)
return event_context
#
# otherwise, we'll need to create a new state group for after the event
#
@@ -421,6 +499,7 @@ class StateHandler:
)
@measure_func()
@trace
async def resolve_state_groups_for_events(
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
) -> _StateCacheEntry:
@@ -448,6 +527,8 @@ class StateHandler:
state_group_ids = state_groups.values()
log_kv({"state_group_ids": state_group_ids, "state_groups": state_groups})
# check if each event has same state group id, if so there's no state to resolve
state_group_ids_set = set(state_group_ids)
if len(state_group_ids_set) == 1:
@@ -458,6 +539,13 @@ class StateHandler:
) = await self._state_storage_controller.get_state_group_delta(
state_group_id
)
log_kv(
{
"message": "Returning state_group_id",
"state_group_id": state_group_id,
"prev_group": prev_group,
}
)
return _StateCacheEntry(
state=None,
state_group=state_group_id,
@@ -465,6 +553,11 @@ class StateHandler:
delta_ids=delta_ids,
)
elif len(state_group_ids_set) == 0:
log_kv(
{
"message": "Returning empty state group since there are no state_group_ids",
}
)
return _StateCacheEntry(state={}, state_group=None)
room_version = await self.store.get_room_version_id(room_id)
@@ -480,6 +573,13 @@ class StateHandler:
None,
state_res_store=StateResolutionStore(self.store),
)
log_kv(
{
"message": "Resolving state groups and returning result",
"state_to_resolve": state_to_resolve,
"result": result,
}
)
return result
async def update_current_state(self, room_id: str) -> None: