1
0

More tracing for federated side

This commit is contained in:
Eric Eastwood
2022-08-05 20:44:21 -05:00
parent a7eabb78a2
commit 13855c5916
8 changed files with 116 additions and 52 deletions

28
poetry.lock generated
View File

@@ -1267,17 +1267,22 @@ telegram = ["requests"]
[[package]]
name = "treq"
version = "15.1.0"
description = "A requests-like API built on top of twisted.web's Agent"
version = "22.2.0"
description = "High-level Twisted HTTP Client API"
category = "main"
optional = false
python-versions = "*"
python-versions = ">=3.6"
[package.dependencies]
pyOpenSSL = {version = ">=0.15.1", markers = "python_version > \"3.0\""}
attrs = "*"
hyperlink = ">=21.0.0"
incremental = "*"
requests = ">=2.1.0"
service_identity = ">=14.0.0"
Twisted = {version = ">=15.5.0", markers = "python_version > \"3.0\""}
Twisted = {version = ">=18.7.0", extras = ["tls"]}
[package.extras]
dev = ["pep8", "pyflakes", "httpbin (==0.5.0)"]
docs = ["sphinx (>=1.4.8)"]
[[package]]
name = "twine"
@@ -1313,7 +1318,10 @@ attrs = ">=19.2.0"
Automat = ">=0.8.0"
constantly = ">=15.1"
hyperlink = ">=17.1.1"
idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""}
incremental = ">=21.3.0"
pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
typing-extensions = ">=3.6.5"
"zope.interface" = ">=4.4.2"
@@ -1339,7 +1347,7 @@ windows_platform = ["pywin32 (!=226)", "cython-test-exception-raiser (>=1.0.2,<2
type = "git"
url = "https://github.com/twisted/twisted.git"
reference = "trunk"
resolved_reference = "ff2ea6181f7ca4887adbaf4158b2fe0891e17ef9"
resolved_reference = "b249a121afffefa3d9d9ab5b7a1315c5a1bb454d"
[[package]]
name = "twisted-iocpsupport"
@@ -1615,7 +1623,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "c2cfbb348a49e088c404148c1b682fc5af5abb6278cf4479c6a51fff1656328c"
content-hash = "94116a568c9ab41174ec66c60cb0cb783e349bf586352b1fab08c714e5191665"
[metadata.files]
attrs = [
@@ -2642,8 +2650,8 @@ tqdm = [
{file = "tqdm-4.63.0.tar.gz", hash = "sha256:1d9835ede8e394bb8c9dcbffbca02d717217113adc679236873eeaac5bc0b3cd"},
]
treq = [
{file = "treq-15.1.0-py2.py3-none-any.whl", hash = "sha256:1ad1ba89ddc62ae877084b290bd327755b13f6e7bc7076dc4d8e2efb701bfd63"},
{file = "treq-15.1.0.tar.gz", hash = "sha256:425a47d5d52a993d51211028fb6ade252e5fbea094e878bb4b644096a7322de8"},
{file = "treq-22.2.0-py3-none-any.whl", hash = "sha256:27d95b07c5c14be3e7b280416139b036087617ad5595be913b1f9b3ce981b9b2"},
{file = "treq-22.2.0.tar.gz", hash = "sha256:df757e3f141fc782ede076a604521194ffcb40fa2645cf48e5a37060307f52ec"},
]
twine = [
{file = "twine-3.8.0-py3-none-any.whl", hash = "sha256:d0550fca9dc19f3d5e8eadfce0c227294df0a2a951251a4385797c8a6198b7c8"},

View File

@@ -119,7 +119,7 @@ signedjson = "^1.1.0"
service-identity = ">=18.1.0"
# Twisted 18.9 introduces some logger improvements that the structured
# logger utilises
twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"}
twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk", extras = ["tls"]}
treq = ">=15.1"
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
pyOpenSSL = ">=16.0.0"

View File

@@ -59,7 +59,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
from synapse.logging.tracing import trace
from synapse.logging.tracing import trace, tag_args, set_attribute, SynapseTags
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
@@ -410,6 +410,7 @@ class FederationEventHandler:
prev_member_event,
)
@trace
async def process_remote_join(
self,
origin: str,
@@ -753,6 +754,7 @@ class FederationEventHandler:
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
@tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
@@ -854,6 +856,7 @@ class FederationEventHandler:
else:
raise
@trace
async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
) -> EventContext:
@@ -970,6 +973,8 @@ class FederationEventHandler:
event, state_ids_before_event=state_map, partial_state=partial_state
)
@trace
@tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
@@ -1112,6 +1117,8 @@ class FederationEventHandler:
return state_map
@trace
@tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1133,6 +1140,7 @@ class FederationEventHandler:
destination=destination, room_id=room_id, event_ids=(event_id,)
)
@trace
async def _process_received_pdu(
self,
origin: str,
@@ -1283,6 +1291,7 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
@trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
@@ -1414,6 +1423,8 @@ class FederationEventHandler:
return event_from_response
@trace
@tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
@@ -1459,6 +1470,7 @@ class FederationEventHandler:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
@trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
@@ -1477,6 +1489,11 @@ class FederationEventHandler:
"""
event_map = {event.event_id: event for event in events}
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
event_map.keys(),
)
# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
@@ -1593,6 +1610,7 @@ class FederationEventHandler:
backfilled=True,
)
@trace
async def _check_event_auth(
self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
@@ -1631,6 +1649,9 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
set_attribute(
"claimed_auth_events", [ev.event_id for ev in claimed_auth_events]
)
# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1749,7 @@ class FederationEventHandler:
)
context.rejected = RejectedReason.AUTH_ERROR
@trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
@@ -1935,6 +1957,8 @@ class FederationEventHandler:
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
@trace
@tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1963,6 +1987,7 @@ class FederationEventHandler:
await self._auth_and_persist_outliers(room_id, remote_auth_events)
@trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
@@ -2008,6 +2033,7 @@ class FederationEventHandler:
await self._store.remove_push_actions_from_staging(event.event_id)
raise
@trace
async def persist_events_and_notify(
self,
room_id: str,

View File

@@ -281,6 +281,16 @@ class SynapseTags:
# The name of the external cache
CACHE_NAME = "cache.name"
# Used to tag function arguments
#
# Tag a named arg. The name of the argument should be appended to this
# prefix
FUNC_ARG_PREFIX = "ARG."
# Tag the extra variadic positional arguments (the `*extras` in `def foo(first, second, *extras)`)
FUNC_ARGS = "args"
# Tag keyword args
FUNC_KWARGS = "kwargs"
class SynapseBaggage:
FORCE_TRACING = "synapse-force-tracing"
@@ -790,30 +800,28 @@ def extract_text_map(
# Tracing decorators
def create_decorator(
def _custom_sync_async_decorator(
func: Callable[P, R],
# TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here
wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]],
) -> Callable[P, R]:
"""
Creates a decorator that is able to handle sync functions, async functions
(coroutines), and inlineDeferred from Twisted.
Decorates a function that is sync or async (coroutines), or that returns a Twisted
`Deferred`. The custom business logic of the decorator goes in `wrapping_logic`.
Example usage:
```py
# Decorator to time the functiona and log it out
# Decorator to time the function and log it out
def duration(func: Callable[P, R]) -> Callable[P, R]:
@contextlib.contextmanager
def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]:
start_ts = time.time()
yield
end_ts = time.time()
duration = end_ts - start_ts
logger.info("%s took %s seconds", func.__name__, duration)
return create_decorator(func, _wrapping_logic)
try:
yield
finally:
end_ts = time.time()
duration = end_ts - start_ts
logger.info("%s took %s seconds", func.__name__, duration)
return _custom_sync_async_decorator(func, _wrapping_logic)
```
Args:
func: The function to be decorated
wrapping_logic: The business logic of your custom decorator.
@@ -821,14 +829,18 @@ def create_decorator(
before/after the function as desired.
"""
@wraps(func)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
if inspect.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
@wraps(func)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
with wrapping_logic(func, *args, **kwargs):
return await func(*args, **kwargs)
else:
# The other case here handles both sync functions and those
# decorated with inlineDeferred.
return await func(*args, **kwargs) # type: ignore[misc]
else:
# The other case here handles both sync functions and those
# decorated with Twisted's `inlineCallbacks` (which return a `Deferred`).
@wraps(func)
def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
scope = wrapping_logic(func, *args, **kwargs)
scope.__enter__()
@@ -866,7 +878,11 @@ def create_decorator(
return _wrapper # type: ignore[return-value]
def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
def trace_with_opname(
opname: str,
*,
tracer: Optional["opentelemetry.trace.Tracer"] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
Decorator to trace a function with a custom opname.
@@ -875,21 +891,14 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]
@contextlib.contextmanager
def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
if opentelemetry is None:
return None
scope = start_active_span(opname)
scope.__enter__()
try:
with start_active_span(opname, tracer=tracer):
yield
except Exception as e:
scope.__exit__(type(e), None, e.__traceback__)
raise
finally:
scope.__exit__(None, None, None)
def _decorator(func: Callable[P, R]):
return create_decorator(func, _wrapping_logic)
if not opentelemetry:
return func
return _custom_sync_async_decorator(func, _wrapping_logic)
return _decorator
@@ -918,12 +927,12 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
argspec = inspect.getfullargspec(func)
for i, arg in enumerate(args[1:]):
set_attribute("ARG_" + argspec.args[i + 1], str(arg)) # type: ignore[index]
set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index]
set_attribute("kwargs", str(kwargs))
set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i + 1], str(arg)) # type: ignore[index]
set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index]
set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs))
yield
return create_decorator(func, _wrapping_logic)
return _custom_sync_async_decorator(func, _wrapping_logic)
@contextlib.contextmanager

View File

@@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
from synapse.logging.tracing import (
Link,
get_active_span,
set_attribute,
start_active_span,
trace,
SynapseTags,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@@ -383,6 +390,12 @@ class EventsPersistenceStorageController:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
[e.event_id for e, _ in events_and_contexts],
)
set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
@@ -781,7 +794,7 @@ class EventsPersistenceStorageController:
stale_forward_extremities_counter.observe(len(stale))
return result
async def _get_new_state_after_events(
self,
room_id: str,

View File

@@ -1381,6 +1381,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
@trace
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",

View File

@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.tracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -145,6 +146,7 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
@trace
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],

View File

@@ -54,6 +54,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.logging.tracing import trace, tag_args
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -394,6 +395,8 @@ class EventsWorkerStore(SQLBaseStore):
return event
@trace
@tag_args
async def get_events(
self,
event_ids: Collection[str],
@@ -1363,6 +1366,8 @@ class EventsWorkerStore(SQLBaseStore):
return {r["event_id"] for r in rows}
@trace
@tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
) -> Set[str]: