Compare commits

...

12 Commits

Author  SHA1  Message  Date
Erik Johnston  67cd168522  Don't fetch event from remotes  2021-09-13 15:04:12 +01:00
Erik Johnston  6ec29fa2a7  Don't add reference hashes to non-v1 events  2021-09-13 15:03:22 +01:00
Erik Johnston  414c5683b9  Don't store event edges for outliers  2021-09-13 15:01:15 +01:00
Erik Johnston  5a117376e1  Merge remote-tracking branch 'origin/develop' into erikj/join_logging  2021-09-13 14:59:15 +01:00
Erik Johnston  e1750fd75d  Remove batching  2021-09-08 13:14:04 +01:00
Erik Johnston  1a2113af55  Filter out leaves when joining  2021-09-08 11:09:51 +01:00
Erik Johnston  d381eae552  Merge remote-tracking branch 'origin/develop' into erikj/join_logging  2021-09-08 11:01:19 +01:00
Erik Johnston  96a1eb77f8  Defer verification to thread pool  2021-08-24 10:44:54 +01:00
Erik Johnston  717fca45b4  Add store_server_keys_json_multi  2021-08-09 11:38:38 +01:00
Erik Johnston  d3e555f90d  Add spans to federation  2021-08-04 15:13:16 +01:00
Erik Johnston  20a57f5354  Add spans to persiste events  2021-08-04 15:11:56 +01:00
Erik Johnston  0ee0d7b460  Add extra spans to remote join path  2021-08-02 14:30:46 +01:00
6 changed files with 181 additions and 109 deletions

View File

@@ -43,7 +43,11 @@ from synapse.api.errors import (
 from synapse.config.key import TrustedKeyServer
 from synapse.events import EventBase
 from synapse.events.utils import prune_event_dict
-from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.context import (
+    defer_to_thread,
+    make_deferred_yieldable,
+    run_in_background,
+)
 from synapse.storage.keys import FetchKeyResult
 from synapse.types import JsonDict
 from synapse.util import unwrapFirstError
@@ -161,6 +165,7 @@ class Keyring:
         self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
     ):
         self.clock = hs.get_clock()
+        self.reactor = hs.get_reactor()

         if key_fetchers is None:
             key_fetchers = (
@@ -288,7 +293,9 @@ class Keyring:
             verify_key = key_result.verify_key
             json_object = verify_request.get_json_object()
             try:
-                verify_signed_json(
+                await defer_to_thread(
+                    self.reactor,
+                    verify_signed_json,
                     json_object,
                     verify_request.server_name,
                     verify_key,
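
This hunk hands the CPU-bound verify_signed_json call to a thread pool via Synapse's logcontext-aware defer_to_thread, so signature checks no longer block the reactor. A minimal standalone sketch of the same pattern using Twisted's deferToThread directly; expensive_verification is a hypothetical stand-in for the real check:

from twisted.internet import defer, task
from twisted.internet.threads import deferToThread


def expensive_verification(blob: bytes) -> bool:
    # Hypothetical stand-in for verify_signed_json: CPU-bound work that
    # should not run on the reactor thread.
    return sum(blob) % 2 == 0


async def check(blob: bytes) -> bool:
    # deferToThread runs the callable on the reactor's thread pool and returns
    # a Deferred; awaiting it keeps the reactor free while the check runs.
    return await deferToThread(expensive_verification, blob)


def main(reactor):
    d = defer.ensureDeferred(check(b'{"signatures": {}}'))
    d.addCallback(print)
    return d


if __name__ == "__main__":
    task.react(main)

The awaiting coroutine gets its result back on the reactor thread once the pool finishes, which is the behaviour the diff relies on.
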
@@ -544,22 +551,18 @@ class BaseV2KeyFetcher(KeyFetcher):
         key_json_bytes = encode_canonical_json(response_json)

-        await make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self.store.store_server_keys_json,
-                        server_name=server_name,
-                        key_id=key_id,
-                        from_server=from_server,
-                        ts_now_ms=time_added_ms,
-                        ts_expires_ms=ts_valid_until_ms,
-                        key_json_bytes=key_json_bytes,
-                    )
-                    for key_id in verify_keys
-                ],
-                consumeErrors=True,
-            ).addErrback(unwrapFirstError)
-        )
+        await self.store.store_server_keys_json_multi(
+            [
+                (
+                    server_name,
+                    key_id,
+                    from_server,
+                    time_added_ms,
+                    ts_valid_until_ms,
+                    key_json_bytes,
+                )
+                for key_id in verify_keys
+            ],
+        )

         return verify_keys

View File

@@ -56,6 +56,7 @@ from synapse.api.room_versions import (
 from synapse.events import EventBase, builder
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.transport.client import SendJoinResponse
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
@@ -465,18 +466,18 @@ class FederationClient(FederationBase):
                 pdu.event_id, allow_rejected=True, allow_none=True
             )

-            pdu_origin = get_domain_from_id(pdu.sender)
-            if not res and pdu_origin != origin:
-                try:
-                    res = await self.get_pdu(
-                        destinations=[pdu_origin],
-                        event_id=pdu.event_id,
-                        room_version=room_version,
-                        outlier=outlier,
-                        timeout=10000,
-                    )
-                except SynapseError:
-                    pass
+            # pdu_origin = get_domain_from_id(pdu.sender)
+            # if not res and pdu_origin != origin:
+            #     try:
+            #         res = await self.get_pdu(
+            #             destinations=[pdu_origin],
+            #             event_id=pdu.event_id,
+            #             room_version=room_version,
+            #             outlier=outlier,
+            #             timeout=10000,
+            #         )
+            #     except SynapseError:
+            #         pass

             if not res:
                 logger.warning(
@@ -754,6 +755,7 @@ class FederationClient(FederationBase):
""" """
async def send_request(destination) -> SendJoinResult: async def send_request(destination) -> SendJoinResult:
with start_active_span("_do_send_join"):
response = await self._do_send_join(room_version, destination, pdu) response = await self._do_send_join(room_version, destination, pdu)
# If an event was returned (and expected to be returned): # If an event was returned (and expected to be returned):
@@ -804,6 +806,34 @@ class FederationClient(FederationBase):
                     % (create_room_version,)
                 )

+        logger.info("Got from send_join %d events", len(state) + len(auth_chain))
+
+        with start_active_span("filter_auth_chain"):
+            event_map = {e.event_id: e for e in auth_chain}
+
+            state = [
+                e
+                for e in state
+                if e.type != EventTypes.Member or e.membership != Membership.LEAVE
+            ]
+
+            roots = list(state)
+            new_auth_chain_ids = set()
+            while roots:
+                e = roots.pop()
+                for aid in e.auth_event_ids():
+                    if aid in new_auth_chain_ids:
+                        continue
+
+                    a = event_map.get(aid)
+                    if a:
+                        roots.append(a)
+                        new_auth_chain_ids.add(aid)
+
+            auth_chain = [event_map[aid] for aid in new_auth_chain_ids]
+
         logger.info(
             "Processing from send_join %d events", len(state) + len(auth_chain)
         )
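
The new filter_auth_chain block drops leave memberships from the returned state and then walks auth_event_ids from what remains, keeping only the auth events that are actually reachable. A self-contained sketch of that walk over a toy event graph; the Event dataclass here is an illustrative stand-in for EventBase, not Synapse's class:

from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Event:
    event_id: str
    auth_event_ids: List[str] = field(default_factory=list)


def prune_auth_chain(state: List[Event], auth_chain: List[Event]) -> List[Event]:
    # Index the received auth chain by event ID.
    event_map: Dict[str, Event] = {e.event_id: e for e in auth_chain}

    # Walk auth references starting from the (already filtered) state,
    # keeping only auth events that are actually reachable.
    roots = list(state)
    reachable: Set[str] = set()
    while roots:
        e = roots.pop()
        for aid in e.auth_event_ids:
            if aid in reachable:
                continue
            a = event_map.get(aid)
            if a:
                roots.append(a)
                reachable.add(aid)

    return [event_map[aid] for aid in reachable]


# Toy graph: the member event is authed by the join rules and create events,
# while "$stale" is in the auth chain but referenced by nothing we kept.
create = Event("$create")
join_rules = Event("$join_rules", ["$create"])
member = Event("$member", ["$join_rules", "$create"])
stale = Event("$stale", ["$create"])

kept = prune_auth_chain(state=[member], auth_chain=[create, join_rules, stale])
print(sorted(e.event_id for e in kept))  # ['$create', '$join_rules']
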
@@ -814,6 +844,7 @@ class FederationClient(FederationBase):
         valid_pdus_map: Dict[str, EventBase] = {}

         async def _execute(pdu: EventBase) -> None:
+            with start_active_span("_check_sigs_and_hash_and_fetch_one"):
                 valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
                     pdu=pdu,
                     origin=destination,
@@ -824,6 +855,7 @@ class FederationClient(FederationBase):
                 if valid_pdu:
                     valid_pdus_map[valid_pdu.event_id] = valid_pdu

+        with start_active_span("check_sigs"):
             await concurrently_execute(
                 _execute, itertools.chain(state, auth_chain), 10000
             )

View File

@@ -52,6 +52,7 @@ from synapse.logging.context import (
     preserve_fn,
     run_in_background,
 )
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -452,6 +453,7 @@ class FederationHandler(BaseHandler):
         logger.debug("Joining %s to %s", joinee, room_id)

+        with start_active_span("make_join"):
             origin, event, room_version_obj = await self._make_and_verify_event(
                 target_hosts,
                 room_id,
@@ -480,6 +482,7 @@ class FederationHandler(BaseHandler):
             except ValueError:
                 pass

+        with start_active_span("send_join"):
             ret = await self.federation_client.send_join(
                 host_list, event, room_version_obj
             )
@@ -510,6 +513,7 @@ class FederationHandler(BaseHandler):
                 auth_events=auth_chain,
             )

+        with start_active_span("_persist_auth_tree"):
             max_stream_id = await self._persist_auth_tree(
                 origin, room_id, auth_chain, state, event, room_version_obj
             )
@@ -1139,6 +1143,7 @@ class FederationHandler(BaseHandler):
             if e_id not in event_map:
                 missing_auth_events.add(e_id)

+        with start_active_span("fetching.missing_auth_events"):
             for e_id in missing_auth_events:
                 m_ev = await self.federation_client.get_pdu(
                     [origin],
@@ -1152,6 +1157,7 @@ class FederationHandler(BaseHandler):
             else:
                 logger.info("Failed to find auth event %r", e_id)

+        with start_active_span("authing_events"):
             for e in itertools.chain(auth_events, state, [event]):
                 auth_for_e = {
                     (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
@@ -1177,6 +1183,7 @@ class FederationHandler(BaseHandler):
                     events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

         if auth_events or state:
+            with start_active_span("persist_events_and_notify.state"):
                 await self._federation_event_handler.persist_events_and_notify(
                     room_id,
                     [
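
These hunks wrap each stage of the remote join (make_join, send_join, persisting the auth tree, and so on) in its own tracing span so the time spent per stage shows up in a trace. A minimal standalone sketch of the same idea using the plain opentracing package's global tracer (a no-op unless a real tracer is configured); the stage functions are hypothetical placeholders:

import time

import opentracing  # the module-level tracer is a no-op unless one is installed


def make_join():
    time.sleep(0.01)  # placeholder for asking a remote server for a join template


def send_join():
    time.sleep(0.02)  # placeholder for sending the signed join event


def persist_auth_tree():
    time.sleep(0.03)  # placeholder for persisting state and auth events


def do_remote_join():
    # Each stage gets its own span, nested under the caller's active span
    # (if any), so slow stages are visible in the resulting trace.
    with opentracing.tracer.start_active_span("make_join"):
        make_join()
    with opentracing.tracer.start_active_span("send_join"):
        send_join()
    with opentracing.tracer.start_active_span("_persist_auth_tree"):
        persist_auth_tree()


if __name__ == "__main__":
    do_remote_join()
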

View File

@@ -33,10 +33,11 @@ from prometheus_client import Counter
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
-from synapse.api.room_versions import RoomVersions
+from synapse.api.room_versions import EventFormatVersions, RoomVersions
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
@@ -241,7 +242,7 @@ class PersistEventsStore:
             txn.execute(sql + clause, args)
             results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))

-        for chunk in batch_iter(event_ids, 100):
+        for chunk in batch_iter(event_ids, 1000000000000000):
             await self.db_pool.runInteraction(
                 "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
@@ -304,7 +305,7 @@ class PersistEventsStore:
                     to_recursively_check.append(prev_event_id)
                     existing_prevs.add(prev_event_id)

-        for chunk in batch_iter(event_ids, 100):
+        for chunk in batch_iter(event_ids, 100000000000000000):
             await self.db_pool.runInteraction(
                 "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )
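
Both hunks above raise the batch_iter chunk size from 100 to an effectively unbounded value, so each query runs over one huge chunk instead of many small ones. For reference, a minimal stand-in for a batch_iter-style helper that yields fixed-size tuples from an iterable; this is a sketch, not Synapse's implementation:

from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield successive tuples of at most `size` items; a very large `size`
    # degenerates to a single batch containing everything.
    it = iter(iterable)
    while True:
        batch = tuple(islice(it, size))
        if not batch:
            return
        yield batch


print(list(batch_iter(range(7), 3)))       # [(0, 1, 2), (3, 4, 5), (6,)]
print(list(batch_iter(range(7), 10**15)))  # [(0, 1, 2, 3, 4, 5, 6)]
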
@@ -382,6 +383,7 @@ class PersistEventsStore:
         # Insert into event_to_state_groups.
         self._store_event_state_mappings_txn(txn, events_and_contexts)

+        with start_active_span("_persist_event_auth_chain_txn"):
             self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])

         # _store_rejected_events_txn filters out any events which were
@@ -393,6 +395,7 @@ class PersistEventsStore:
         # From this point onwards the events are only ones that weren't
         # rejected.

+        with start_active_span("_update_metadata_tables_txn"):
             self._update_metadata_tables_txn(
                 txn,
                 events_and_contexts=events_and_contexts,
@@ -1298,6 +1301,8 @@ class PersistEventsStore:
             },
         )

+        self._handle_mult_prev_events(txn, [event])
+
         sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
         txn.execute(sql, (False, event.event_id))
@@ -1446,7 +1451,11 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]

     def _update_metadata_tables_txn(
-        self, txn, events_and_contexts, all_events_and_contexts, backfilled
+        self,
+        txn,
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
+        all_events_and_contexts,
+        backfilled,
     ):
         """Update all the miscellaneous tables for new events
@@ -1537,7 +1546,12 @@ class PersistEventsStore:
         # Insert event_reference_hashes table.
         self._store_event_reference_hashes_txn(
-            txn, [event for event, _ in events_and_contexts]
+            txn,
+            [
+                event
+                for event, _ in events_and_contexts
+                if event.format_version == EventFormatVersions.V1
+            ],
         )

         # Prefill the event cache
@@ -2120,7 +2134,7 @@ class PersistEventsStore:
             values={"min_depth": depth},
         )

-    def _handle_mult_prev_events(self, txn, events):
+    def _handle_mult_prev_events(self, txn, events: Iterable[EventBase]):
         """
         For the given event, update the event edges table and forward and
         backward extremities tables.
@@ -2137,6 +2151,7 @@ class PersistEventsStore:
                 }
                 for ev in events
                 for e_id in ev.prev_event_ids()
+                if not ev.internal_metadata.is_outlier()
             ],
         )
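
The added condition stops event_edges rows being generated for outliers, whose prev_events are not part of the local DAG. A small standalone sketch of building edge rows while skipping outliers; the Event dataclass and row shape are illustrative stand-ins, not Synapse's schema helpers:

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Event:
    event_id: str
    room_id: str
    prev_event_ids: List[str] = field(default_factory=list)
    is_outlier: bool = False


def edge_rows(events: List[Event]) -> List[Dict[str, object]]:
    # One row per (event, prev_event) pair, but outliers contribute nothing:
    # their prev_events are not part of the local DAG yet.
    return [
        {
            "event_id": ev.event_id,
            "prev_event_id": prev_id,
            "room_id": ev.room_id,
            "is_state": False,
        }
        for ev in events
        if not ev.is_outlier
        for prev_id in ev.prev_event_ids
    ]


events = [
    Event("$a", "!room", ["$x"]),
    Event("$b", "!room", ["$y"], is_outlier=True),
]
print(edge_rows(events))  # only the $a -> $x edge is recorded
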

View File

@@ -138,6 +138,19 @@ class KeyStore(SQLBaseStore):
         for i in invalidations:
             invalidate((i,))

+    async def store_server_keys_json_multi(
+        self,
+        entries: List[Tuple[str, str, str, int, int, bytes]],
+    ):
+        await self.db_pool.simple_upsert_many(
+            table="server_keys_json",
+            key_names=("server_name", "key_id", "from_server"),
+            key_values=[e[:3] for e in entries],
+            value_names=("ts_added_ms", "ts_valid_until_ms", "key_json"),
+            value_values=[(e[3], e[4], db_binary_type(e[5])) for e in entries],
+            desc="store_server_keys_json_multi",
+        )
+
     async def store_server_keys_json(
         self,
         server_name: str,
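
The new store_server_keys_json_multi takes one tuple per key, laid out as (server_name, key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json), and upserts them all in a single call. A runnable sketch of how a fetcher might build that list and how the method splits it into key and value columns; the server names, key IDs, and timestamps are illustrative:

from typing import List, Tuple

# (server_name, key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json)
Entry = Tuple[str, str, str, int, int, bytes]


def build_entries(
    server_name: str,
    from_server: str,
    key_ids: List[str],
    ts_added_ms: int,
    ts_valid_until_ms: int,
    key_json_bytes: bytes,
) -> List[Entry]:
    # One tuple per key ID, all sharing the same response JSON and timestamps,
    # mirroring the comprehension in the keyring change earlier in this diff.
    return [
        (server_name, key_id, from_server, ts_added_ms, ts_valid_until_ms, key_json_bytes)
        for key_id in key_ids
    ]


entries = build_entries(
    "matrix.org", "matrix.org", ["ed25519:auto", "ed25519:a_RXGa"],
    1631000000000, 1631600000000, b"{}",
)

# The upsert splits each entry into key columns and value columns:
key_values = [e[:3] for e in entries]    # (server_name, key_id, from_server)
value_values = [e[3:] for e in entries]  # (ts_added_ms, ts_valid_until_ms, key_json)
print(key_values)
print(value_values)
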

View File

@@ -412,8 +412,8 @@ class EventsPersistenceStorage:
             return replaced_events

         chunks = [
-            events_and_contexts[x : x + 100]
-            for x in range(0, len(events_and_contexts), 100)
+            events_and_contexts[x : x + 100000000]
+            for x in range(0, len(events_and_contexts), 10000000)
         ]

         for chunk in chunks:
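
The persistence layer splits events_and_contexts into fixed-size chunks with a slice-based comprehension; raising the chunk size far beyond any realistic batch means everything lands in a single chunk. A quick illustration of the idiom:

def chunk(items, size):
    # Same slice-based idiom as above: one sublist per `size` items.
    return [items[x : x + size] for x in range(0, len(items), size)]


events = list(range(7))
print(chunk(events, 3))          # [[0, 1, 2], [3, 4, 5], [6]]
print(chunk(events, 100000000))  # [[0, 1, 2, 3, 4, 5, 6]]
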
@@ -445,7 +445,9 @@ class EventsPersistenceStorage:
         potentially_left_users: Set[str] = set()

         if not backfilled:
-            with Measure(self._clock, "_calculate_state_and_extrem"):
+            with Measure(
+                self._clock, "_calculate_state_and_extrem"
+            ), opentracing.start_active_span("_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
                 # We do this by working out what the new extremities are and then
                 # calculating the state from that.
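
This last hunk stacks a timing Measure and a tracing span on the same block by listing both context managers in one with statement. A standalone sketch of that stacking using simple stand-ins (neither is Synapse's Measure nor its tracing wrapper):

import time
from contextlib import contextmanager


@contextmanager
def measure(name: str):
    # Stand-in for Measure: record wall-clock time for the block.
    start = time.monotonic()
    try:
        yield
    finally:
        print(f"{name}: {time.monotonic() - start:.3f}s")


@contextmanager
def span(name: str):
    # Stand-in for a tracing span: mark where the block starts and finishes.
    print(f"start span {name}")
    try:
        yield
    finally:
        print(f"finish span {name}")


with measure("_calculate_state_and_extrem"), span("_calculate_state_and_extrem"):
    time.sleep(0.01)  # placeholder for the state/extremity calculation
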