Compare commits

...

130 Commits

Author SHA1 Message Date
Eric Eastwood c5b1ce8426 Log maria membership event 2022-10-12 19:45:11 -05:00
Eric Eastwood 0b104e387b Ordering off because not setting stream_ordering 2022-10-12 19:20:27 -05:00
Eric Eastwood 8a69706a50 Add stream_ordering to debug string 2022-10-12 19:08:21 -05:00
Eric Eastwood 3f8fef2beb Explain why auth/state necessary here 2022-09-30 00:47:13 -05:00
Eric Eastwood 8f8c1a08d8 WIP: Connect state_chain to prev_event and the batch to the state_chain so everything is valid
We are going to lose the benefit of keeping the join noise out of the timeline.
And will probably have to hide "historical" state on the client.
2022-09-29 04:12:24 -05:00
Eric Eastwood 5faebbdfcf Seems to work with Maria 2022-09-29 02:25:42 -05:00
Eric Eastwood 61c1296703 Working once you connect the floating insertion event 2022-09-29 01:53:16 -05:00
Eric Eastwood 1ed0276abf Simplify case more (no more alice) 2022-09-28 21:29:29 -05:00
Eric Eastwood aaa9679ae1 Simplify case 2022-09-28 21:27:04 -05:00
Eric Eastwood cfa5e57aeb Try chronological which rejects the historical 2022-09-28 21:23:47 -05:00
Eric Eastwood 50b11cb538 I think working same as Complement reverse_chronological, only the insertion event rejected 2022-09-28 21:22:33 -05:00
Eric Eastwood 4dcb2f61f4 Align more to Complement test which does pass 2022-09-28 19:09:29 -05:00
Eric Eastwood 62f35ead9e Show extra unexpected events 2022-09-28 18:33:17 -05:00
Eric Eastwood 20f4d1c4f9 Use event_id to compare 2022-09-27 03:10:37 -05:00
Eric Eastwood 6423938350 Better assertion message 2022-09-27 03:03:46 -05:00
Eric Eastwood 7a3ded017c Test running but no expected results yet 2022-09-27 02:05:43 -05:00
Eric Eastwood 31e2c10355 Non-working test 2022-09-27 01:48:43 -05:00
Eric Eastwood 85451b99c2 Merge branch 'madlittlemods/13356-messages-investigation-scratch-v1' into maddlittlemods/msc2716-many-batches-optimization
Conflicts:
	synapse/handlers/federation.py
	synapse/storage/databases/main/cache.py
	synapse/storage/databases/main/event_federation.py
2022-09-26 15:28:14 -05:00
Eric Eastwood d8899e45a7 Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madlittlemods/13356-messages-investigation-scratch-v1 2022-09-26 15:06:30 -05:00
Eric Eastwood e4b9898dec Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry 2022-09-26 15:04:08 -05:00
Eric Eastwood 78b44340d6 More debugging 2022-09-24 04:16:56 -05:00
Eric Eastwood 33d12a516f Debugging 2022-09-24 01:29:11 -05:00
Eric Eastwood 44e97465e9 Merge branch 'madlittlemods/13856-fix-have-seen-events-not-being-invalidated' into maddlittlemods/msc2716-many-batches-optimization
Conflicts:
	tests/storage/databases/main/test_events_worker.py
2022-09-23 00:16:50 -05:00
Eric Eastwood f8dc17b539 Add test to ensure the safety works 2022-09-22 23:05:51 -05:00
Eric Eastwood b9be6c5b5d Raise exception so we don't run into this arg mismatch again 2022-09-22 22:48:58 -05:00
Eric Eastwood 4fa8f05344 Add test to make sure we can actually clear entries just by room_id 2022-09-22 22:28:56 -05:00
Eric Eastwood 9fb750dc2f Better changelog 2022-09-22 21:56:57 -05:00
Eric Eastwood 5b9b645400 Add test description 2022-09-22 21:51:51 -05:00
Eric Eastwood 0cdc7bf432 Fix @cachedList on _have_seen_events_dict
As mentioned by @erikjohnston,
https://github.com/matrix-org/synapse/issues/13865#issuecomment-1254751569
2022-09-22 17:36:54 -05:00
Eric Eastwood 2162ab5607 Invalidate cache like #13796
Copying what https://github.com/matrix-org/synapse/pull/13796
is doing
2022-09-22 17:18:34 -05:00
Eric Eastwood 1054f91e92 Merge branch 'develop' into madlittlemods/13856-fix-have-seen-events-not-being-invalidated 2022-09-22 17:15:44 -05:00
Eric Eastwood 5a5c3246f4 Scratch try different orders just to see how the tests pass differently 2022-09-22 14:26:58 -05:00
Eric Eastwood a25821d160 Try sort backfill points by tie breaking on stream_ordering 2022-09-22 01:03:35 -05:00
Eric Eastwood b23b3e4b29 Calculate the stream_ordering from newest -> oldest (in the correct order) and persist in the oldest -> newest to get the least missing prev_event fetch thrashing 2022-09-21 23:32:56 -05:00
Eric Eastwood f2a5c70b88 Assert is not None 2022-09-21 21:29:35 -05:00
Eric Eastwood d07947d0d3 Fix have_seen_event cache not being invalidated
Fix https://github.com/matrix-org/synapse/issues/13856

`_invalidate_caches_for_event` doesn't run in monolith mode which
means we never even tried to clear the `have_seen_event` and other
caches. And even in worker mode, it only runs on the workers, not
the master (AFAICT).

Additionally there is a bug with the key being wrong so
`_invalidate_caches_for_event` never invalidates the
`have_seen_event` cache even when it does run.

Wrong:
```py
self.have_seen_event.invalidate((room_id, event_id))
```

Correct:
```py
self.have_seen_event.invalidate(((room_id, event_id),))
```
2022-09-21 20:33:11 -05:00
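The wrong/correct distinction above is easy to miss: the cache is keyed on the full tuple of call arguments, and here the single argument is itself a `(room_id, event_id)` tuple, so it needs an extra tuple layer when invalidating. A minimal sketch of the mechanism (`ArgTupleCache` is hypothetical, not Synapse's `@cached` implementation):

```python
# Hypothetical minimal cache, keyed on the full tuple of call arguments,
# to illustrate the invalidation bug above (not Synapse's @cached code).
class ArgTupleCache:
    def __init__(self):
        self._entries = {}

    def set(self, args, value):
        # `args` is the tuple of call arguments, e.g. ((room_id, event_id),)
        # for a function whose single argument is a (room_id, event_id) pair.
        self._entries[args] = value

    def invalidate(self, args):
        self._entries.pop(args, None)


cache = ArgTupleCache()
key = ("!room:server", "$event_id")
cache.set((key,), True)

cache.invalidate(key)        # wrong: looks up (room_id, event_id), misses
assert (key,) in cache._entries

cache.invalidate((key,))     # correct: looks up ((room_id, event_id),)
assert (key,) not in cache._entries
```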
Eric Eastwood f6393db0da Add changelog 2022-09-21 18:16:09 -05:00
Eric Eastwood a847a35921 Fix have_seen_event cache not being invalidated
Fix https://github.com/matrix-org/synapse/issues/13856

`_invalidate_caches_for_event` doesn't run in monolith mode which
means we never even tried to clear the `have_seen_event` and other
caches. And even in worker mode, it only runs on the workers, not
the master (AFAICT).

Additionally there is a bug with the key being wrong so
`_invalidate_caches_for_event` never invalidates the
`have_seen_event` cache even when it does run.

Wrong:
```py
self.have_seen_event.invalidate((room_id, event_id))
```

Correct:
```py
self.have_seen_event.invalidate(((room_id, event_id),))
```
2022-09-21 18:03:57 -05:00
Eric Eastwood 05e511368b Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madlittlemods/13356-messages-investigation-scratch-v1
Conflicts:
	poetry.lock
	synapse/handlers/federation.py
2022-09-20 18:04:30 -05:00
Eric Eastwood b86869feef Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	.github/workflows/tests.yml
	poetry.lock
	synapse/storage/schema/__init__.py
2022-09-20 18:00:28 -05:00
Eric Eastwood 84f91e36f3 Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	synapse/storage/schema/__init__.py
2022-09-14 15:36:57 -05:00
Eric Eastwood a027c6e9fe Maybe fix positional argument mismatch for DummyLink
Hopefully fix:
```
  File "/home/runner/work/synapse/synapse/synapse/storage/controllers/persist_events.py", line 246, in add_to_queue
    links=[Link(end_item.tracing_span_context)],
builtins.TypeError: __init__() takes 1 positional argument but 2 were given
```
2022-09-14 01:59:37 -05:00
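The traceback above is the classic shape of a stub class whose `__init__` accepts no extra positional arguments being called as if it did. A hedged sketch of the failure mode (this `DummyLink` is modelled on the commit message, not copied from Synapse):

```python
# Hedged sketch of the failure in the traceback above: a stub whose
# __init__ takes no span context, called as if it accepted one.
class DummyLink:
    def __init__(self):
        self.context = None


try:
    DummyLink("span_context")  # mirrors Link(end_item.tracing_span_context)
    caught = ""
except TypeError as exc:
    caught = str(exc)

# Reproduces: __init__() takes 1 positional argument but 2 were given
assert "positional argument" in caught
```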
Eric Eastwood b77d49f441 Hopefully fix problem when OTEL not installed with non recording span
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/synapse/http/server.py", line 306, in _async_render_wrapper
    callback_return = await self._async_render(request)
  File "/usr/local/lib/python3.9/site-packages/synapse/http/server.py", line 512, in _async_render
    callback_return = await raw_callback_return
  File "/usr/local/lib/python3.9/site-packages/synapse/federation/transport/server/_base.py", line 357, in new_func
    remote_parent_span = create_non_recording_span()
  File "/usr/local/lib/python3.9/site-packages/synapse/logging/tracing.py", line 502, in create_non_recording_span
    return opentelemetry.trace.NonRecordingSpan(
AttributeError: 'NoneType' object has no attribute 'trace'
```
2022-09-13 16:53:14 -05:00
Eric Eastwood 19c6f6ecc9 Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	synapse/storage/schema/__init__.py
2022-09-13 16:26:53 -05:00
Eric Eastwood ed11237fe5 Remove linting from CI for now 2022-09-13 08:58:23 -05:00
Eric Eastwood d730a46451 Update Twisted to latest 2022-09-13 08:56:12 -05:00
Eric Eastwood 15e242eeaa OTEL install with DMR 2022-09-13 08:54:18 -05:00
Eric Eastwood ad3e324118 Install otel deps from develop 2022-09-12 15:46:38 -05:00
Eric Eastwood 6c40dfafcf Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry 2022-09-12 15:34:02 -05:00
Eric Eastwood 32b9d16d82 poetry update 2022-09-09 17:38:44 -05:00
Eric Eastwood a15592d073 Poetry install again 2022-09-09 17:16:19 -05:00
Eric Eastwood f73bc59d25 Try to resolve poetry deps 2022-09-09 17:02:40 -05:00
Eric Eastwood 50f0342594 Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	poetry.lock
	synapse/api/auth.py
	synapse/federation/federation_client.py
	synapse/logging/opentracing.py
	synapse/rest/client/keys.py
	synapse/rest/client/sendtodevice.py
	synapse/storage/schema/__init__.py
2022-09-09 16:28:05 -05:00
Eric Eastwood 4168ba53ee Remove debug logs 2022-08-18 22:11:47 -05:00
Eric Eastwood db04b16060 Some cleanup 2022-08-18 16:55:11 -05:00
Eric Eastwood 0f2bfa4224 Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madlittlemods/13356-messages-investigation-scratch-v1
Conflicts:
	synapse/federation/federation_client.py
	synapse/handlers/federation.py
	synapse/handlers/federation_event.py
	synapse/logging/tracing.py
	synapse/storage/controllers/persist_events.py
	synapse/storage/controllers/state.py
	synapse/storage/databases/main/events_worker.py
	synapse/util/ratelimitutils.py
2022-08-18 16:51:28 -05:00
Eric Eastwood 8def7e4b4b Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	poetry.lock
	synapse/federation/federation_client.py
	synapse/federation/federation_server.py
	synapse/handlers/federation.py
	synapse/handlers/federation_event.py
	synapse/logging/opentracing.py
	synapse/rest/client/room.py
	synapse/storage/controllers/persist_events.py
	synapse/storage/controllers/state.py
2022-08-18 16:33:22 -05:00
Eric Eastwood 53b8453a99 Refactor from feedback
From feedback in https://github.com/matrix-org/synapse/pull/13499
2022-08-15 12:25:28 -05:00
Eric Eastwood 898ba0effe More tracing 2022-08-11 10:27:55 -05:00
Eric Eastwood f4ec9d1f74 Instrument FederationStateIdsServlet 2022-08-10 18:13:16 -05:00
Eric Eastwood 597c3f276e Trace some results 2022-08-09 16:39:29 -05:00
Eric Eastwood 2a467fd26b Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madlittlemods/13356-messages-investigation-scratch-v1
Conflicts:
	pyproject.toml
	synapse/logging/tracing.py
2022-08-09 14:50:10 -05:00
Eric Eastwood 7024d7b86e Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	synapse/logging/opentracing.py
	tests/logging/test_opentracing.py
2022-08-09 14:46:03 -05:00
Eric Eastwood aa5e92506b Only set attribute if going forward 2022-08-08 20:37:05 -05:00
Eric Eastwood ee465f993b Fix some lints (mistakes) and better trace when fetching events 2022-08-06 02:24:48 -05:00
Eric Eastwood c51883e509 Add length to the list of events 2022-08-06 01:37:36 -05:00
Eric Eastwood 552b7f13b6 More tracing for federation 2022-08-06 01:06:48 -05:00
Eric Eastwood 13855c5916 More tracing for federated side 2022-08-05 20:44:21 -05:00
Eric Eastwood 756637501f Try fix Twisted/treq problems
See https://github.com/matrix-org/synapse/pull/13400#discussion_r937213304
2022-08-04 01:41:56 -05:00
Eric Eastwood a7eabb78a2 Trace more 2022-08-04 01:24:58 -05:00
Eric Eastwood fdce1c2ec3 Allow @trace and @tag_args to be used together 2022-08-03 20:50:44 -05:00
Eric Eastwood 2f752877fc Fix @tag_args being one-off (ahead) 2022-08-03 18:30:23 -05:00
Eric Eastwood 9f691824d9 Move Twisted git install where it was before 2022-08-03 17:18:15 -05:00
Eric Eastwood c3f3e59cca Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madlittlemods/13356-messages-investigation-scratch-v1 2022-08-03 17:05:04 -05:00
Eric Eastwood d7166a0d31 Update docs/tracing.md 2022-08-03 16:59:58 -05:00
Eric Eastwood ccd475299b Fix tracing imports after merging in develop 2022-08-03 16:18:12 -05:00
Eric Eastwood f5da762960 Revert "Update treq to match minimum Twisted Python versions"
This reverts commit 270db429cd.
2022-08-03 16:10:10 -05:00
Eric Eastwood 270db429cd Update treq to match minimum Twisted Python versions
Hopefully fix https://github.com/matrix-org/synapse/runs/7645395562?check_suite_focus=true#step:7:6727

```
builtins.ImportError: cannot import name '_PY3' from 'twisted.python.compat' (unknown location)
```
2022-08-03 15:36:04 -05:00
Eric Eastwood 699dad008c Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	docs/usage/configuration/config_documentation.md
	synapse/api/auth.py
2022-08-03 15:15:34 -05:00
Eric Eastwood b6f56656ab Use latest Twisted from source to fix contextvar issues causing OTEL `Failed to detach context` errors 2022-08-03 00:10:14 -05:00
See https://github.com/matrix-org/synapse/pull/13400#discussion_r936195492
2022-08-03 00:10:14 -05:00
Eric Eastwood 9cd63206fb Fix imports after OTEL changes 2022-08-02 20:23:14 -05:00
Eric Eastwood 2504bc627a Merge branch 'madlittlemods/instrument-messages-tracing' into madlittlemods/13356-messages-investigation-scratch-v1
Conflicts:
	synapse/api/auth.py
2022-08-02 20:11:56 -05:00
Eric Eastwood b6a18d2822 Trace in Complement 2022-08-02 20:11:07 -05:00
Eric Eastwood 16d17f7553 Fix table missing column 2022-08-02 20:09:22 -05:00
Eric Eastwood 2491665ed9 Fix remnant 2022-08-02 15:19:28 -05:00
Eric Eastwood 5999132287 Fix lints 2022-08-02 15:12:33 -05:00
Eric Eastwood c26fa2d0d1 Move to 72 schema version
Do as they tell us:
https://github.com/matrix-org/synapse/runs/7639487550?check_suite_focus=true
2022-08-02 15:10:01 -05:00
Eric Eastwood 72c718df54 Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	docs/usage/configuration/config_documentation.md
	synapse/logging/opentracing.py

Got changes from:

 - https://github.com/matrix-org/synapse/pull/13362/files
2022-08-02 15:05:27 -05:00
Eric Eastwood ba4a46a7fb Seems to work (see test_side_by_side_spans) 2022-08-02 14:36:05 -05:00
Eric Eastwood d72cacfb3e Add changelog 2022-08-02 14:32:45 -05:00
Eric Eastwood fcc42208ae Update docs 2022-08-02 14:32:38 -05:00
Eric Eastwood 9d6fcf3ab9 Clean up some opentracing text references 2022-08-02 14:07:04 -05:00
Eric Eastwood 59facea792 Restore logging current_context (not sure why it was removed) 2022-08-02 13:52:34 -05:00
Eric Eastwood ad71bc39d6 End on exit is already the default expected behavior 2022-08-02 13:48:15 -05:00
Eric Eastwood da396a2538 Add test for what happens when side by side spans in with statement 2022-08-02 13:43:06 -05:00
Eric Eastwood b09651a00a Always return config path for config error 2022-08-02 13:31:23 -05:00
Eric Eastwood fb0e8203ca More clear method names 2022-08-02 13:27:45 -05:00
Eric Eastwood 36d6648fad Remove type ignore comments
See https://github.com/matrix-org/synapse/pull/13400#discussion_r935887649
2022-08-02 13:22:58 -05:00
Eric Eastwood 0f93ec8d59 Fix lints 2022-08-02 12:49:57 -05:00
Eric Eastwood dbd9005cd1 Revert crazy custom sampler and span processor that tried to force tracing for users 2022-08-02 11:56:51 -05:00
Eric Eastwood 6bb7cb7166 Revert "Non-working try baggage to inherit force tracing/sampling"
This reverts commit d15fa457c9.
2022-08-02 11:43:28 -05:00
Eric Eastwood d15fa457c9 Non-working try baggage to inherit force tracing/sampling 2022-08-02 11:43:17 -05:00
Eric Eastwood b3cdbad985 PoC force tracing
Doesn't force tracing for the child spans yet
2022-08-02 02:36:56 -05:00
Eric Eastwood 6255a1a622 Fix tests and some lints 2022-08-01 19:07:11 -05:00
Eric Eastwood 00be06cfd9 Try to align read from edu content 2022-08-01 17:54:14 -05:00
Eric Eastwood 8e902b858d Remove what's left of scopemanager 2022-08-01 17:38:07 -05:00
Eric Eastwood a9fb504dcd Implement start_active_span_from_edu for OTEL
AFAICT, this never worked before because everything was serialized into `content["org.matrix.opentracing_context"]`
but `start_active_span_from_edu` read from `content["opentracing"]`.
See https://github.com/matrix-org/synapse/pull/5852#discussion_r934960586

Do we even still want this?
2022-08-01 17:23:19 -05:00
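A simplified illustration of the key mismatch this commit describes (plain dicts, not the actual serialization code): the writer stored the span context under one key while the reader looked under another, so the reader always came up empty:

```python
# Writer side serialized the span context under the org.matrix.* key:
edu_content = {"org.matrix.opentracing_context": '{"uber-trace-id": "..."}'}

# start_active_span_from_edu read from the wrong key, so it always got None:
assert edu_content.get("opentracing") is None

# The span context was actually sitting under this key the whole time:
assert edu_content.get("org.matrix.opentracing_context") is not None
```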
Eric Eastwood 33fd24e48c todos 2022-08-01 16:21:40 -05:00
Eric Eastwood 322da5137f Fix some lints 2022-08-01 14:42:13 -05:00
Eric Eastwood 7772f50e60 Use HTTP_HOST attribute 2022-07-30 02:07:46 -05:00
Eric Eastwood 070195afee Use correct type for what start_as_current_span returns
See:

 - https://github.com/open-telemetry/opentelemetry-python/pull/198#discussion_r333399436
 - https://github.com/open-telemetry/opentelemetry-python/issues/219
2022-07-29 22:49:34 -05:00
Eric Eastwood d84815663e Passing tests and context manager doesn't seem to be needed 2022-07-29 22:44:21 -05:00
Eric Eastwood 041acdf985 Working second test although it's a bit pointless testing whether opentelemetry works 2022-07-29 22:18:59 -05:00
Eric Eastwood d29a4af916 Move to start_active_span 2022-07-29 22:08:11 -05:00
Eric Eastwood 7c135b93bd Easier to follow local vs remote span tracing
The `incoming-federation-request` vs `process-federation_request` was first introduced in
https://github.com/matrix-org/synapse/pull/11870

 - Span for remote trace: `incoming-federation-request`
    - `child_of` reference: `origin_span_context`
    - `follows_from` reference: `servlet_span`
 - Span for local trace: `process-federation-request`
    - `child_of` reference: `servlet_span` (by the nature of it being active)
    - `follows_from` reference: `incoming-federation-request`
2022-07-29 21:49:47 -05:00
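The two-span layout above can be modelled in a few lines of plain Python (a stdlib sketch, not OpenTelemetry code) to make the `child_of`/`follows_from` references concrete:

```python
# Stdlib-only model (not OpenTelemetry code) of the span layout described
# in the commit message above.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Span:
    name: str
    child_of: Optional["Span"] = None
    follows_from: List["Span"] = field(default_factory=list)


origin = Span("origin (remote span context)")
servlet = Span("servlet_span")

incoming = Span(
    "incoming-federation-request",
    child_of=origin,        # stays parented to the remote trace
    follows_from=[servlet],
)
process = Span(
    "process-federation-request",
    child_of=servlet,       # parented locally by being the active span
    follows_from=[incoming],
)

# Each trace keeps its own parent; follows_from links tie the two together.
assert incoming.child_of is origin and process.child_of is servlet
```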
Eric Eastwood 786dd9b4b1 Explain weird function 2022-07-29 17:06:43 -05:00
Eric Eastwood 19d20b50e8 Record exception 2022-07-29 16:54:26 -05:00
Eric Eastwood 2011ac2100 Fix using wrong type of context (Context vs SpanContext)
Fix error:
```
AttributeError: 'SpanContext' object has no attribute 'get'
```

`Context`:
```
{'current-span-1a226c96-a5db-4412-bcaa-1fdd34213c5c': _Span(name="sendToDevice", context=SpanContext(trace_id=0x5d2dcc3fdc8205046d60a5cd18672ac6, span_id=0x715c736ff5f4d208, trace_flags=0x01, trace_state=[], is_remote=False))}
```

`SpanContext`:
```
SpanContext(trace_id=0xf7cd9d058b7b76f364bdd649c4ba7b8a, span_id=0x287ce71bac31bfc4, trace_flags=0x01, trace_state=[], is_remote=False)
```
2022-07-29 00:50:37 -05:00
Eric Eastwood 1d208fa17e Fix invalid attribute type
```
Invalid type StreamToken for attribute value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types
```

Had to add a few more logs to find this instance since the warning doesn't give much info about where I am setting this invalid attribute.
This was good enough to find it in the code.
```
BoundedAttributes __setitem__ key=since_token value=StreamToken(room_key=RoomStreamToken(topological=None, stream=1787, instance_map=frozendict.frozendict({})), presence_key=481272, typing_key=0, receipt_key=340, account_data_key=1233, push_rules_key=8, to_device_key=57, device_list_key=199, groups_key=0)

BoundedAttributes __setitem__ key=now_token value=StreamToken(room_key=RoomStreamToken(topological=None, stream=1787, instance_map=frozendict.frozendict({})), presence_key=481287, typing_key=0, receipt_key=340, account_data_key=1233, push_rules_key=8, to_device_key=57, device_list_key=199, groups_key=0)

BoundedAttributes __setitem__ key=token value=StreamToken(room_key=RoomStreamToken(topological=None, stream=1787, instance_map=frozendict.frozendict({})), presence_key=481291, typing_key=0, receipt_key=340, account_data_key=1237, push_rules_key=8, to_device_key=57, device_list_key=199, groups_key=0)
```
2022-07-29 00:25:03 -05:00
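One way to avoid this class of warning is to coerce values to the allowed primitive types before setting them as span attributes. A hedged sketch (`sanitize_attribute` is a hypothetical helper, not something Synapse ships):

```python
# Hypothetical helper: coerce values to the primitive types the tracing
# library accepts as attribute values, falling back to str() otherwise.
ALLOWED_TYPES = (bool, str, bytes, int, float)


def sanitize_attribute(value):
    """Return `value` if it is an allowed primitive (or a sequence of
    them), else fall back to its string representation."""
    if isinstance(value, ALLOWED_TYPES):
        return value
    if isinstance(value, (list, tuple)) and all(
        isinstance(item, ALLOWED_TYPES) for item in value
    ):
        return value
    # Complex objects like StreamToken get stringified instead of warning.
    return str(value)


assert sanitize_attribute(42) == 42
assert sanitize_attribute(["a", "b"]) == ["a", "b"]
assert sanitize_attribute({"stream": 1787}) == "{'stream': 1787}"
```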
Eric Eastwood 1b0840e3aa Fix some lints 2022-07-28 19:43:43 -05:00
Eric Eastwood 3a259960fb Fixup some todos 2022-07-28 00:18:08 -05:00
Eric Eastwood f6c3b22a21 Fix some lints 2022-07-27 16:49:00 -05:00
Eric Eastwood 9e1de8696c We use the config for the Jaeger exporter now 2022-07-27 12:52:51 -05:00
Eric Eastwood 0d7a2b93cf Revert changes to Sentry scopes (not OTEL)
See https://github.com/matrix-org/synapse/pull/13400#discussion_r931325627
2022-07-27 12:52:10 -05:00
Eric Eastwood 242817213e Export to Jaeger (things are showing up) 2022-07-27 02:36:10 -05:00
Eric Eastwood 6406fd5d84 Server running 2022-07-27 01:12:48 -05:00
Eric Eastwood 6984cefa79 Progress towards OTEL 2022-07-27 00:55:43 -05:00
Eric Eastwood 2fe6911957 Some shim and some new 2022-07-26 21:53:11 -05:00
Eric Eastwood 0cc610ecbe Migrate to OpenTelemetry tracing
See https://github.com/matrix-org/synapse/issues/11850
2022-07-26 18:44:21 -05:00
Eric Eastwood 522c29bfc7 Instrument /messages for understandable traces 2022-07-22 22:29:18 -05:00
86 changed files with 3260 additions and 2284 deletions
+1
@@ -0,0 +1 @@
Migrate from OpenTracing to OpenTelemetry (config changes necessary).
+1
@@ -0,0 +1 @@
Fix `have_seen_event` cache not being invalidated after we persist an event which causes inefficiency effects like extra `/state` federation calls.
@@ -31,7 +31,7 @@ federation_ip_range_blacklist: []
# Disable server rate-limiting
rc_federation:
window_size: 1000
sleep_limit: 10
sleep_limit: 99999
sleep_delay: 500
reject_limit: 99999
concurrent: 3
+20
@@ -184,3 +184,23 @@ trusted_key_servers:
password_config:
enabled: true
# foo
tracing:
enabled: true
sample_rate: 1
homeserver_whitelist:
- ".*"
jaeger_exporter_config:
agent_host_name: host.docker.internal
agent_port: 6831
# Split UDP packets (UDP_PACKET_MAX_LENGTH is set to 65k in OpenTelemetry)
udp_split_oversized_batches: true
# If you define a collector, it will communicate directly to the collector,
# bypassing the agent
#
# It does not seem like the agent can keep up with the massive UDP load
# (1065 spans in one trace) so let's just use the HTTP collector endpoint
# instead which seems to work.
collector_endpoint: "http://host.docker.internal:14268/api/traces?format=jaeger.thrift"
+1 -1
@@ -86,7 +86,7 @@
- [Git Usage](development/git.md)
- [Testing]()
- [Demo scripts](development/demo.md)
- [OpenTracing](opentracing.md)
- [Tracing](tracing.md)
- [Database Schemas](development/database_schema.md)
- [Experimental features](development/experimental_features.md)
- [Dependency management](development/dependencies.md)
+1 -92
@@ -1,94 +1,3 @@
# OpenTracing
## Background
OpenTracing is a semi-standard being adopted by a number of distributed
tracing platforms. It is a common api for facilitating vendor-agnostic
tracing instrumentation. That is, we can use the OpenTracing api and
select one of a number of tracer implementations to do the heavy lifting
in the background. Our current selected implementation is Jaeger.
OpenTracing is a tool which gives an insight into the causal
relationship of work done in and between servers. The servers each track
events and report them to a centralised server - in Synapse's case:
Jaeger. The basic unit used to represent events is the span. The span
roughly represents a single piece of work that was done and the time at
which it occurred. A span can have child spans, meaning that the work of
the child had to be completed for the parent span to complete, or it can
have follow-on spans which represent work that is undertaken as a result
of the parent but is not depended on by the parent in order to
finish.
Since this is undertaken in a distributed environment a request to
another server, such as an RPC or a simple GET, can be considered a span
(a unit of work) for the local server. This causal link is what
OpenTracing aims to capture and visualise. In order to do this metadata
about the local server's span, i.e. the 'span context', needs to be
included with the request to the remote.
It is up to the remote server to decide what it does with the spans it
creates. This is called the sampling policy and it can be configured
through Jaeger's settings.
For OpenTracing concepts see
<https://opentracing.io/docs/overview/what-is-tracing/>.
For more information about Jaeger's implementation see
<https://www.jaegertracing.io/docs/>
## Setting up OpenTracing
To receive OpenTracing spans, start up a Jaeger server. This can be done
using docker like so:
```sh
docker run -d --name jaeger \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
jaegertracing/all-in-one:1
```
Latest documentation is probably at
https://www.jaegertracing.io/docs/latest/getting-started.
## Enable OpenTracing in Synapse
OpenTracing is not enabled by default. It must be enabled in the
homeserver config by adding the `opentracing` option to your config file. You can find
documentation about how to do this in the [config manual under the header 'Opentracing'](usage/configuration/config_documentation.md#opentracing).
See below for an example Opentracing configuration:
```yaml
opentracing:
enabled: true
homeserver_whitelist:
- "mytrustedhomeserver.org"
- "*.myotherhomeservers.com"
```
## Homeserver whitelisting
The homeserver whitelist is configured using regular expressions. A list
of regular expressions can be given and their union will be compared
when propagating any span contexts to another homeserver.
Though it's mostly safe to send and receive span contexts to and from
untrusted users since span contexts are usually opaque IDs, it can lead
to two problems, namely:
- If the span context is marked as sampled by the sending homeserver
the receiver will sample it. Therefore two homeservers with wildly
different sampling policies could incur higher sampling counts than
intended.
- Sending servers can attach arbitrary data to spans, known as
'baggage'. For safety this has been disabled in Synapse but that
doesn't prevent another server sending you baggage which will be
logged to OpenTracing's logs.
## Configuring Jaeger
Sampling strategies can be set as in this document:
<https://www.jaegertracing.io/docs/latest/sampling/>.
Synapse now uses OpenTelemetry and the [documentation for tracing has moved](./tracing.md).
+90
@@ -0,0 +1,90 @@
# Tracing
## Background
OpenTelemetry is a semi-standard being adopted by a number of distributed
tracing platforms. It is a common API for facilitating vendor-agnostic
tracing instrumentation.
Tracing is a tool which gives an insight into the causal
relationship of work done in and between servers. The servers each track
events and report them to a centralised server - in Synapse's case:
Jaeger. The basic unit used to represent events is the span. The span
roughly represents a single piece of work that was done and the time at
which it occurred. A span can have child spans, meaning that the work of
the child had to be completed for the parent span to complete, or it can
have follow-on spans which represent work that is undertaken as a result
of the parent but is not depended on by the parent in order to
finish.
Since this is undertaken in a distributed environment a request to
another server, such as an RPC or a simple GET, can be considered a span
(a unit of work) for the local server. This causal link is what
tracing aims to capture and visualise. In order to do this metadata
about the local server's span, i.e. the 'span context', needs to be
included with the request to the remote.
It is up to the remote server to decide what it does with the spans it
creates. This is called the sampling policy and it can be configured
through Jaeger's settings.
For OpenTelemetry concepts, see
<https://opentelemetry.io/docs/concepts/>.
For more information about the Python implementation of OpenTelemetry we're using, see
<https://opentelemetry.io/docs/instrumentation/python/>
For more information about Jaeger, see
<https://www.jaegertracing.io/docs/>
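To make "included with the request to the remote" concrete: OpenTelemetry's default propagator serializes the span context into a W3C `traceparent` header. An illustrative sketch of that wire format (not Synapse code):

```python
# Illustrative sketch of the W3C traceparent wire format that carries a
# span context between servers (version 00: trace-id, span-id, flags).
def make_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


header = make_traceparent(
    0x5D2DCC3FDC8205046D60A5CD18672AC6, 0x715C736FF5F4D208, True
)
assert header == "00-5d2dcc3fdc8205046d60a5cd18672ac6-715c736ff5f4d208-01"
```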
## Setting up tracing
To receive tracing spans, start up a Jaeger server. This can be done
using docker like so:
```sh
docker run -d --name jaeger \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
jaegertracing/all-in-one:1
```
Latest documentation is probably at
https://www.jaegertracing.io/docs/latest/getting-started.
## Enable tracing in Synapse
Tracing is not enabled by default. It must be enabled in the
homeserver config by adding the `tracing` option to your config file. You can find
documentation about how to do this in the [config manual under the header 'Tracing'](usage/configuration/config_documentation.md#tracing).
See below for an example tracing configuration:
```yaml
tracing:
enabled: true
homeserver_whitelist:
- "mytrustedhomeserver.org"
- "*.myotherhomeservers.com"
```
## Homeserver whitelisting
The homeserver whitelist is configured using regular expressions. A list
of regular expressions can be given and their union will be compared
when propagating any span contexts to another homeserver.
Though it's mostly safe to send and receive span contexts to and from
untrusted users since span contexts are usually opaque IDs, it can lead
to two problems, namely:
- If the span context is marked as sampled by the sending homeserver
the receiver will sample it. Therefore two homeservers with wildly
different sampling policies could incur higher sampling counts than
intended.
- Sending servers can attach arbitrary data to spans, known as
'baggage'. For safety this has been disabled in Synapse but that
doesn't prevent another server sending you baggage which will be
logged in the trace.
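The "union of regular expressions" check above can be sketched in a few lines (illustrative only; `is_whitelisted` is a hypothetical helper, not Synapse's implementation):

```python
import re

# Illustrative whitelist check: a server name is allowed if it matches
# any of the configured regexes (hypothetical helper, not Synapse code).
whitelist = [r"mytrustedhomeserver\.org", r".*\.myotherhomeservers\.com"]


def is_whitelisted(server_name: str, patterns) -> bool:
    return any(re.fullmatch(pattern, server_name) for pattern in patterns)


assert is_whitelisted("mytrustedhomeserver.org", whitelist)
assert is_whitelisted("a.myotherhomeservers.com", whitelist)
assert not is_whitelisted("untrusted.example.org", whitelist)
```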
@@ -72,6 +72,7 @@ apply if you want your config file to be read properly. A few helpful things to
In addition, each setting has an example of its usage, with the proper indentation
shown.
## Modules
Server admins can expand Synapse's functionality with external modules.
@@ -3597,47 +3598,50 @@ default_power_level_content_override:
```
---
## Opentracing ##
Configuration options related to Opentracing support.
## Tracing ##
Configuration options related to tracing support.
---
### `opentracing`
### `tracing`
These settings enable and configure opentracing, which implements distributed tracing.
This allows you to observe the causal chains of events across servers
including requests, key lookups etc., across any server running
synapse or any other services which support opentracing
(specifically those implemented with Jaeger).
These settings enable and configure tracing. This allows you to observe the
causal chains of events across servers including requests, key lookups etc.,
across any server running synapse or any other services which support
OpenTelemetry.
Sub-options include:
* `enabled`: whether tracing is enabled. Set to true to enable. Disabled by default.
* `homeserver_whitelist`: The list of homeservers we wish to send and receive span contexts and span baggage.
See [here](../../opentracing.md) for more.
See [here](../../tracing.md#homeserver-whitelisting) for more.
This is a list of regexes which are matched against the `server_name` of the homeserver.
By default, it is empty, so no servers are matched.
* `force_tracing_for_users`: # A list of the matrix IDs of users whose requests will always be traced,
* `sample_rate`: The probability that a given span and subsequent child spans in the trace will be
recorded. This controls the amount of spans that record and are exported from Synapse.
* `force_tracing_for_users`: A list of the matrix IDs of users whose requests will always be traced,
even if the tracing system would otherwise drop the traces due to probabilistic sampling.
By default, the list is empty.
* `jaeger_config`: Jaeger can be configured to sample traces at different rates.
All configuration options provided by Jaeger can be set here. Jaeger's configuration is
mostly related to trace sampling which is documented [here](https://www.jaegertracing.io/docs/latest/sampling/).
* `jaeger_exporter_config`: Configure authentication and where your Jaeger instance is located.
Full options available in the [`JaegerExporter` API docs](https://opentelemetry-python.readthedocs.io/en/latest/exporter/jaeger/jaeger.html#opentelemetry.exporter.jaeger.thrift.JaegerExporter).
Example configuration:
```yaml
tracing:
  enabled: true
  homeserver_whitelist:
    - ".*"
  sample_rate: 1
  force_tracing_for_users:
    - "@user1:server_name"
    - "@user2:server_name"
  jaeger_config:
    sampler:
      type: const
      param: 1
    logging: false
  jaeger_exporter_config:
    agent_host_name: localhost
    agent_port: 6831
    # Split UDP packets so they fit within the limit (UDP_PACKET_MAX_LENGTH is set to 65k in OpenTelemetry)
    udp_split_oversized_batches: true
    # If you define a collector, it will communicate directly to the collector, bypassing the agent
    #collector_endpoint: "http://localhost:14268/api/traces?format=jaeger.thrift"
```
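The `homeserver_whitelist` matching described above can be sketched as a plain regex check. This is a minimal illustration of the documented rule (each entry is a regex matched against the `server_name`); the `is_whitelisted` helper is hypothetical, not Synapse's actual implementation:

```python
import re
from typing import List


def is_whitelisted(server_name: str, homeserver_whitelist: List[str]) -> bool:
    """Return True if server_name matches any whitelist regex.

    Hypothetical sketch: each whitelist entry is a regex matched
    against the homeserver's server_name.
    """
    return any(re.match(pattern, server_name) for pattern in homeserver_whitelist)


# With the example config above (homeserver_whitelist: [".*"]),
# every homeserver matches:
assert is_whitelisted("matrix.org", [".*"])
# An empty whitelist (the default) matches no servers:
assert not is_whitelisted("matrix.org", [])
```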
---
## Workers ##
@@ -165,9 +165,6 @@ ignore_missing_imports = True
[mypy-pympler.*]
ignore_missing_imports = True
[mypy-rust_python_jaeger_reporter.*]
ignore_missing_imports = True
[mypy-saml2.*]
ignore_missing_imports = True
@@ -219,7 +219,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
name = "deprecated"
version = "1.2.13"
description = "Python @deprecated decorator to deprecate old python classes, functions or methods."
category = "dev"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
@@ -320,6 +320,34 @@ python-versions = ">=3.7"
gitdb = ">=4.0.1,<5"
typing-extensions = {version = ">=3.7.4.3", markers = "python_version < \"3.8\""}
[[package]]
name = "googleapis-common-protos"
version = "1.56.2"
description = "Common protobufs used in Google APIs"
category = "main"
optional = true
python-versions = ">=3.6"
[package.dependencies]
protobuf = ">=3.15.0,<4.0.0dev"
[package.extras]
grpc = ["grpcio (>=1.0.0,<2.0.0dev)"]
[[package]]
name = "grpcio"
version = "1.48.1"
description = "HTTP/2-based RPC framework"
category = "main"
optional = true
python-versions = ">=3.6"
[package.dependencies]
six = ">=1.5.2"
[package.extras]
protobuf = ["grpcio-tools (>=1.48.1)"]
[[package]]
name = "hiredis"
version = "2.0.0"
@@ -410,23 +438,6 @@ colors = ["colorama (>=0.4.3,<0.5.0)"]
pipfile_deprecated_finder = ["pipreqs", "requirementslib"]
requirements_deprecated_finder = ["pip-api", "pipreqs"]
[[package]]
name = "jaeger-client"
version = "4.8.0"
description = "Jaeger Python OpenTracing Tracer implementation"
category = "main"
optional = true
python-versions = ">=3.7"
[package.dependencies]
opentracing = ">=2.1,<3.0"
threadloop = ">=1,<2"
thrift = "*"
tornado = ">=4.3"
[package.extras]
tests = ["codecov", "coverage", "flake8", "flake8-quotes", "flake8-typing-imports", "mock", "mypy", "opentracing_instrumentation (>=3,<4)", "prometheus_client (==0.11.0)", "pycurl", "pytest", "pytest-benchmark[histogram]", "pytest-cov", "pytest-localserver", "pytest-timeout", "pytest-tornado", "tchannel (==2.1.0)"]
[[package]]
name = "jeepney"
version = "0.7.1"
@@ -623,15 +634,77 @@ optional = false
python-versions = "*"
[[package]]
name = "opentracing"
version = "2.4.0"
description = "OpenTracing API for Python. See documentation at http://opentracing.io"
name = "opentelemetry-api"
version = "1.12.0"
description = "OpenTelemetry Python API"
category = "main"
optional = true
python-versions = "*"
python-versions = ">=3.6"
[package.extras]
tests = ["Sphinx", "doubles", "flake8", "flake8-quotes", "gevent", "mock", "pytest", "pytest-cov", "pytest-mock", "six (>=1.10.0,<2.0)", "sphinx_rtd_theme", "tornado"]
[package.dependencies]
Deprecated = ">=1.2.6"
setuptools = ">=16.0"
[[package]]
name = "opentelemetry-exporter-jaeger"
version = "1.12.0"
description = "Jaeger Exporters for OpenTelemetry"
category = "main"
optional = true
python-versions = ">=3.6"
[package.dependencies]
opentelemetry-exporter-jaeger-proto-grpc = "1.12.0"
opentelemetry-exporter-jaeger-thrift = "1.12.0"
[[package]]
name = "opentelemetry-exporter-jaeger-proto-grpc"
version = "1.12.0"
description = "Jaeger Protobuf Exporter for OpenTelemetry"
category = "main"
optional = true
python-versions = ">=3.6"
[package.dependencies]
googleapis-common-protos = ">=1.52,<1.56.3"
grpcio = ">=1.0.0,<2.0.0"
opentelemetry-api = ">=1.3,<2.0"
opentelemetry-sdk = ">=1.11,<2.0"
[[package]]
name = "opentelemetry-exporter-jaeger-thrift"
version = "1.12.0"
description = "Jaeger Thrift Exporter for OpenTelemetry"
category = "main"
optional = true
python-versions = ">=3.6"
[package.dependencies]
opentelemetry-api = ">=1.3,<2.0"
opentelemetry-sdk = ">=1.11,<2.0"
thrift = ">=0.10.0"
[[package]]
name = "opentelemetry-sdk"
version = "1.12.0"
description = "OpenTelemetry Python SDK"
category = "main"
optional = true
python-versions = ">=3.6"
[package.dependencies]
opentelemetry-api = "1.12.0"
opentelemetry-semantic-conventions = "0.33b0"
setuptools = ">=16.0"
typing-extensions = ">=3.7.4"
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.33b0"
description = "OpenTelemetry Semantic Conventions"
category = "main"
optional = true
python-versions = ">=3.6"
[[package]]
name = "packaging"
@@ -713,6 +786,14 @@ python-versions = ">=3.6"
[package.extras]
twisted = ["twisted"]
[[package]]
name = "protobuf"
version = "3.20.1"
description = "Protocol Buffers"
category = "main"
optional = true
python-versions = ">=3.7"
[[package]]
name = "psycopg2"
version = "2.9.3"
@@ -1183,17 +1264,6 @@ category = "main"
optional = true
python-versions = "*"
[[package]]
name = "threadloop"
version = "1.0.2"
description = "Tornado IOLoop Backed Concurrent Futures"
category = "main"
optional = true
python-versions = "*"
[package.dependencies]
tornado = "*"
[[package]]
name = "thrift"
version = "0.15.0"
@@ -1218,14 +1288,6 @@ category = "dev"
optional = false
python-versions = ">=3.6"
[[package]]
name = "tornado"
version = "6.1"
description = "Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed."
category = "main"
optional = true
python-versions = ">= 3.5"
[[package]]
name = "towncrier"
version = "21.9.0"
@@ -1301,12 +1363,12 @@ tqdm = ">=4.14"
urllib3 = ">=1.26.0"
[[package]]
name = "twisted"
version = "22.4.0"
name = "Twisted"
version = "22.8.0"
description = "An asynchronous networking framework written in Python"
category = "main"
optional = false
python-versions = ">=3.6.7"
python-versions = ">=3.7.1"
[package.dependencies]
attrs = ">=19.2.0"
@@ -1315,27 +1377,28 @@ constantly = ">=15.1"
hyperlink = ">=17.1.1"
idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""}
incremental = ">=21.3.0"
pyopenssl = {version = ">=16.0.0", optional = true, markers = "extra == \"tls\""}
pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
typing-extensions = ">=3.6.5"
"zope.interface" = ">=4.4.2"
[package.extras]
all_non_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=16.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
all_non_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"]
conch_nacl = ["PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"]
contextvars = ["contextvars (>=2.4,<3)"]
dev = ["coverage (>=6b1,<7)", "pydoctor (>=21.9.0,<21.10.0)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "sphinx-rtd-theme (>=0.5,<1.0)", "towncrier (>=19.2,<20.0)", "twistedchecker (>=0.7,<1.0)"]
dev_release = ["pydoctor (>=21.9.0,<21.10.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "sphinx-rtd-theme (>=0.5,<1.0)", "towncrier (>=19.2,<20.0)"]
dev = ["coverage (>=6b1,<7)", "pydoctor (>=22.7.0,<22.8.0)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "sphinx-rtd-theme (>=0.5,<1.0)", "towncrier (>=19.2,<20.0)", "twistedchecker (>=0.7,<1.0)"]
dev_release = ["pydoctor (>=22.7.0,<22.8.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=4.1.2,<6)", "sphinx-rtd-theme (>=0.5,<1.0)", "towncrier (>=19.2,<20.0)"]
gtk_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pygobject", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"]
macos_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=16.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
mypy = ["PyHamcrest (>=1.9.0)", "PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "coverage (>=6b1,<7)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "mypy (==0.930)", "mypy-zope (==0.3.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pydoctor (>=21.9.0,<21.10.0)", "pyflakes (>=2.2,<3.0)", "pyopenssl (>=16.0.0)", "pyserial (>=3.0)", "python-subunit (>=1.4,<2.0)", "pywin32 (!=226)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "service-identity (>=18.1.0)", "sphinx (>=4.1.2,<6)", "sphinx-rtd-theme (>=0.5,<1.0)", "towncrier (>=19.2,<20.0)", "twistedchecker (>=0.7,<1.0)", "types-pyOpenSSL", "types-setuptools"]
osx_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=16.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
macos_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
mypy = ["PyHamcrest (>=1.9.0)", "PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "coverage (>=6b1,<7)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "mypy (==0.930)", "mypy-zope (==0.3.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pydoctor (>=22.7.0,<22.8.0)", "pyflakes (>=2.2,<3.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "python-subunit (>=1.4,<2.0)", "pywin32 (!=226)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "service-identity (>=18.1.0)", "sphinx (>=4.1.2,<6)", "sphinx-rtd-theme (>=0.5,<1.0)", "towncrier (>=19.2,<20.0)", "twistedchecker (>=0.7,<1.0)", "types-pyOpenSSL", "types-setuptools"]
osx_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
serial = ["pyserial (>=3.0)", "pywin32 (!=226)"]
test = ["PyHamcrest (>=1.9.0)", "cython-test-exception-raiser (>=1.0.2,<2)"]
tls = ["idna (>=2.4)", "pyopenssl (>=16.0.0)", "service-identity (>=18.1.0)"]
windows_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=16.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
tls = ["idna (>=2.4)", "pyopenssl (>=21.0.0)", "service-identity (>=18.1.0)"]
windows_platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
[[package]]
name = "twisted-iocpsupport"
@@ -1417,14 +1480,6 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-opentracing"
version = "2.4.7"
description = "Typing stubs for opentracing"
category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-pillow"
version = "9.0.15"
@@ -1528,7 +1583,7 @@ python-versions = "*"
name = "wrapt"
version = "1.13.3"
description = "Module for decorators, wrappers and monkey patching."
category = "dev"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
@@ -1609,12 +1664,12 @@ docs = ["Sphinx", "repoze.sphinx.autointerface"]
test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"]
[extras]
all = ["matrix-synapse-ldap3", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pysaml2", "authlib", "lxml", "sentry-sdk", "jaeger-client", "opentracing", "txredisapi", "hiredis", "Pympler"]
all = ["matrix-synapse-ldap3", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pysaml2", "authlib", "lxml", "sentry-sdk", "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-jaeger", "txredisapi", "hiredis", "Pympler"]
cache_memory = ["Pympler"]
jwt = ["authlib"]
matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
oidc = ["authlib"]
opentracing = ["jaeger-client", "opentracing"]
opentelemetry = ["opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-jaeger"]
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
redis = ["txredisapi", "hiredis"]
saml2 = ["pysaml2"]
@@ -1626,7 +1681,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "1b14fc274d9e2a495a7f864150f3ffcf4d9f585e09a67e53301ae4ef3c2f3e48"
content-hash = "8697b449d4c7eb1eee9e10d5ff030d2a973576c5a9c8ad64fed9337489d5b37a"
[metadata.files]
attrs = [
@@ -1842,6 +1897,58 @@ gitpython = [
{file = "GitPython-3.1.27-py3-none-any.whl", hash = "sha256:5b68b000463593e05ff2b261acff0ff0972df8ab1b70d3cdbd41b546c8b8fc3d"},
{file = "GitPython-3.1.27.tar.gz", hash = "sha256:1c885ce809e8ba2d88a29befeb385fcea06338d3640712b59ca623c220bb5704"},
]
googleapis-common-protos = [
{file = "googleapis-common-protos-1.56.2.tar.gz", hash = "sha256:b09b56f5463070c2153753ef123f07d2e49235e89148e9b2459ec8ed2f68d7d3"},
{file = "googleapis_common_protos-1.56.2-py2.py3-none-any.whl", hash = "sha256:023eaea9d8c1cceccd9587c6af6c20f33eeeb05d4148670f2b0322dc1511700c"},
]
grpcio = [
{file = "grpcio-1.48.1-cp310-cp310-linux_armv7l.whl", hash = "sha256:19f9c021ae858d3ef6d5ec4c0acf3f0b0a61e599e5aa36c36943c209520a0e66"},
{file = "grpcio-1.48.1-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:b0fa666fecdb1b118d37823937e9237afa17fe734fc4dbe6dd642e1e4cca0246"},
{file = "grpcio-1.48.1-cp310-cp310-manylinux_2_17_aarch64.whl", hash = "sha256:a661d4b9b314327dec1e92ed57e591e8e5eb055700e0ba9e9687f734d922dcb6"},
{file = "grpcio-1.48.1-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:598c8c42420443c55431eba1821c7a2f72707f1ff674a4de9e0bb03282923cfb"},
{file = "grpcio-1.48.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c924d4e0493fd536ba3b82584b370e8b3c809ef341f9f828cff2dc3c761b3ab"},
{file = "grpcio-1.48.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a5edbcb8289681fcb5ded7542f2b7dd456489e83007a95e32fcaf55e9f18603e"},
{file = "grpcio-1.48.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:9d116106cf220c79e91595523c893f1cf09ec0c2ea49de4fb82152528b7e6833"},
{file = "grpcio-1.48.1-cp310-cp310-win32.whl", hash = "sha256:5d81cd3c161291339ed3b469250c2f5013c3083dea7796e93aedff8f05fdcec1"},
{file = "grpcio-1.48.1-cp310-cp310-win_amd64.whl", hash = "sha256:d751f8beb383c4a5a95625d7ccc1ab183b98b02c6a88924814ea7fbff530872d"},
{file = "grpcio-1.48.1-cp36-cp36m-linux_armv7l.whl", hash = "sha256:1471e6f25a8e47d9f88499f48c565fc5b2876e8ee91bfb0ff33eaadd188b7ea6"},
{file = "grpcio-1.48.1-cp36-cp36m-macosx_10_10_x86_64.whl", hash = "sha256:9fba1d0ba7cf56811728f1951c800a9aca6677e86433c5e353f2cc2c4039fda6"},
{file = "grpcio-1.48.1-cp36-cp36m-manylinux_2_17_aarch64.whl", hash = "sha256:f3a99ed422c38bd1bc893cb2cb2cea6d64173ec30927f699e95f5f58bdf625cf"},
{file = "grpcio-1.48.1-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b005502c59835f9ba3c3f8742f64c19eeb3db41eae1a89b035a559b39b421803"},
{file = "grpcio-1.48.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f0ef1dafb4eadeaca58aec8c721a5a73d551064b0c63d57fa003e233277c642e"},
{file = "grpcio-1.48.1-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:9477967e605ba08715dcc769b5ee0f0d8b22bda40ef25a0df5a8759e5a4d21a5"},
{file = "grpcio-1.48.1-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:dbba883c2b6d63949bc98ab1950bc22cf7c8d4e8cb68de6edde49d3cccd8fd26"},
{file = "grpcio-1.48.1-cp36-cp36m-win32.whl", hash = "sha256:8bbaa6647986b874891bc682a1093df54cbdb073b5d4b844a2b480c47c7ffafd"},
{file = "grpcio-1.48.1-cp36-cp36m-win_amd64.whl", hash = "sha256:e02f6ba10a3d4e289fa7ae91b301783a750d118b60f17924ca05e506c7d29bc8"},
{file = "grpcio-1.48.1-cp37-cp37m-linux_armv7l.whl", hash = "sha256:97dc35a99c61d5f35ec6457d3df0a4695ba9bb04a35686e1c254462b15c53f98"},
{file = "grpcio-1.48.1-cp37-cp37m-macosx_10_10_x86_64.whl", hash = "sha256:ca382028cdfd2d79b7704b2acb8ae1fb54e9e1a03a6765e1895ba89a6fcfaba1"},
{file = "grpcio-1.48.1-cp37-cp37m-manylinux_2_17_aarch64.whl", hash = "sha256:3d319a0c89ffac9b8dfc75bfe727a4c835d18bbccc14203b20eb5949c6c7d87d"},
{file = "grpcio-1.48.1-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a1b81849061c67c2ffaa6ed27aa3d9b0762e71e68e784e24b0330b7b1c67470a"},
{file = "grpcio-1.48.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1ff1be0474846ed15682843b187e6062f845ddfeaceb2b28972073f474f7b735"},
{file = "grpcio-1.48.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:53b6306f9473020bc47ddf64ca704356466e63d5f88f5c2a7bf0a4692e7f03c4"},
{file = "grpcio-1.48.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:dad2501603f954f222a6e555413c454a5f8d763ab910fbab3855bcdfef6b3148"},
{file = "grpcio-1.48.1-cp37-cp37m-win32.whl", hash = "sha256:4786323555a9f2c6380cd9a9922bcfd42165a51d68d242eebfcdfdc667651c96"},
{file = "grpcio-1.48.1-cp37-cp37m-win_amd64.whl", hash = "sha256:934aad7350d9577f4275e787f3d91d3c8ff4efffa8d6b807d343d3c891ff53eb"},
{file = "grpcio-1.48.1-cp38-cp38-linux_armv7l.whl", hash = "sha256:2563357697f5f2d7fd80c1b07a57ef4736551327ad84de604e7b9f6c1b6b4e20"},
{file = "grpcio-1.48.1-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:1d065f40fe74b52b88a6c42d4373a0983f1b0090f952a0747f34f2c11d6cbc64"},
{file = "grpcio-1.48.1-cp38-cp38-manylinux_2_17_aarch64.whl", hash = "sha256:3340cb2224cc397954def015729391d85fb31135b5a7efca363e73e6f1b0e908"},
{file = "grpcio-1.48.1-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d03009a26f7edca9f0a581aa5d3153242b815b858cb4790e34a955afb303c6ba"},
{file = "grpcio-1.48.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:53fa2fc1a1713195fa7acf7443a6f59b6ac7837607690f813c66cc18a9cb8135"},
{file = "grpcio-1.48.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:5a6a750c8324f3974e95265d3f9a0541573c537af1f67b3f6f46bf9c0b2e1b36"},
{file = "grpcio-1.48.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:626822d799d8fab08f07c8d95ef5c36213d24143f7cad3f548e97413db9f4110"},
{file = "grpcio-1.48.1-cp38-cp38-win32.whl", hash = "sha256:ca5209ef89f7607be47a308fa92308cf079805ed556ecda672f00039a26e366f"},
{file = "grpcio-1.48.1-cp38-cp38-win_amd64.whl", hash = "sha256:7cee20a4f873d61274d70c28ff63d19677d9eeea869c6a9cbaf3a00712336b6c"},
{file = "grpcio-1.48.1-cp39-cp39-linux_armv7l.whl", hash = "sha256:460f5bec23fffa3c041aeba1f93a0f06b7a29e6a4da3658a52e1a866494920ab"},
{file = "grpcio-1.48.1-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:c54734a6eb3be544d332e65c846236d02e5fc71325e8c53af91e83a46b87b506"},
{file = "grpcio-1.48.1-cp39-cp39-manylinux_2_17_aarch64.whl", hash = "sha256:c6b6969c529521c86884a13745a4b68930db1ef2e051735c0f479d0a7adb25b6"},
{file = "grpcio-1.48.1-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:346bef672a1536d59437210f16af35389d715d2b321bfe4899b3d6476a196706"},
{file = "grpcio-1.48.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f29627d66ae816837fd32c9450dc9c54780962cd74d034513ed829ba3ab46652"},
{file = "grpcio-1.48.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:b01faf7934c606d5050cf055c1d03943180f23d995d68d04cf50c80d1ef2c65a"},
{file = "grpcio-1.48.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:741eeff39a26d26da2b6d74ff0559f882ee95ee4e3b20c0b4b829021cb917f96"},
{file = "grpcio-1.48.1-cp39-cp39-win32.whl", hash = "sha256:a15409bc1d05c52ecb00f5e42ab8ff280e7149f2eb854728f628fb2a0a161a5b"},
{file = "grpcio-1.48.1-cp39-cp39-win_amd64.whl", hash = "sha256:2b6c336409937fd1cd2bf78eb72651f44d292d88da5e63059a4e8bd01b9d7411"},
{file = "grpcio-1.48.1.tar.gz", hash = "sha256:660217eccd2943bf23ea9a36e2a292024305aec04bf747fbcff1f5032b83610e"},
]
hiredis = [
{file = "hiredis-2.0.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:b4c8b0bc5841e578d5fb32a16e0c305359b987b850a06964bd5a62739d688048"},
{file = "hiredis-2.0.0-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:0adea425b764a08270820531ec2218d0508f8ae15a448568109ffcae050fee26"},
@@ -1973,9 +2080,6 @@ isort = [
{file = "isort-5.7.0-py3-none-any.whl", hash = "sha256:fff4f0c04e1825522ce6949973e83110a6e907750cd92d128b0d14aaaadbffdc"},
{file = "isort-5.7.0.tar.gz", hash = "sha256:c729845434366216d320e936b8ad6f9d681aab72dc7cbc2d51bedc3582f3ad1e"},
]
jaeger-client = [
{file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
]
jeepney = [
{file = "jeepney-0.7.1-py3-none-any.whl", hash = "sha256:1b5a0ea5c0e7b166b2f5895b91a08c14de8915afda4407fb5022a195224958ac"},
{file = "jeepney-0.7.1.tar.gz", hash = "sha256:fa9e232dfa0c498bd0b8a3a73b8d8a31978304dcef0515adc859d4e096f96f4f"},
@@ -2198,8 +2302,29 @@ netaddr = [
{file = "netaddr-0.8.0-py2.py3-none-any.whl", hash = "sha256:9666d0232c32d2656e5e5f8d735f58fd6c7457ce52fc21c98d45f2af78f990ac"},
{file = "netaddr-0.8.0.tar.gz", hash = "sha256:d6cc57c7a07b1d9d2e917aa8b36ae8ce61c35ba3fcd1b83ca31c5a0ee2b5a243"},
]
opentracing = [
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
opentelemetry-api = [
{file = "opentelemetry-api-1.12.0.tar.gz", hash = "sha256:740c2cf9aa75e76c208b3ee04b3b3b3721f58bbac8e97019174f07ec12cde7af"},
{file = "opentelemetry_api-1.12.0-py3-none-any.whl", hash = "sha256:2e1cef8ce175be6464f240422babfe1dfb581daec96f0daad5d0d0e951b38f7b"},
]
opentelemetry-exporter-jaeger = [
{file = "opentelemetry-exporter-jaeger-1.12.0.tar.gz", hash = "sha256:e0e346466f2fee7d26e62e6fbad47bf5effb27ab17106e15a427cabfaa7e5879"},
{file = "opentelemetry_exporter_jaeger-1.12.0-py3-none-any.whl", hash = "sha256:e42f5d2d2d5fb4e6a8f14424bb8ee515cba2da38f97ecd4498cba4da315d09dc"},
]
opentelemetry-exporter-jaeger-proto-grpc = [
{file = "opentelemetry-exporter-jaeger-proto-grpc-1.12.0.tar.gz", hash = "sha256:dba063bbfc7c9927d05dd1806e0534c3d34a6aaac7548d5c1f66c210019f79a5"},
{file = "opentelemetry_exporter_jaeger_proto_grpc-1.12.0-py3-none-any.whl", hash = "sha256:f029e5cddb42745e6fe73412b46151e066a30e4689ccad837913c683402d3aad"},
]
opentelemetry-exporter-jaeger-thrift = [
{file = "opentelemetry-exporter-jaeger-thrift-1.12.0.tar.gz", hash = "sha256:8a87b0e63c62dee13ef9fa9a28ad1ca612e06f29e3fa9266f40d2f4969be3af3"},
{file = "opentelemetry_exporter_jaeger_thrift-1.12.0-py3-none-any.whl", hash = "sha256:c60cac61637fef57bda4917432493c80f4168654067be24e2a3eb9065d76963e"},
]
opentelemetry-sdk = [
{file = "opentelemetry-sdk-1.12.0.tar.gz", hash = "sha256:bf37830ca4f93d0910cf109749237c5cb4465e31a54dfad8400011e9822a2a14"},
{file = "opentelemetry_sdk-1.12.0-py3-none-any.whl", hash = "sha256:d13be09765441c0513a3de01b7a2f56a7da36d902f60bff7c97f338903a57c34"},
]
opentelemetry-semantic-conventions = [
{file = "opentelemetry-semantic-conventions-0.33b0.tar.gz", hash = "sha256:67d62461c87b683b958428ced79162ec4d567dabf30b050f270bbd01eff89ced"},
{file = "opentelemetry_semantic_conventions-0.33b0-py3-none-any.whl", hash = "sha256:56b67b3f8f49413cbfbbeb32e9cf7b4c7dfb27a83064d959733766376ba11bc7"},
]
packaging = [
{file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"},
@@ -2266,6 +2391,32 @@ prometheus-client = [
{file = "prometheus_client-0.14.0-py3-none-any.whl", hash = "sha256:f4aba3fdd1735852049f537c1f0ab177159b7ab76f271ecc4d2f45aa2a1d01f2"},
{file = "prometheus_client-0.14.0.tar.gz", hash = "sha256:8f7a922dd5455ad524b6ba212ce8eb2b4b05e073f4ec7218287f88b1cac34750"},
]
protobuf = [
{file = "protobuf-3.20.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3cc797c9d15d7689ed507b165cd05913acb992d78b379f6014e013f9ecb20996"},
{file = "protobuf-3.20.1-cp310-cp310-manylinux2014_aarch64.whl", hash = "sha256:ff8d8fa42675249bb456f5db06c00de6c2f4c27a065955917b28c4f15978b9c3"},
{file = "protobuf-3.20.1-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:cd68be2559e2a3b84f517fb029ee611546f7812b1fdd0aa2ecc9bc6ec0e4fdde"},
{file = "protobuf-3.20.1-cp310-cp310-win32.whl", hash = "sha256:9016d01c91e8e625141d24ec1b20fed584703e527d28512aa8c8707f105a683c"},
{file = "protobuf-3.20.1-cp310-cp310-win_amd64.whl", hash = "sha256:32ca378605b41fd180dfe4e14d3226386d8d1b002ab31c969c366549e66a2bb7"},
{file = "protobuf-3.20.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:9be73ad47579abc26c12024239d3540e6b765182a91dbc88e23658ab71767153"},
{file = "protobuf-3.20.1-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:097c5d8a9808302fb0da7e20edf0b8d4703274d140fd25c5edabddcde43e081f"},
{file = "protobuf-3.20.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:e250a42f15bf9d5b09fe1b293bdba2801cd520a9f5ea2d7fb7536d4441811d20"},
{file = "protobuf-3.20.1-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:cdee09140e1cd184ba9324ec1df410e7147242b94b5f8b0c64fc89e38a8ba531"},
{file = "protobuf-3.20.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:af0ebadc74e281a517141daad9d0f2c5d93ab78e9d455113719a45a49da9db4e"},
{file = "protobuf-3.20.1-cp37-cp37m-win32.whl", hash = "sha256:755f3aee41354ae395e104d62119cb223339a8f3276a0cd009ffabfcdd46bb0c"},
{file = "protobuf-3.20.1-cp37-cp37m-win_amd64.whl", hash = "sha256:62f1b5c4cd6c5402b4e2d63804ba49a327e0c386c99b1675c8a0fefda23b2067"},
{file = "protobuf-3.20.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:06059eb6953ff01e56a25cd02cca1a9649a75a7e65397b5b9b4e929ed71d10cf"},
{file = "protobuf-3.20.1-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:cb29edb9eab15742d791e1025dd7b6a8f6fcb53802ad2f6e3adcb102051063ab"},
{file = "protobuf-3.20.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:69ccfdf3657ba59569c64295b7d51325f91af586f8d5793b734260dfe2e94e2c"},
{file = "protobuf-3.20.1-cp38-cp38-win32.whl", hash = "sha256:dd5789b2948ca702c17027c84c2accb552fc30f4622a98ab5c51fcfe8c50d3e7"},
{file = "protobuf-3.20.1-cp38-cp38-win_amd64.whl", hash = "sha256:77053d28427a29987ca9caf7b72ccafee011257561259faba8dd308fda9a8739"},
{file = "protobuf-3.20.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6f50601512a3d23625d8a85b1638d914a0970f17920ff39cec63aaef80a93fb7"},
{file = "protobuf-3.20.1-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:284f86a6207c897542d7e956eb243a36bb8f9564c1742b253462386e96c6b78f"},
{file = "protobuf-3.20.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:7403941f6d0992d40161aa8bb23e12575637008a5a02283a930addc0508982f9"},
{file = "protobuf-3.20.1-cp39-cp39-win32.whl", hash = "sha256:db977c4ca738dd9ce508557d4fce0f5aebd105e158c725beec86feb1f6bc20d8"},
{file = "protobuf-3.20.1-cp39-cp39-win_amd64.whl", hash = "sha256:7e371f10abe57cee5021797126c93479f59fccc9693dafd6bd5633ab67808a91"},
{file = "protobuf-3.20.1-py2.py3-none-any.whl", hash = "sha256:adfc6cf69c7f8c50fd24c793964eef18f0ac321315439d94945820612849c388"},
{file = "protobuf-3.20.1.tar.gz", hash = "sha256:adc31566d027f45efe3f44eeb5b1f329da43891634d61c75a5944e9be6dd42c9"},
]
psycopg2 = [
{file = "psycopg2-2.9.3-cp310-cp310-win32.whl", hash = "sha256:083707a696e5e1c330af2508d8fab36f9700b26621ccbcb538abe22e15485362"},
{file = "psycopg2-2.9.3-cp310-cp310-win_amd64.whl", hash = "sha256:d3ca6421b942f60c008f81a3541e8faf6865a28d5a9b48544b0ee4f40cac7fca"},
@@ -2600,10 +2751,6 @@ sortedcontainers = [
systemd-python = [
{file = "systemd-python-234.tar.gz", hash = "sha256:fd0e44bf70eadae45aadc292cb0a7eb5b0b6372cd1b391228047d33895db83e7"},
]
threadloop = [
{file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"},
{file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"},
]
thrift = [
{file = "thrift-0.15.0.tar.gz", hash = "sha256:87c8205a71cf8bbb111cb99b1f7495070fbc9cabb671669568854210da5b3e29"},
]
@@ -2611,49 +2758,6 @@ tomli = [
{file = "tomli-1.2.3-py3-none-any.whl", hash = "sha256:e3069e4be3ead9668e21cb9b074cd948f7b3113fd9c8bba083f48247aab8b11c"},
{file = "tomli-1.2.3.tar.gz", hash = "sha256:05b6166bff487dc068d322585c7ea4ef78deed501cc124060e0f238e89a9231f"},
]
tornado = [
{file = "tornado-6.1-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:d371e811d6b156d82aa5f9a4e08b58debf97c302a35714f6f45e35139c332e32"},
{file = "tornado-6.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:0d321a39c36e5f2c4ff12b4ed58d41390460f798422c4504e09eb5678e09998c"},
{file = "tornado-6.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:9de9e5188a782be6b1ce866e8a51bc76a0fbaa0e16613823fc38e4fc2556ad05"},
{file = "tornado-6.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:61b32d06ae8a036a6607805e6720ef00a3c98207038444ba7fd3d169cd998910"},
{file = "tornado-6.1-cp35-cp35m-manylinux2010_x86_64.whl", hash = "sha256:3e63498f680547ed24d2c71e6497f24bca791aca2fe116dbc2bd0ac7f191691b"},
{file = "tornado-6.1-cp35-cp35m-manylinux2014_aarch64.whl", hash = "sha256:6c77c9937962577a6a76917845d06af6ab9197702a42e1346d8ae2e76b5e3675"},
{file = "tornado-6.1-cp35-cp35m-win32.whl", hash = "sha256:6286efab1ed6e74b7028327365cf7346b1d777d63ab30e21a0f4d5b275fc17d5"},
{file = "tornado-6.1-cp35-cp35m-win_amd64.whl", hash = "sha256:fa2ba70284fa42c2a5ecb35e322e68823288a4251f9ba9cc77be04ae15eada68"},
{file = "tornado-6.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:0a00ff4561e2929a2c37ce706cb8233b7907e0cdc22eab98888aca5dd3775feb"},
{file = "tornado-6.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:748290bf9112b581c525e6e6d3820621ff020ed95af6f17fedef416b27ed564c"},
{file = "tornado-6.1-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:e385b637ac3acaae8022e7e47dfa7b83d3620e432e3ecb9a3f7f58f150e50921"},
{file = "tornado-6.1-cp36-cp36m-manylinux2010_i686.whl", hash = "sha256:25ad220258349a12ae87ede08a7b04aca51237721f63b1808d39bdb4b2164558"},
{file = "tornado-6.1-cp36-cp36m-manylinux2010_x86_64.whl", hash = "sha256:65d98939f1a2e74b58839f8c4dab3b6b3c1ce84972ae712be02845e65391ac7c"},
{file = "tornado-6.1-cp36-cp36m-manylinux2014_aarch64.whl", hash = "sha256:e519d64089b0876c7b467274468709dadf11e41d65f63bba207e04217f47c085"},
{file = "tornado-6.1-cp36-cp36m-win32.whl", hash = "sha256:b87936fd2c317b6ee08a5741ea06b9d11a6074ef4cc42e031bc6403f82a32575"},
{file = "tornado-6.1-cp36-cp36m-win_amd64.whl", hash = "sha256:cc0ee35043162abbf717b7df924597ade8e5395e7b66d18270116f8745ceb795"},
{file = "tornado-6.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:7250a3fa399f08ec9cb3f7b1b987955d17e044f1ade821b32e5f435130250d7f"},
{file = "tornado-6.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:ed3ad863b1b40cd1d4bd21e7498329ccaece75db5a5bf58cd3c9f130843e7102"},
{file = "tornado-6.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:dcef026f608f678c118779cd6591c8af6e9b4155c44e0d1bc0c87c036fb8c8c4"},
{file = "tornado-6.1-cp37-cp37m-manylinux2010_i686.whl", hash = "sha256:70dec29e8ac485dbf57481baee40781c63e381bebea080991893cd297742b8fd"},
{file = "tornado-6.1-cp37-cp37m-manylinux2010_x86_64.whl", hash = "sha256:d3f7594930c423fd9f5d1a76bee85a2c36fd8b4b16921cae7e965f22575e9c01"},
{file = "tornado-6.1-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:3447475585bae2e77ecb832fc0300c3695516a47d46cefa0528181a34c5b9d3d"},
{file = "tornado-6.1-cp37-cp37m-win32.whl", hash = "sha256:e7229e60ac41a1202444497ddde70a48d33909e484f96eb0da9baf8dc68541df"},
{file = "tornado-6.1-cp37-cp37m-win_amd64.whl", hash = "sha256:cb5ec8eead331e3bb4ce8066cf06d2dfef1bfb1b2a73082dfe8a161301b76e37"},
{file = "tornado-6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:20241b3cb4f425e971cb0a8e4ffc9b0a861530ae3c52f2b0434e6c1b57e9fd95"},
{file = "tornado-6.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:c77da1263aa361938476f04c4b6c8916001b90b2c2fdd92d8d535e1af48fba5a"},
{file = "tornado-6.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:fba85b6cd9c39be262fcd23865652920832b61583de2a2ca907dbd8e8a8c81e5"},
{file = "tornado-6.1-cp38-cp38-manylinux2010_i686.whl", hash = "sha256:1e8225a1070cd8eec59a996c43229fe8f95689cb16e552d130b9793cb570a288"},
{file = "tornado-6.1-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:d14d30e7f46a0476efb0deb5b61343b1526f73ebb5ed84f23dc794bdb88f9d9f"},
{file = "tornado-6.1-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:8f959b26f2634a091bb42241c3ed8d3cedb506e7c27b8dd5c7b9f745318ddbb6"},
{file = "tornado-6.1-cp38-cp38-win32.whl", hash = "sha256:34ca2dac9e4d7afb0bed4677512e36a52f09caa6fded70b4e3e1c89dbd92c326"},
{file = "tornado-6.1-cp38-cp38-win_amd64.whl", hash = "sha256:6196a5c39286cc37c024cd78834fb9345e464525d8991c21e908cc046d1cc02c"},
{file = "tornado-6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:f0ba29bafd8e7e22920567ce0d232c26d4d47c8b5cf4ed7b562b5db39fa199c5"},
{file = "tornado-6.1-cp39-cp39-manylinux1_i686.whl", hash = "sha256:33892118b165401f291070100d6d09359ca74addda679b60390b09f8ef325ffe"},
{file = "tornado-6.1-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:7da13da6f985aab7f6f28debab00c67ff9cbacd588e8477034c0652ac141feea"},
{file = "tornado-6.1-cp39-cp39-manylinux2010_i686.whl", hash = "sha256:e0791ac58d91ac58f694d8d2957884df8e4e2f6687cdf367ef7eb7497f79eaa2"},
{file = "tornado-6.1-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:66324e4e1beede9ac79e60f88de548da58b1f8ab4b2f1354d8375774f997e6c0"},
{file = "tornado-6.1-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:a48900ecea1cbb71b8c71c620dee15b62f85f7c14189bdeee54966fbd9a0c5bd"},
{file = "tornado-6.1-cp39-cp39-win32.whl", hash = "sha256:d3d20ea5782ba63ed13bc2b8c291a053c8d807a8fa927d941bd718468f7b950c"},
{file = "tornado-6.1-cp39-cp39-win_amd64.whl", hash = "sha256:548430be2740e327b3fe0201abe471f314741efcb0067ec4f2d7dcfb4825f3e4"},
{file = "tornado-6.1.tar.gz", hash = "sha256:33c6e81d7bd55b468d2e793517c909b139960b6c790a60b7991b9b6b76fb9791"},
]
towncrier = [
{file = "towncrier-21.9.0-py2.py3-none-any.whl", hash = "sha256:fc5a88a2a54988e3a8ed2b60d553599da8330f65722cc607c839614ed87e0f92"},
{file = "towncrier-21.9.0.tar.gz", hash = "sha256:9cb6f45c16e1a1eec9d0e7651165e7be60cd0ab81d13a5c96ca97a498ae87f48"},
@@ -2670,9 +2774,9 @@ twine = [
{file = "twine-3.8.0-py3-none-any.whl", hash = "sha256:d0550fca9dc19f3d5e8eadfce0c227294df0a2a951251a4385797c8a6198b7c8"},
{file = "twine-3.8.0.tar.gz", hash = "sha256:8efa52658e0ae770686a13b675569328f1fba9837e5de1867bfe5f46a9aefe19"},
]
twisted = [
{file = "Twisted-22.4.0-py3-none-any.whl", hash = "sha256:f9f7a91f94932477a9fc3b169d57f54f96c6e74a23d78d9ce54039a7f48928a2"},
{file = "Twisted-22.4.0.tar.gz", hash = "sha256:a047990f57dfae1e0bd2b7df2526d4f16dcdc843774dc108b78c52f2a5f13680"},
Twisted = [
{file = "Twisted-22.8.0-py3-none-any.whl", hash = "sha256:8d4718d1e48dcc28933f8beb48dc71cfe77a125e37ad1eb7a3d0acc49baf6c99"},
{file = "Twisted-22.8.0.tar.gz", hash = "sha256:e5b60de39f2d1da153fbe1874d885fe3fcbdb21fcc446fa759a53e8fc3513bed"},
]
twisted-iocpsupport = [
{file = "twisted-iocpsupport-1.0.2.tar.gz", hash = "sha256:72068b206ee809c9c596b57b5287259ea41ddb4774d86725b19f35bf56aa32a9"},
@@ -2742,10 +2846,6 @@ types-jsonschema = [
{file = "types-jsonschema-4.4.6.tar.gz", hash = "sha256:7f2a804618756768c7c0616f8c794b61fcfe3077c7ee1ad47dcf01c5e5f692bb"},
{file = "types_jsonschema-4.4.6-py3-none-any.whl", hash = "sha256:1db9031ca49a8444d01bd2ce8cf2f89318382b04610953b108321e6f8fb03390"},
]
types-opentracing = [
{file = "types-opentracing-2.4.7.tar.gz", hash = "sha256:be60e9618355aa892571ace002e6b353702538b1c0dc4fbc1c921219d6658830"},
{file = "types_opentracing-2.4.7-py3-none-any.whl", hash = "sha256:861fb8103b07cf717f501dd400cb274ca9992552314d4d6c7a824b11a215e512"},
]
types-pillow = [
{file = "types-Pillow-9.0.15.tar.gz", hash = "sha256:d2e385fe5c192e75970f18accce69f5c2a9f186f3feb578a9b91cd6fdf64211d"},
{file = "types_Pillow-9.0.15-py3-none-any.whl", hash = "sha256:c9646595dfafdf8b63d4b1443292ead17ee0fc7b18a143e497b68e0ea2dc1eb6"},
+6 -6
@@ -200,13 +200,14 @@ authlib = { version = ">=0.14.0", optional = true }
systemd-python = { version = ">=231", optional = true }
lxml = { version = ">=4.2.0", optional = true }
sentry-sdk = { version = ">=0.7.2", optional = true }
opentracing = { version = ">=2.2.0", optional = true }
jaeger-client = { version = ">=4.0.0", optional = true }
txredisapi = { version = ">=1.4.7", optional = true }
hiredis = { version = "*", optional = true }
Pympler = { version = "*", optional = true }
parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
opentelemetry-api = {version = "^1.12.0", optional = true}
opentelemetry-sdk = {version = "^1.12.0", optional = true}
opentelemetry-exporter-jaeger = {version = "^1.12.0", optional = true}
[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
@@ -221,7 +222,7 @@ oidc = ["authlib"]
systemd = ["systemd-python"]
url_preview = ["lxml"]
sentry = ["sentry-sdk"]
opentracing = ["jaeger-client", "opentracing"]
opentelemetry = ["opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-jaeger"]
jwt = ["authlib"]
# hiredis is not a *strict* dependency, but it makes things much faster.
# (if it is not installed, we fall back to slow code.)
@@ -254,8 +255,8 @@ all = [
"lxml",
# sentry
"sentry-sdk",
# opentracing
"jaeger-client", "opentracing",
# opentelemetry
"opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-jaeger",
# redis
"txredisapi", "hiredis",
# cache_memory
@@ -279,7 +280,6 @@ mypy-zope = "*"
types-bleach = ">=4.1.0"
types-commonmark = ">=0.9.2"
types-jsonschema = ">=3.2.0"
types-opentracing = ">=2.4.2"
types-Pillow = ">=8.3.4"
types-psycopg2 = ">=2.9.9"
types-pyOpenSSL = ">=20.0.7"
+11 -11
@@ -31,10 +31,10 @@ from synapse.api.errors import (
from synapse.appservice import ApplicationService
from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import (
from synapse.logging.tracing import (
SynapseTags,
active_span,
force_tracing,
get_active_span,
start_active_span,
trace,
)
@@ -144,7 +144,7 @@ class Auth:
is invalid.
AuthError if access is denied for the user in the access token
"""
parent_span = active_span()
parent_span = get_active_span()
with start_active_span("get_user_by_req"):
requester = await self._wrapped_get_user_by_req(
request, allow_guest, allow_expired
@@ -154,25 +154,25 @@ class Auth:
if requester.authenticated_entity in self._force_tracing_for_users:
# request tracing is enabled for this user, so we need to force it
# tracing on for the parent span (which will be the servlet span).
#
force_tracing(parent_span)
# It's too late for the get_user_by_req span to inherit the setting,
# so we also force it on for that.
force_tracing()
force_tracing(parent_span)
parent_span.set_tag(
parent_span.set_attribute(
"authenticated_entity", requester.authenticated_entity
)
# We tag the Synapse instance name so that it's an easy jumping
# off point into the logs. Can also be used to filter for an
# instance that is under load.
parent_span.set_tag(
parent_span.set_attribute(
SynapseTags.INSTANCE_NAME, self.hs.get_instance_name()
)
parent_span.set_tag("user_id", requester.user.to_string())
parent_span.set_attribute("user_id", requester.user.to_string())
if requester.device_id is not None:
parent_span.set_tag("device_id", requester.device_id)
parent_span.set_attribute("device_id", requester.device_id)
if requester.app_service is not None:
parent_span.set_tag("appservice_id", requester.app_service.id)
parent_span.set_attribute("appservice_id", requester.app_service.id)
return requester
@cancellable
@@ -184,7 +184,7 @@ class Auth:
) -> Requester:
"""Helper for get_user_by_req
Once get_user_by_req has set up the opentracing span, this does the actual work.
Once get_user_by_req has set up the tracing span, this does the actual work.
"""
try:
ip_addr = request.getClientAddress().host
+3
@@ -193,6 +193,9 @@ class LimitBlockingTypes:
class EventContentFields:
"""Fields found in events' content, regardless of type."""
# Synapse internal content field for tracing
TRACING_CONTEXT: Final = "org.matrix.tracing_context"
# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS: Final = "org.matrix.labels"
+1 -1
@@ -62,7 +62,7 @@ from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
from synapse.logging.tracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
+26 -17
@@ -24,41 +24,50 @@ class TracerConfig(Config):
section = "tracing"
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
opentracing_config = config.get("opentracing")
if opentracing_config is None:
opentracing_config = {}
tracing_config = config.get("tracing")
if tracing_config is None:
tracing_config = {}
self.opentracer_enabled = opentracing_config.get("enabled", False)
self.tracing_enabled = tracing_config.get("enabled", False)
self.jaeger_config = opentracing_config.get(
"jaeger_config",
{"sampler": {"type": "const", "param": 1}, "logging": False},
self.jaeger_exporter_config = tracing_config.get(
"jaeger_exporter_config",
{},
)
self.force_tracing_for_users: Set[str] = set()
if not self.opentracer_enabled:
if not self.tracing_enabled:
return
check_requirements("opentracing")
check_requirements("opentelemetry")
# The tracer is enabled so sanitize the config
self.opentracer_whitelist: List[str] = opentracing_config.get(
# Default to always sample. Range: [0.0 - 1.0]
self.sample_rate: float = float(tracing_config.get("sample_rate", 1))
if self.sample_rate < 0.0 or self.sample_rate > 1.0:
raise ConfigError(
"Tracing sample_rate must be in range [0.0, 1.0].",
("tracing", "sample_rate"),
)
self.homeserver_whitelist: List[str] = tracing_config.get(
"homeserver_whitelist", []
)
if not isinstance(self.opentracer_whitelist, list):
raise ConfigError("Tracer homeserver_whitelist config is malformed")
force_tracing_for_users = opentracing_config.get("force_tracing_for_users", [])
if not isinstance(force_tracing_for_users, list):
if not isinstance(self.homeserver_whitelist, list):
raise ConfigError(
"Expected a list", ("opentracing", "force_tracing_for_users")
"Tracing homeserver_whitelist config is malformed",
("tracing", "homeserver_whitelist"),
)
force_tracing_for_users = tracing_config.get("force_tracing_for_users", [])
if not isinstance(force_tracing_for_users, list):
raise ConfigError("Expected a list", ("tracing", "force_tracing_for_users"))
for i, u in enumerate(force_tracing_for_users):
if not isinstance(u, str):
raise ConfigError(
"Expected a string",
("opentracing", "force_tracing_for_users", f"index {i}"),
("tracing", "force_tracing_for_users", f"index {i}"),
)
self.force_tracing_for_users.add(u)
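The new `sample_rate` option in this hunk defaults to always-sampling and must lie in [0.0, 1.0]. A minimal standalone sketch of that validation logic, lifted out of `TracerConfig.read_config` (the `read_sample_rate` helper name is hypothetical, not part of the diff):

```python
def read_sample_rate(tracing_config: dict) -> float:
    # Default to always sample, matching the diff. Valid range: [0.0, 1.0].
    sample_rate = float(tracing_config.get("sample_rate", 1))
    if sample_rate < 0.0 or sample_rate > 1.0:
        # The real code raises Synapse's ConfigError with a config path;
        # ValueError stands in for it here.
        raise ValueError("Tracing sample_rate must be in range [0.0, 1.0].")
    return sample_rate
```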
+1 -1
@@ -28,7 +28,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.utils import prune_event, prune_event_dict
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
+5
@@ -132,6 +132,11 @@ class EventBuilder:
auth_event_ids = self._event_auth_handler.compute_auth_events(
self, state_ids
)
logger.info(
"builder.build state_ids=%s auth_event_ids=%s",
state_ids,
auth_event_ids,
)
format_version = self.room_version.event_format
# The types of auth/prev events changes between event versions.
+1 -1
@@ -32,7 +32,7 @@ from typing_extensions import Literal
import synapse
from synapse.api.errors import Codes
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
from synapse.spam_checker_api import RegistrationBehaviour
+1 -1
@@ -23,7 +23,7 @@ from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.tracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
if TYPE_CHECKING:
+6 -6
@@ -61,7 +61,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.logging.tracing import SynapseTags, log_kv, set_attribute, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -472,19 +472,19 @@ class FederationClient(FederationBase):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "state_event_ids",
str(state_event_ids),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "state_event_ids.length",
str(len(state_event_ids)),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "auth_event_ids",
str(auth_event_ids),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
str(len(auth_event_ids)),
)
@@ -587,7 +587,7 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "pdus.length",
str(len(pdus)),
)
+2 -7
@@ -61,12 +61,7 @@ from synapse.logging.context import (
nested_logging_context,
run_in_background,
)
from synapse.logging.opentracing import (
log_kv,
start_active_span_from_edu,
tag_args,
trace,
)
from synapse.logging.tracing import log_kv, start_active_span_from_edu, tag_args, trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -1431,7 +1426,7 @@ class FederationHandlerRegistry:
# Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
if handler:
with start_active_span_from_edu(content, "handle_edu"):
with start_active_span_from_edu("handle_edu", edu_content=content):
try:
await handler(origin, content)
except SynapseError as e:
@@ -32,7 +32,7 @@ from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.logging.tracing import SynapseTags, set_attribute
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
@@ -596,7 +596,7 @@ class PerDestinationQueue:
if not message_id:
continue
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
edus = [
Edu(
@@ -21,11 +21,13 @@ from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.logging.opentracing import (
from synapse.logging.tracing import (
Link,
StatusCode,
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
get_span_context_from_context,
set_status,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import JsonDict
@@ -79,7 +81,7 @@ class TransactionManager:
edus: List of EDUs to send
"""
# Make a transaction-sending opentracing span. This span follows on from
# Make a transaction-sending tracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
@@ -88,13 +90,20 @@ class TransactionManager:
keep_destination = whitelisted_homeserver(destination)
for edu in edus:
context = edu.get_context()
if context:
span_contexts.append(extract_text_map(json_decoder.decode(context)))
tracing_context_json = edu.get_tracing_context_json()
if tracing_context_json:
context = extract_text_map(json_decoder.decode(tracing_context_json))
if context:
span_context = get_span_context_from_context(context)
if span_context:
span_contexts.append(span_context)
if keep_destination:
edu.strip_context()
edu.strip_tracing_context()
with start_active_span_follows_from("send_transaction", span_contexts):
with start_active_span(
"send_transaction",
links=[Link(span_context) for span_context in span_contexts],
):
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
@@ -166,7 +175,7 @@ class TransactionManager:
except HttpResponseException as e:
code = e.code
set_tag(tags.ERROR, True)
set_status(StatusCode.ERROR, e)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise
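The rewritten `TransactionManager` loop above replaces `follows_from` references with OpenTelemetry `Link`s built from each EDU's serialized tracing context. A simplified, stdlib-only sketch of the collection step (plain dicts stand in for EDUs and span contexts; the field name mirrors the diff):

```python
import json

TRACING_CONTEXT = "org.matrix.tracing_context"

def collect_span_contexts(edu_contents: list) -> list:
    """Decode each EDU's serialized tracing context, skipping empty ones."""
    span_contexts = []
    for content in edu_contents:
        tracing_context_json = content.get(TRACING_CONTEXT, "{}")
        context = json.loads(tracing_context_json)
        if context:  # "{}" decodes to a falsy empty dict and is skipped
            span_contexts.append(context)
    return span_contexts
```

In the real code each decoded context is further converted to a span context and wrapped in a `Link` passed to `start_active_span`.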
+57 -44
@@ -15,7 +15,6 @@
import functools
import logging
import re
import time
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple, cast
@@ -25,12 +24,15 @@ from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
active_span,
set_tag,
span_context_from_request,
from synapse.logging.tracing import (
Link,
context_from_request,
create_non_recording_span,
get_active_span,
set_attribute,
start_active_span,
start_active_span_follows_from,
start_span,
use_span,
whitelisted_homeserver,
)
from synapse.types import JsonDict
@@ -309,60 +311,71 @@ class BaseFederationServlet:
logger.warning("authenticate_request failed: %s", e)
raise
# update the active opentracing span with the authenticated entity
set_tag("authenticated_entity", str(origin))
# update the active tracing span with the authenticated entity
set_attribute("authenticated_entity", str(origin))
# if the origin is authenticated and whitelisted, use its span context
# as the parent.
context = None
origin_context = None
if origin and whitelisted_homeserver(origin):
context = span_context_from_request(request)
origin_context = context_from_request(request)
if context:
servlet_span = active_span()
# a scope which uses the origin's context as a parent
processing_start_time = time.time()
scope = start_active_span_follows_from(
remote_parent_span = None
if origin_context:
local_servlet_span = get_active_span()
# Create a span which uses the `origin_context` as a parent
# so we can see how the incoming payload was processed while
# we're looking at the outgoing trace. Since the parent is set
# to a remote span (from the origin), it won't show up in the
# local trace which is why we create another span below for the
# local trace. A span can only have one parent so we have to
# create two separate ones.
remote_parent_span = start_span(
"incoming-federation-request",
child_of=context,
contexts=(servlet_span,),
start_time=processing_start_time,
context=origin_context,
# Cross-link back to the local trace so we can jump
# to the incoming side from the remote origin trace.
links=[Link(local_servlet_span.get_span_context())]
if local_servlet_span
else None,
)
# Create a local span to appear in the local trace
local_parent_span_cm = start_active_span(
"process-federation-request",
# Cross-link back to the remote outgoing trace so we can
# jump over there.
links=[Link(remote_parent_span.get_span_context())],
)
else:
# just use our context as a parent
scope = start_active_span(
"incoming-federation-request",
# Otherwise just use our local active servlet context as a parent
local_parent_span_cm = start_active_span(
"process-federation-request",
)
try:
with scope:
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
if request._disconnected:
logger.warning(
"client disconnected before we started processing "
"request"
)
return None
response = await func(
origin, content, request.args, *args, **kwargs
# Don't need to record anything for the remote because no remote
# trace context given.
remote_parent_span = create_non_recording_span()
remote_parent_span_cm = use_span(remote_parent_span, end_on_exit=True)
with remote_parent_span_cm, local_parent_span_cm:
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
if request._disconnected:
logger.warning(
"client disconnected before we started processing "
"request"
)
else:
return None
response = await func(
origin, content, request.args, *args, **kwargs
)
finally:
# if we used the origin's context as the parent, add a new span using
# the servlet span as a parent, so that we have a link
if context:
scope2 = start_active_span_follows_from(
"process-federation_request",
contexts=(scope.span,),
start_time=processing_start_time,
else:
response = await func(
origin, content, request.args, *args, **kwargs
)
scope2.close()
return response
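The servlet rewrite above creates two parents because a span can only have one parent: a remote-parented span that appears in the origin's trace, and a local span for our own trace, each cross-linked to the other. A toy sketch of that shape with stand-in span objects (nothing here is the Synapse or OpenTelemetry API):

```python
class Span:
    """Stand-in span: records a name, one parent, and cross-trace links."""
    def __init__(self, name, parent=None, links=()):
        self.name, self.parent, self.links = name, parent, list(links)

def build_federation_spans(origin_context, local_servlet_span):
    if origin_context is not None:
        # Remote-parented span: shows our processing in the origin's trace,
        # linked back to the local servlet span.
        remote = Span("incoming-federation-request", parent=origin_context,
                      links=[local_servlet_span])
        # Local span: appears in our own trace, cross-linked to the remote one.
        local = Span("process-federation-request", parent=local_servlet_span,
                     links=[remote])
    else:
        # No remote context given; the real code uses a non-recording span.
        remote = None
        local = Span("process-federation-request", parent=local_servlet_span)
    return remote, local
```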
+7 -4
@@ -21,6 +21,7 @@ from typing import List, Optional
import attr
from synapse.api.constants import EventContentFields
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -54,11 +55,13 @@ class Edu:
"destination": self.destination,
}
def get_context(self) -> str:
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
def get_tracing_context_json(self) -> str:
return getattr(self, "content", {}).get(
EventContentFields.TRACING_CONTEXT, "{}"
)
def strip_context(self) -> None:
getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
def strip_tracing_context(self) -> None:
getattr(self, "content", {})[EventContentFields.TRACING_CONTEXT] = "{}"
def _none_to_list(edus: Optional[List[JsonDict]]) -> List[JsonDict]:
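The `Edu` hunk above renames the context helpers and switches the content field to the `EventContentFields.TRACING_CONTEXT` constant. A self-contained sketch of the renamed helpers (the minimal `Edu` class here is illustrative; the real one is an attrs class with more fields):

```python
# Mirrors EventContentFields.TRACING_CONTEXT from the diff.
TRACING_CONTEXT = "org.matrix.tracing_context"

class Edu:
    def __init__(self, content: dict):
        self.content = content

    def get_tracing_context_json(self) -> str:
        # Serialized span context, or "{}" when none was attached.
        return self.content.get(TRACING_CONTEXT, "{}")

    def strip_tracing_context(self) -> None:
        # Blank the context before forwarding, as the diff does for
        # whitelisted destinations.
        self.content[TRACING_CONTEXT] = "{}"
```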
+16 -16
@@ -36,7 +36,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -87,7 +87,7 @@ class DeviceWorkerHandler:
info on each device
"""
set_tag("user_id", user_id)
set_attribute("user_id", user_id)
device_map = await self.store.get_devices_by_user(user_id)
ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -119,8 +119,8 @@ class DeviceWorkerHandler:
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
set_tag("device", str(device))
set_tag("ips", str(ips))
set_attribute("device", str(device))
set_attribute("ips", str(ips))
return device
@@ -172,8 +172,8 @@ class DeviceWorkerHandler:
joined a room, that `user_id` may be interested in.
"""
set_tag("user_id", user_id)
set_tag("from_token", str(from_token))
set_attribute("user_id", user_id)
set_attribute("from_token", str(from_token))
now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id)
@@ -466,8 +466,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
set_attribute("error", True)
set_attribute("reason", "User doesn't have that device id.")
else:
raise
@@ -685,7 +685,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
return
for user_id, device_id, room_id, stream_id, opentracing_context in rows:
for user_id, device_id, room_id, stream_id, tracing_context in rows:
hosts = set()
# Ignore any users that aren't ours
@@ -711,7 +711,7 @@ class DeviceHandler(DeviceWorkerHandler):
room_id=room_id,
stream_id=stream_id,
hosts=hosts,
context=opentracing_context,
context=tracing_context,
)
# Notify replication that we've updated the device list stream.
@@ -804,8 +804,8 @@ class DeviceListUpdater:
for parsing the EDU and adding to pending updates list.
"""
set_tag("origin", origin)
set_tag("edu_content", str(edu_content))
set_attribute("origin", origin)
set_attribute("edu_content", str(edu_content))
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -825,7 +825,7 @@ class DeviceListUpdater:
origin,
)
set_tag("error", True)
set_attribute("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
@@ -840,7 +840,7 @@ class DeviceListUpdater:
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
set_tag("error", True)
set_attribute("error", True)
log_kv(
{
"message": "Got an update from a user for which "
@@ -1037,12 +1037,12 @@ class DeviceListUpdater:
# eventually become consistent.
return None
except FederationDeniedError as e:
set_tag("error", True)
set_attribute("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return None
except Exception as e:
set_tag("error", True)
set_attribute("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
+6 -6
@@ -15,15 +15,15 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
from synapse.api.constants import EduTypes, ToDeviceEventTypes
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
from synapse.logging.tracing import (
SynapseTags,
get_active_span_text_map,
log_kv,
set_tag,
set_attribute,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
@@ -217,10 +217,10 @@ class DeviceMessageHandler:
sender_user_id = requester.user.to_string()
message_id = random_string(16)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
set_attribute("sender", sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
@@ -273,7 +273,7 @@ class DeviceMessageHandler:
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json_encoder.encode(context),
EventContentFields.TRACING_CONTEXT: json_encoder.encode(context),
}
# Add messages to the database.
+13 -13
@@ -28,7 +28,7 @@ from twisted.internet import defer
from synapse.api.constants import EduTypes
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.logging.tracing import log_kv, set_attribute, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
JsonDict,
@@ -140,8 +140,8 @@ class E2eKeysHandler:
else:
remote_queries[user_id] = device_ids
set_tag("local_key_query", str(local_query))
set_tag("remote_key_query", str(remote_queries))
set_attribute("local_key_query", str(local_query))
set_attribute("remote_key_query", str(remote_queries))
# First get local devices.
# A map of destination -> failure response.
@@ -377,8 +377,8 @@ class E2eKeysHandler:
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", str(failure))
set_attribute("error", True)
set_attribute("reason", str(failure))
return
@@ -442,7 +442,7 @@ class E2eKeysHandler:
Returns:
A map from user_id -> device_id -> device details
"""
set_tag("local_query", str(query))
set_attribute("local_query", str(query))
local_query: List[Tuple[str, Optional[str]]] = []
result_dict: Dict[str, Dict[str, dict]] = {}
@@ -457,7 +457,7 @@ class E2eKeysHandler:
"user_id": user_id,
}
)
set_tag("error", True)
set_attribute("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -514,8 +514,8 @@ class E2eKeysHandler:
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = one_time_keys
set_tag("local_key_query", str(local_query))
set_tag("remote_key_query", str(remote_queries))
set_attribute("local_key_query", str(local_query))
set_attribute("remote_key_query", str(remote_queries))
results = await self.store.claim_e2e_one_time_keys(local_query)
@@ -531,7 +531,7 @@ class E2eKeysHandler:
@trace
async def claim_client_keys(destination: str) -> None:
set_tag("destination", destination)
set_attribute("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = await self.federation.claim_client_keys(
@@ -544,8 +544,8 @@ class E2eKeysHandler:
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", str(failure))
set_attribute("error", True)
set_attribute("reason", str(failure))
await make_deferred_yieldable(
defer.gatherResults(
@@ -648,7 +648,7 @@ class E2eKeysHandler:
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
set_tag("one_time_key_counts", str(result))
set_attribute("one_time_key_counts", str(result))
return {"one_time_key_counts": result}
async def _upload_one_time_keys_for_user(
+1 -1
@@ -25,7 +25,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.tracing import log_kv, trace
from synapse.storage.databases.main.e2e_room_keys import RoomKey
from synapse.types import JsonDict
from synapse.util.async_helpers import Linearizer
+46 -45
@@ -38,7 +38,7 @@ from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
from synapse import event_auth
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -60,7 +60,7 @@ from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.replication.http.federation import (
@@ -119,6 +119,7 @@ class _BackfillPoint:
event_id: str
depth: int
stream_ordering: int
type: _BackfillPointType
@@ -225,16 +226,24 @@ class FederationHandler:
processing. Only used for timing.
"""
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
for event_id, depth in await self.store.get_backfill_points_in_room(room_id)
_BackfillPoint(
event_id, depth, stream_ordering, _BackfillPointType.BACKWARDS_EXTREMITY
)
for event_id, depth, stream_ordering in await self.store.get_backfill_points_in_room(
room_id=room_id,
current_depth=current_depth,
)
]
insertion_events_to_be_backfilled: List[_BackfillPoint] = []
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = [
_BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
room_id
_BackfillPoint(
event_id, depth, stream_ordering, _BackfillPointType.INSERTION_PONT
)
for event_id, depth, stream_ordering in await self.store.get_insertion_event_backward_extremities_in_room(
room_id=room_id,
current_depth=current_depth,
)
]
logger.debug(
@@ -243,10 +252,6 @@ class FederationHandler:
insertion_events_to_be_backfilled,
)
if not backwards_extremities and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremities found.")
return False
# we now have a list of potential places to backpaginate from. We prefer to
# start with the most recent (ie, max depth), so let's sort the list.
sorted_backfill_points: List[_BackfillPoint] = sorted(
@@ -254,7 +259,7 @@ class FederationHandler:
backwards_extremities,
insertion_events_to_be_backfilled,
),
key=lambda e: -int(e.depth),
key=lambda e: (-e.depth, -e.stream_ordering, e.event_id),
)
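The new tie-break key above can be exercised in isolation. A minimal sketch, assuming a stand-in dataclass for `_BackfillPoint` (the real one also carries a `type` field):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BackfillPoint:
    # Minimal stand-in for synapse's _BackfillPoint, for illustration only.
    event_id: str
    depth: int
    stream_ordering: int


points = [
    BackfillPoint("$b", depth=5, stream_ordering=3),
    BackfillPoint("$a", depth=5, stream_ordering=7),
    BackfillPoint("$c", depth=9, stream_ordering=1),
]

# Deepest first; depth ties broken by newest stream_ordering, with
# event_id last so the ordering is fully deterministic.
sorted_points = sorted(points, key=lambda e: (-e.depth, -e.stream_ordering, e.event_id))

print([p.event_id for p in sorted_points])  # ['$c', '$a', '$b']
```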
logger.debug(
@@ -267,6 +272,29 @@ class FederationHandler:
sorted_backfill_points,
)
# If we have no backfill points lower than the `current_depth`, then we
# can either a) bail or b) still attempt to backfill. We opt to try
# backfilling anyway, just in case we do get relevant events.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
)
return await self._maybe_backfill_inner(
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
current_depth=MAX_DEPTH,
limit=limit,
processing_start_time=processing_start_time,
)
elif not sorted_backfill_points and current_depth == MAX_DEPTH:
# Even after trying again with `MAX_DEPTH`, we didn't find any
# backward extremities to backfill from.
logger.debug(
"_maybe_backfill_inner: Not backfilling as no backward extremities found."
)
return False
# If we're approaching an extremity we trigger a backfill, otherwise we
# no-op.
#
@@ -280,43 +308,16 @@ class FederationHandler:
# XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
# care about events that have happened after our current position.
#
max_depth = sorted_backfill_points[0].depth
if current_depth - 2 * limit > max_depth:
max_depth_of_backfill_points = sorted_backfill_points[0].depth
if current_depth - 2 * limit > max_depth_of_backfill_points:
logger.debug(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
max_depth,
max_depth_of_backfill_points,
current_depth,
limit,
)
return False
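Condensed, the depth check above behaves like this (a sketch with a hypothetical helper name, not Synapse's actual API):

```python
def should_backfill(current_depth: int, limit: int, max_depth_of_backfill_points: int) -> bool:
    # Only backfill once the pagination position gets within 2 * limit of
    # the deepest backfill point; further away than that, it is a no-op.
    return current_depth - 2 * limit <= max_depth_of_backfill_points


print(should_backfill(current_depth=100, limit=10, max_depth_of_backfill_points=50))  # False
print(should_backfill(current_depth=100, limit=10, max_depth_of_backfill_points=90))  # True
```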
# We ignore extremities that have a greater depth than our current depth
# as:
# 1. we don't really care about getting events that have happened
# after our current position; and
# 2. we have likely previously tried and failed to backfill from that
# extremity, so to avoid getting "stuck" requesting the same
# backfill repeatedly we drop those extremities.
#
# However, we need to check that the filtered extremities are non-empty.
# If they are empty then either we can a) bail or b) still attempt to
# backfill. We opt to try backfilling anyway just in case we do get
# relevant events.
#
filtered_sorted_backfill_points = [
t for t in sorted_backfill_points if t.depth <= current_depth
]
if filtered_sorted_backfill_points:
logger.debug(
"_maybe_backfill_inner: backfill points before current depth: %s",
filtered_sorted_backfill_points,
)
sorted_backfill_points = filtered_sorted_backfill_points
else:
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
)
# For performance's sake, we only want to paginate from a particular extremity
# if we can actually see the events we'll get. Otherwise, we'd just spend a lot
# of resources to get redacted events. We check each extremity in turn and
@@ -386,14 +387,14 @@ class FederationHandler:
)
return False
logger.debug(
logger.info(
"_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "extremities_to_request",
str(extremities_to_request),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
str(len(extremities_to_request)),
)
+187 -21
@@ -14,6 +14,7 @@
import collections
import itertools
import json
import logging
from http import HTTPStatus
from typing import (
@@ -59,9 +60,9 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import (
from synapse.logging.tracing import (
SynapseTags,
set_tag,
set_attribute,
start_active_span,
tag_args,
trace,
@@ -137,6 +138,7 @@ class FederationEventHandler:
"""
def __init__(self, hs: "HomeServer"):
self.hs = hs
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
@@ -177,6 +179,7 @@ class FederationEventHandler:
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@trace
async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
"""Process a PDU received via a federation /send/ transaction
@@ -644,9 +647,108 @@ class FederationEventHandler:
f"room {ev.room_id}, when we were backfilling in {room_id}"
)
#
# We expect the events from the `/backfill` response to start from
# `?v` and include events that preceded it (so the list will be
# newest -> oldest, reverse chronological). This is at best a
# convention between Synapse servers, as the order is not specced.
#
# We want to calculate the `stream_ordering` from newest -> oldest
# (so historical events sort in the correct order) and persist in
# oldest -> newest to minimise thrashing from missing `prev_event`
# fetches.
reverse_chronological_events = events
# `[::-1]` is just syntax to reverse the list and give us a copy
chronological_events = reverse_chronological_events[::-1]
logger.info(
"backfill assumed reverse_chronological_events=%s",
[
"event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in reverse_chronological_events
],
)
# logger.info(
# "backfill chronological_events=%s",
# [
# "event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
# % (
# event.event_id,
# event.depth,
# event.content.get("body", event.type),
# getattr(event, "state_key", None),
# event.prev_event_ids(),
# )
# for event in chronological_events
# ],
# )
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
# This should only exist on instances that are configured to write
assert (
self._instance_name in self.hs.config.worker.writers.events
), "Can only instantiate xxxfoobarbaz on master"
# Since we have been configured to write, we ought to have id generators,
# rather than id trackers.
assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
len(reverse_chronological_events)
)
async with stream_ordering_manager as stream_orderings:
for event, stream in zip(
reverse_chronological_events, stream_orderings
):
event.internal_metadata.stream_ordering = stream
logger.info(
"backfill chronological_events=%s",
[
"event_id=%s,depth=%d,stream_ordering=%s,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.internal_metadata.stream_ordering,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in chronological_events
],
)
logger.info(
"backfill reverse_chronological_events=%s",
[
"event_id=%s,depth=%d,stream_ordering=%s,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.internal_metadata.stream_ordering,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in reverse_chronological_events
],
)
await self._process_pulled_events(
dest,
events,
# Expecting to persist in chronological order here (oldest ->
# newest) so that events are persisted before they're referenced
# as a `prev_event`.
chronological_events,
# reverse_chronological_events,
backfilled=True,
)
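The ordering trick above — assign `stream_ordering` newest-first but persist oldest-first — can be sketched with a hypothetical helper (Synapse's backfill id generator hands out descending negative ids; this stand-in only mimics that behaviour):

```python
def assign_backfill_stream_orderings(reverse_chronological_ids, next_backfill_id=-1):
    # Backfill stream orderings are negative and count down, so handing
    # them out newest-first leaves older events with more-negative values.
    orderings = {}
    for event_id in reverse_chronological_ids:
        orderings[event_id] = next_backfill_id
        next_backfill_id -= 1
    return orderings


orderings = assign_backfill_stream_orderings(["$new", "$mid", "$old"], next_backfill_id=-10)
print(orderings)  # {'$new': -10, '$mid': -11, '$old': -12}
# Sorting by stream_ordering now recovers chronological order:
print(sorted(orderings, key=orderings.get))  # ['$old', '$mid', '$new']
```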
@@ -768,24 +870,25 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
set_tag(
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
str([event.event_id for event in events]),
)
set_tag(
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(events)),
)
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
"event_id=%s,depth=%d,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in events
@@ -795,6 +898,26 @@ class FederationEventHandler:
# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
logger.info(
"_process_pulled_events backfill sorted_events=%s",
json.dumps(
[
"event_id=%s,depth=%d,stream_ordering=%s,body=%s(%s),prevs=%s\n"
% (
event.event_id,
event.depth,
event.internal_metadata.stream_ordering,
event.content.get("body", event.type),
getattr(event, "state_key", None),
event.prev_event_ids(),
)
for event in sorted_events
],
indent=4,
),
)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -1073,11 +1196,18 @@ class FederationEventHandler:
destination, room_id, event_id=event_id
)
logger.debug(
"state_ids returned %i state events, %i auth events",
logger.info(
"_get_state_ids_after_missing_prev_event(event_id=%s): state_ids returned %i state events, %i auth events",
event_id,
len(state_event_ids),
len(auth_event_ids),
)
logger.info(
"_get_state_ids_after_missing_prev_event(event_id=%s): state_event_ids=%s auth_event_ids=%s",
event_id,
state_event_ids,
auth_event_ids,
)
# Start by checking events we already have in the DB
desired_events = set(state_event_ids)
@@ -1108,19 +1238,19 @@ class FederationEventHandler:
missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
str(missing_auth_event_ids),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
str(len(missing_auth_event_ids)),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
str(missing_desired_event_ids),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
str(len(missing_desired_event_ids)),
)
@@ -1197,15 +1327,20 @@ class FederationEventHandler:
event_id,
failed_to_fetch,
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "failed_to_fetch",
str(failed_to_fetch),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
str(len(failed_to_fetch)),
)
set_attribute(
SynapseTags.RESULT_PREFIX + f"failed_to_fetch ({len(failed_to_fetch)})",
str(failed_to_fetch),
)
if remote_event.is_state() and remote_event.rejected_reason is None:
state_map[
(remote_event.type, remote_event.state_key)
@@ -1586,11 +1721,11 @@ class FederationEventHandler:
event_map = {event.event_id: event for event in events}
event_ids = event_map.keys()
set_tag(
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
str(event_ids),
)
set_tag(
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)
@@ -1750,11 +1885,11 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "claimed_auth_events",
str([ev.event_id for ev in claimed_auth_events]),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
str(len(claimed_auth_events)),
)
@@ -1785,6 +1920,12 @@ class FederationEventHandler:
# already have checked we have all the auth events, in
# _load_or_fetch_auth_events_for_event above)
if context.partial_state:
logger.info(
"_check_event_auth(event=%s) with partial_state - %s (%s)",
event.event_id,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
room_version = await self._store.get_room_version_id(event.room_id)
local_state_id_map = await context.get_prev_state_ids()
@@ -1802,15 +1943,40 @@ class FederationEventHandler:
)
)
else:
logger.info(
"_check_event_auth(event=%s) with full state - %s (%s)",
event.event_id,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
event_types = event_auth.auth_types_for_event(event.room_version, event)
state_for_auth_id_map = await context.get_prev_state_ids(
StateFilter.from_types(event_types)
)
logger.info(
"_check_event_auth(event=%s) state_for_auth_id_map=%s - %s (%s)",
event.event_id,
state_for_auth_id_map,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
calculated_auth_event_ids = self._event_auth_handler.compute_auth_events(
event, state_for_auth_id_map, for_verification=True
)
logger.info(
"_check_event_auth(event=%s) match=%s claimed_auth_events=%s calculated_auth_event_ids=%s - %s (%s)",
event.event_id,
collections.Counter(event.auth_event_ids())
== collections.Counter(calculated_auth_event_ids),
event.auth_event_ids(),
calculated_auth_event_ids,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
# if those are the same, we're done here.
if collections.Counter(event.auth_event_ids()) == collections.Counter(
calculated_auth_event_ids
@@ -2203,11 +2369,11 @@ class FederationEventHandler:
if not backfilled: # Never notify for backfilled events
with start_active_span("notify_persisted_events"):
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "event_ids",
str([ev.event_id for ev in events]),
)
set_tag(
set_attribute(
SynapseTags.RESULT_PREFIX + "event_ids.length",
str(len(events)),
)
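The `collections.Counter` comparison used in `_check_event_auth` above treats the two auth-event lists as multisets — order-insensitive but duplicate-sensitive, unlike a plain `set()` comparison. A minimal sketch:

```python
import collections


def auth_events_match(claimed_auth_event_ids, calculated_auth_event_ids):
    # Counter equality ignores ordering but still distinguishes
    # duplicate entries, which set() comparison would collapse.
    return collections.Counter(claimed_auth_event_ids) == collections.Counter(
        calculated_auth_event_ids
    )


print(auth_events_match(["$create", "$power"], ["$power", "$create"]))  # True
print(auth_events_match(["$create", "$create"], ["$create"]))  # False
```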
+15 -3
@@ -52,7 +52,7 @@ from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.logging import opentracing
from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
@@ -1081,6 +1081,11 @@ class EventCreationHandler:
# Do a quick sanity check here, rather than waiting until we've created the
# event and then try to auth it (which fails with a somewhat confusing "No
# create event in auth events")
logger.info(
"create_new_client_event allow_no_prev_events=%s auth_event_ids=%s",
allow_no_prev_events,
auth_event_ids,
)
if allow_no_prev_events:
# We allow events with no `prev_events` but it better have some `auth_events`
assert (
@@ -1101,6 +1106,13 @@ class EventCreationHandler:
depth=depth,
)
logger.info(
"create_new_client_event(event=%s) state_event_ids=%s resultant auth_event_ids=%s",
event.event_id,
state_event_ids,
auth_event_ids,
)
# Pass on the outlier property from the builder to the event
# after it is created
if builder.internal_metadata.outlier:
@@ -1374,7 +1386,7 @@ class EventCreationHandler:
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
with opentracing.start_active_span("calculate_push_actions"):
with tracing.start_active_span("calculate_push_actions"):
await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)
@@ -1464,7 +1476,7 @@ class EventCreationHandler:
state = await state_entry.get_state(
self._storage_controllers.state, StateFilter.all()
)
with opentracing.start_active_span("get_joined_hosts"):
with tracing.start_active_span("get_joined_hosts"):
joined_hosts = await self.store.get_joined_hosts(
event.room_id, state, state_entry
)
+1 -1
@@ -24,7 +24,7 @@ from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
+9 -1
@@ -19,7 +19,7 @@ import attr
from synapse.api.constants import RelationTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event
from synapse.logging.opentracing import trace
from synapse.logging.tracing import SynapseTags, set_attribute, trace
from synapse.storage.databases.main.relations import _RelatedEvent
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.visibility import filter_events_for_client
@@ -166,6 +166,7 @@ class RelationsHandler:
return return_value
@trace
async def get_relations_for_event(
self,
event_id: str,
@@ -200,6 +201,7 @@ class RelationsHandler:
return related_events, next_token
@trace
async def get_annotations_for_event(
self,
event_id: str,
@@ -245,6 +247,7 @@ class RelationsHandler:
return filtered_results
@trace
async def _get_threads_for_events(
self,
events_by_id: Dict[str, EventBase],
@@ -406,6 +409,11 @@ class RelationsHandler:
# The event should get bundled aggregations.
events_by_id[event.event_id] = event
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events_by_id)})",
str(events_by_id.keys()),
)
# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {}
+8 -8
@@ -153,6 +153,7 @@ class RoomBatchHandler:
self,
state_events_at_start: List[JsonDict],
room_id: str,
initial_prev_event_ids: List[str],
initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
@@ -178,10 +179,8 @@ class RoomBatchHandler:
state_event_ids_at_start = []
state_event_ids = initial_state_event_ids.copy()
# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
# `@mxid joined the room` noise between each batch.
prev_event_ids_for_state_chain: List[str] = []
# TODO: Here
prev_event_ids_for_state_chain: List[str] = initial_prev_event_ids
for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
@@ -269,6 +268,7 @@ class RoomBatchHandler:
events_to_create: List[JsonDict],
room_id: str,
inherited_depth: int,
state_chain_event_id_to_connect_to: str,
initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
@@ -301,10 +301,8 @@ class RoomBatchHandler:
# We expect the last event in a historical batch to be a batch event
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []
# TODO: Here
prev_event_ids: List[str] = [state_chain_event_id_to_connect_to]
event_ids = []
events_to_persist = []
@@ -390,6 +388,7 @@ class RoomBatchHandler:
events_to_create: List[JsonDict],
room_id: str,
batch_id_to_connect_to: str,
state_chain_event_id_to_connect_to: str,
inherited_depth: int,
initial_state_event_ids: List[str],
app_service_requester: Requester,
@@ -458,6 +457,7 @@ class RoomBatchHandler:
events_to_create=events_to_create,
room_id=room_id,
inherited_depth=inherited_depth,
state_chain_event_id_to_connect_to=state_chain_event_id_to_connect_to,
initial_state_event_ids=initial_state_event_ids,
app_service_requester=app_service_requester,
)
+3 -3
@@ -32,7 +32,7 @@ from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.logging import tracing
from synapse.module_api import NOT_SPAM
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -429,7 +429,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)
with opentracing.start_active_span("handle_new_client_event"):
with tracing.start_active_span("handle_new_client_event"):
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
event,
@@ -565,7 +565,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
with opentracing.start_active_span("update_membership_locked"):
with tracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
target,
+14 -9
@@ -38,7 +38,12 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.logging.tracing import (
SynapseTags,
log_kv,
set_attribute,
start_active_span,
)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import NotifCounts
from synapse.storage.roommember import MemberSummary
@@ -404,12 +409,12 @@ class SyncHandler:
indoctrination.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
log_kv({"since_token": str(since_token)})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
set_attribute(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]:
@@ -1303,7 +1308,7 @@ class SyncHandler:
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})
log_kv({"now_token": str(now_token)})
logger.debug(
"Calculating sync response for %r between %s and %s",
@@ -1555,7 +1560,7 @@ class SyncHandler:
# `/sync`
message_id = message.pop("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
@@ -2223,13 +2228,13 @@ class SyncHandler:
upto_token = room_builder.upto_token
with start_active_span("sync.generate_room_entry"):
set_tag("room_id", room_id)
set_attribute("room_id", room_id)
log_kv({"events": len(events or ())})
log_kv(
{
"since_token": since_token,
"upto_token": upto_token,
"since_token": str(since_token),
"upto_token": str(upto_token),
}
)
@@ -2244,7 +2249,7 @@ class SyncHandler:
log_kv(
{
"batch_events": len(batch.events),
"prev_batch": batch.prev_batch,
"prev_batch": str(batch.prev_batch),
"batch_limited": batch.limited,
}
)
+12 -8
@@ -75,7 +75,13 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.logging.tracing import (
SpanAttributes,
SpanKind,
StatusCode,
set_status,
start_active_span,
)
from synapse.types import ISynapseReactor
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
@@ -402,12 +408,11 @@ class SimpleHttpClient:
with start_active_span(
"outgoing-client-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.HTTP_METHOD: method,
tags.HTTP_URL: uri,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.HTTP_METHOD: method,
SpanAttributes.HTTP_URL: uri,
},
finish_on_close=True,
):
try:
body_producer = None
@@ -459,8 +464,7 @@ class SimpleHttpClient:
type(e).__name__,
e.args[0],
)
set_tag(tags.ERROR, True)
set_tag("error_reason", e.args[0])
set_status(StatusCode.ERROR, e)
raise
async def post_urlencoded_get_json(
+16 -10
@@ -72,9 +72,14 @@ from synapse.http.client import (
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.logging.tracing import (
SpanAttributes,
SpanKind,
set_attribute,
start_active_span,
)
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
@@ -517,18 +522,19 @@ class MatrixFederationHttpClient:
scope = start_active_span(
"outgoing-federation-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.PEER_ADDRESS: request.destination,
tags.HTTP_METHOD: request.method,
tags.HTTP_URL: request.path,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.HTTP_HOST: request.destination,
SpanAttributes.HTTP_METHOD: request.method,
SpanAttributes.HTTP_URL: request.path,
},
finish_on_close=True,
)
# Inject the span into the headers
headers_dict: Dict[bytes, List[bytes]] = {}
opentracing.inject_header_dict(headers_dict, request.destination)
tracing.inject_active_tracing_context_into_header_dict(
headers_dict, request.destination
)
headers_dict[b"User-Agent"] = [self.version_string_bytes]
@@ -614,7 +620,7 @@ class MatrixFederationHttpClient:
request.method, response.code
).inc()
set_tag(tags.HTTP_STATUS_CODE, response.code)
set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.code)
response_phrase = response.phrase.decode("ascii", errors="replace")
if 200 <= response.code < 300:
+12 -12
@@ -60,14 +60,14 @@ from synapse.api.errors import (
from synapse.config.homeserver import HomeServerConfig
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
from synapse.logging.tracing import get_active_span, start_active_span, trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.cancellation import is_function_cancellable
from synapse.util.iterutils import chunk_seq
if TYPE_CHECKING:
import opentracing
import opentelemetry
from synapse.server import HomeServer
@@ -267,7 +267,7 @@ class HttpServer(Protocol):
subsequent arguments will be any matched groups from the regex.
This should return either tuple of (code, response), or None.
servlet_classname (str): The name of the handler to be used in prometheus
and opentracing logs.
and tracing logs.
"""
@@ -278,7 +278,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
requests by method, or override `_async_render` to handle all requests.
Args:
extract_context: Whether to attempt to extract the opentracing
extract_context: Whether to attempt to extract the tracing
context from the request the servlet is handling.
"""
@@ -448,7 +448,7 @@ class JsonResource(DirectServeJsonResource):
callback: The handler for the request. Usually a Servlet
servlet_classname: The name of the handler to be used in prometheus
and opentracing logs.
and tracing logs.
"""
method_bytes = method.encode("utf-8")
@@ -816,19 +816,19 @@ async def _async_write_json_to_request_in_thread(
expensive.
"""
def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes:
def encode(tracing_span: Optional["opentelemetry.trace.Span"]) -> bytes:
# it might take a while for the threadpool to schedule us, so we write
# opentracing logs once we actually get scheduled, so that we can see how
# tracing logs once we actually get scheduled, so that we can see how
# much that contributed.
if opentracing_span:
opentracing_span.log_kv({"event": "scheduled"})
if tracing_span:
tracing_span.add_event("scheduled", attributes={"event": "scheduled"})
res = json_encoder(json_object)
if opentracing_span:
opentracing_span.log_kv({"event": "encoded"})
if tracing_span:
tracing_span.add_event("encoded", attributes={"event": "encoded"})
return res
with start_active_span("encode_json_response"):
span = active_span()
span = get_active_span()
json_str = await defer_to_thread(request.reactor, encode, span)
_write_bytes_to_request(request, json_str)
+24 -16
@@ -37,7 +37,7 @@ from synapse.logging.context import (
from synapse.types import Requester
if TYPE_CHECKING:
import opentracing
import opentelemetry
logger = logging.getLogger(__name__)
@@ -87,9 +87,9 @@ class SynapseRequest(Request):
# server name, for client requests this is the Requester object.
self._requester: Optional[Union[Requester, str]] = None
# An opentracing span for this request. Will be closed when the request is
# A tracing span for this request. Will be closed when the request is
# completely processed.
self._opentracing_span: "Optional[opentracing.Span]" = None
self._tracing_span: Optional["opentelemetry.trace.Span"] = None
# we can't yet create the logcontext, as we don't know the method.
self.logcontext: Optional[LoggingContext] = None
@@ -166,12 +166,12 @@ class SynapseRequest(Request):
# If there's no authenticated entity, it was the requester.
self.logcontext.request.authenticated_entity = authenticated_entity or requester
def set_opentracing_span(self, span: "opentracing.Span") -> None:
"""attach an opentracing span to this request
def set_tracing_span(self, span: "opentelemetry.trace.Span") -> None:
"""attach a tracing span to this request
Doing so will cause the span to be closed when we finish processing the request
"""
self._opentracing_span = span
self._tracing_span = span
def get_request_id(self) -> str:
request_id_value = None
@@ -318,8 +318,10 @@ class SynapseRequest(Request):
self._processing_finished_time = time.time()
self._is_processing = False
if self._opentracing_span:
self._opentracing_span.log_kv({"event": "finished processing"})
if self._tracing_span:
self._tracing_span.add_event(
"finished processing", attributes={"event": "finished processing"}
)
# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
@@ -334,8 +336,10 @@ class SynapseRequest(Request):
"""
self.finish_time = time.time()
Request.finish(self)
if self._opentracing_span:
self._opentracing_span.log_kv({"event": "response sent"})
if self._tracing_span:
self._tracing_span.add_event(
"response sent", attributes={"event": "response sent"}
)
if not self._is_processing:
assert self.logcontext is not None
with PreserveLoggingContext(self.logcontext):
@@ -370,9 +374,13 @@ class SynapseRequest(Request):
with PreserveLoggingContext(self.logcontext):
logger.info("Connection from client lost before response was sent")
if self._opentracing_span:
self._opentracing_span.log_kv(
{"event": "client connection lost", "reason": str(reason.value)}
if self._tracing_span:
self._tracing_span.add_event(
"client connection lost",
attributes={
"event": "client connection lost",
"reason": str(reason.value),
},
)
if self._is_processing:
@@ -480,9 +488,9 @@ class SynapseRequest(Request):
usage.evt_db_fetch_count,
)
# complete the opentracing span, if any.
if self._opentracing_span:
self._opentracing_span.finish()
# complete the tracing span, if any.
if self._tracing_span:
self._tracing_span.end()
try:
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
+1 -8
@@ -46,7 +46,6 @@ from twisted.internet import defer, threads
from twisted.python.threadpool import ThreadPool
if TYPE_CHECKING:
from synapse.logging.scopecontextmanager import _LogContextScope
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
@@ -221,14 +220,13 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
class _Sentinel:
"""Sentinel to represent the root context"""
__slots__ = ["previous_context", "finished", "request", "scope", "tag"]
__slots__ = ["previous_context", "finished", "request", "tag"]
def __init__(self) -> None:
# Minimal set for compatibility with LoggingContext
self.previous_context = None
self.finished = False
self.request = None
self.scope = None
self.tag = None
def __str__(self) -> str:
@@ -281,7 +279,6 @@ class LoggingContext:
"finished",
"request",
"tag",
"scope",
]
def __init__(
@@ -302,7 +299,6 @@ class LoggingContext:
self.main_thread = get_thread_id()
self.request = None
self.tag = ""
self.scope: Optional["_LogContextScope"] = None
# keep track of whether we have hit the __exit__ block for this context
# (suggesting that the thing that created the context thinks it should
@@ -315,9 +311,6 @@ class LoggingContext:
# we track the current request_id
self.request = self.parent_context.request
# we also track the current scope:
self.scope = self.parent_context.scope
if request is not None:
# the request param overrides the request from the parent context
self.request = request
File diff suppressed because it is too large
-171
@@ -1,171 +0,0 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from types import TracebackType
from typing import Optional, Type
from opentracing import Scope, ScopeManager, Span
import twisted
from synapse.logging.context import (
LoggingContext,
current_context,
nested_logging_context,
)
logger = logging.getLogger(__name__)
class LogContextScopeManager(ScopeManager):
"""
The LogContextScopeManager tracks the active scope in opentracing
by using the log contexts which are native to synapse. This is so
that the basic opentracing api can be used across twisted deferreds.
It would be nice just to use opentracing's ContextVarsScopeManager,
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
"""
def __init__(self) -> None:
pass
@property
def active(self) -> Optional[Scope]:
"""
Returns the currently active Scope which can be used to access the
currently active Scope.span.
If there is a non-null Scope, its wrapped Span
becomes an implicit parent of any newly-created Span at
Tracer.start_active_span() time.
Return:
The Scope that is active, or None if not available.
"""
ctx = current_context()
return ctx.scope
def activate(self, span: Span, finish_on_close: bool) -> Scope:
"""
Makes a Span active.
Args
span: the span that should become active.
finish_on_close: whether Span should be automatically finished when
Scope.close() is called.
Returns:
Scope to control the end of the active period for
*span*. It is a programming error to neglect to call
Scope.close() on the returned instance.
"""
ctx = current_context()
if not ctx:
logger.error("Tried to activate scope outside of loggingcontext")
return Scope(None, span) # type: ignore[arg-type]
if ctx.scope is not None:
# start a new logging context as a child of the existing one.
# Doing so -- rather than updating the existing logcontext -- means that
# creating several concurrent spans under the same logcontext works
# correctly.
ctx = nested_logging_context("")
enter_logcontext = True
else:
# if there is no span currently associated with the current logcontext, we
# just store the scope in it.
#
# This feels a bit dubious, but it does hack around a problem where a
# span outlasts its parent logcontext (which would otherwise lead to
# "Re-starting finished log context" errors).
enter_logcontext = False
scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
ctx.scope = scope
if enter_logcontext:
ctx.__enter__()
return scope
class _LogContextScope(Scope):
"""
A custom opentracing scope, associated with a LogContext
* filters out _DefGen_Return exceptions which arise from calling
`defer.returnValue` in Twisted code
* When the scope is closed, the logcontext's active scope is reset to None.
and - if enter_logcontext was set - the logcontext is finished too.
"""
def __init__(
self,
manager: LogContextScopeManager,
span: Span,
logcontext: LoggingContext,
enter_logcontext: bool,
finish_on_close: bool,
):
"""
Args:
manager:
the manager that is responsible for this scope.
span:
the opentracing span which this scope represents the local
lifetime for.
logcontext:
the log context to which this scope is attached.
enter_logcontext:
if True the log context will be exited when the scope is finished
finish_on_close:
if True finish the span when the scope is closed
"""
super().__init__(manager, span)
self.logcontext = logcontext
self._finish_on_close = finish_on_close
self._enter_logcontext = enter_logcontext
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
if exc_type == twisted.internet.defer._DefGen_Return:
# filter out defer.returnValue() calls
exc_type = value = traceback = None
super().__exit__(exc_type, value, traceback)
def __str__(self) -> str:
return f"Scope<{self.span}>"
def close(self) -> None:
active_scope = self.manager.active
if active_scope is not self:
logger.error(
"Closing scope %s which is not the currently-active one %s",
self,
active_scope,
)
if self._finish_on_close:
self.span.finish()
self.logcontext.scope = None
if self._enter_logcontext:
self.logcontext.__exit__(None, None, None)
File diff suppressed because it is too large
@@ -42,7 +42,7 @@ from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
)
from synapse.logging.opentracing import SynapseTags, start_active_span
from synapse.logging.tracing import SynapseTags, start_active_span
from synapse.metrics._types import Collector
if TYPE_CHECKING:
@@ -208,7 +208,7 @@ def run_as_background_process(
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
bg_start_span: Whether to start an opentracing span. Defaults to True.
bg_start_span: Whether to start a tracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
args: positional args for func
@@ -232,10 +232,11 @@ def run_as_background_process(
try:
if bg_start_span:
ctx = start_active_span(
f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)}
f"bgproc.{desc}",
attributes={SynapseTags.REQUEST_ID: str(context)},
)
else:
ctx = nullcontext() # type: ignore[assignment]
ctx = nullcontext()
with ctx:
return await func(*args, **kwargs)
except Exception:
+3 -3
@@ -39,7 +39,7 @@ from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.tracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
@@ -536,7 +536,7 @@ class Notifier:
log_kv(
{
"wait_for_events": "sleep",
"token": prev_token,
"token": str(prev_token),
}
)
@@ -546,7 +546,7 @@ class Notifier:
log_kv(
{
"wait_for_events": "woken",
"token": user_stream.current_token,
"token": str(user_stream.current_token),
}
)
+3 -3
@@ -23,7 +23,7 @@ from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging import opentracing
from synapse.logging import tracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
@@ -198,9 +198,9 @@ class HttpPusher(Pusher):
)
for push_action in unprocessed:
with opentracing.start_active_span(
with tracing.start_active_span(
"http-push",
tags={
attributes={
"authenticated_entity": self.user_id,
"event_id": push_action.event_id,
"app_id": self.app_id,
+5 -3
@@ -28,8 +28,8 @@ from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.opentracing import trace_with_opname
from synapse.logging import tracing
from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.cancellation import is_function_cancellable
@@ -249,7 +249,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentracing.inject_header_dict(headers, check_destination=False)
tracing.inject_active_tracing_context_into_header_dict(
headers, check_destination=False
)
try:
# Keep track of attempts made so we can bail if we don't manage to
+5 -5
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Optional
from prometheus_client import Counter, Histogram
from synapse.logging import opentracing
from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder
@@ -94,9 +94,9 @@ class ExternalCache:
logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
with opentracing.start_active_span(
with tracing.start_active_span(
"ExternalCache.set",
tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
attributes={tracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("set").time():
return await make_deferred_yieldable(
@@ -113,9 +113,9 @@ class ExternalCache:
if self._redis_connection is None:
return None
with opentracing.start_active_span(
with tracing.start_active_span(
"ExternalCache.get",
tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
attributes={tracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("get").time():
result = await make_deferred_yieldable(
+4 -4
@@ -26,7 +26,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag
from synapse.logging.tracing import log_kv, set_attribute
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.types import JsonDict, StreamToken
from synapse.util.cancellation import cancellable
@@ -87,7 +87,7 @@ class KeyUploadServlet(RestServlet):
user_id
)
if dehydrated_device is not None and device_id != dehydrated_device[0]:
set_tag("error", True)
set_attribute("error", True)
log_kv(
{
"message": "Client uploading keys for a different device",
@@ -205,13 +205,13 @@ class KeyChangesServlet(RestServlet):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from", required=True)
set_tag("from", from_token_string)
set_attribute("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
#
# XXX This does not enforce that "to" is passed.
set_tag("to", str(parse_string(request, "to")))
set_attribute("to", str(parse_string(request, "to")))
from_token = await StreamToken.from_string(self.store, from_token_string)
+2 -2
@@ -24,7 +24,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import set_tag
from synapse.logging.tracing import set_attribute
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict, RoomAlias, RoomID
@@ -97,7 +97,7 @@ class KnockRoomAliasServlet(RestServlet):
def on_PUT(
self, request: SynapseRequest, room_identifier: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_identifier, txn_id
+17 -11
@@ -51,7 +51,7 @@ from synapse.http.servlet import (
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag
from synapse.logging.tracing import set_attribute
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.storage.state import StateFilter
@@ -152,7 +152,7 @@ class RoomCreateRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(request, self.on_POST, request)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
@@ -264,7 +264,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
if txn_id:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
content = parse_json_object_from_request(request)
@@ -299,7 +299,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
set_tag("event_id", event_id)
set_attribute("event_id", event_id)
ret = {"event_id": event_id}
return 200, ret
@@ -349,7 +349,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
set_tag("event_id", event_id)
set_attribute("event_id", event_id)
return 200, {"event_id": event_id}
def on_GET(
@@ -360,7 +360,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, event_type, txn_id
@@ -418,7 +418,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_identifier: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_identifier, txn_id
@@ -634,6 +634,12 @@ class RoomMessageListRestServlet(RestServlet):
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
logger.info(
"RoomMessageListRestServlet ========================================"
)
logger.info(
"==================================================================="
)
processing_start_time = self.clock.time_msec()
# Fire off and hope that we get a result by the end.
#
@@ -904,7 +910,7 @@ class RoomForgetRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, txn_id
@@ -1007,7 +1013,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, membership_action: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, membership_action, txn_id
@@ -1053,13 +1059,13 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
set_tag("event_id", event_id)
set_attribute("event_id", event_id)
return 200, {"event_id": event_id}
def on_PUT(
self, request: SynapseRequest, room_id: str, event_id: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, event_id, txn_id
+3
@@ -153,6 +153,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
room_id=room_id,
initial_prev_event_ids=prev_event_ids_from_query,
initial_state_event_ids=state_event_ids,
app_service_requester=requester,
)
@@ -222,6 +223,8 @@ class RoomBatchSendEventRestServlet(RestServlet):
room_id=room_id,
batch_id_to_connect_to=batch_id_to_connect_to,
inherited_depth=inherited_depth,
# Connect the historical batch to the state chain
state_chain_event_id_to_connect_to=state_event_ids_at_start[-1],
initial_state_event_ids=state_event_ids,
app_service_requester=requester,
)
+3 -3
@@ -19,7 +19,7 @@ from synapse.http import servlet
from synapse.http.server import HttpServer
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import set_tag
from synapse.logging.tracing import set_attribute
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict
@@ -46,8 +46,8 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
set_attribute("message_type", message_type)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
)
+1 -1
@@ -37,7 +37,7 @@ from synapse.handlers.sync import (
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname
from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict, StreamToken
from synapse.util import json_decoder
+128 -2
@@ -36,11 +36,13 @@ import attr
from frozendict import frozendict
from prometheus_client import Counter, Histogram
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.tracing import SynapseTags, log_kv, set_attribute, tag_args, trace
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
@@ -250,6 +252,8 @@ class StateHandler:
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
return await self.store.get_joined_hosts(room_id, state, entry)
@trace
@tag_args
async def compute_event_context(
self,
event: EventBase,
@@ -282,6 +286,14 @@ class StateHandler:
RuntimeError if `state_ids_before_event` is not provided and one or more
prev events are missing or outliers.
"""
set_attribute(
SynapseTags.RESULT_PREFIX + "event_type_and_state",
f"{event.type} - {getattr(event, 'state_key', None)}",
)
set_attribute(
SynapseTags.RESULT_PREFIX + "event_body",
event.content.get("body", None),
)
assert not event.internal_metadata.is_outlier()
@@ -289,6 +301,15 @@ class StateHandler:
# first of all, figure out the state before the event, unless we
# already have it.
#
logger.info(
"compute_event_context(event=%s, state_ids_before_event=%s) - %s (%s)",
event.event_id,
state_ids_before_event,
event.content.get("body", event.type),
getattr(event, "state_key", None),
# stack_info=True,
)
if state_ids_before_event:
# if we're given the state before the event, then we use that
state_group_before_event_prev_group = None
@@ -304,6 +325,12 @@ class StateHandler:
current_state_ids=state_ids_before_event,
)
)
log_kv(
{
"message": "Using state before event because `state_ids_before_event` was given:",
"state_group_before_event": state_group_before_event,
}
)
# the partial_state flag must be provided
assert partial_state is not None
@@ -324,7 +351,7 @@ class StateHandler:
)
partial_state = any(incomplete_prev_events.values())
if partial_state:
logger.debug(
logger.info(
"New/incoming event %s refers to prev_events %s with partial state",
event.event_id,
[k for (k, v) in incomplete_prev_events.items() if v],
@@ -343,6 +370,24 @@ class StateHandler:
deltas_to_state_group_before_event = entry.delta_ids
state_ids_before_event = None
logger.info(
"compute_event_context(event=%s) resolve_state_groups_for_events entry.state_group=%s state_group_before_event_prev_group=%s deltas_to_state_group_before_event=%s - %s (%s)",
event.event_id,
entry.state_group,
state_group_before_event_prev_group,
deltas_to_state_group_before_event,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
log_kv(
{
"message": "resolve_state_groups_for_events",
"entry.state_group": entry.state_group,
"state_group_before_event_prev_group": state_group_before_event_prev_group,
"deltas_to_state_group_before_event": deltas_to_state_group_before_event,
}
)
# We make sure that we have a state group assigned to the state.
if entry.state_group is None:
# store_state_group requires us to have either a previous state group
@@ -352,6 +397,12 @@ class StateHandler:
state_ids_before_event = await entry.get_state(
self._state_storage_controller, StateFilter.all()
)
log_kv(
{
"message": "state_group_before_event_prev_group was None so get state_ids_before_event",
"state_ids_before_event": state_ids_before_event,
}
)
state_group_before_event = (
await self._state_storage_controller.store_state_group(
@@ -363,15 +414,27 @@ class StateHandler:
)
)
entry.set_state_group(state_group_before_event)
log_kv(
{
"message": "entry.set_state_group(state_group_before_event)",
"state_group_before_event": state_group_before_event,
}
)
else:
state_group_before_event = entry.state_group
log_kv(
{
"message": "Entry already has a state_group",
"state_group_before_event": state_group_before_event,
}
)
#
# now if it's not a state event, we're done
#
if not event.is_state():
return EventContext.with_state(
event_context = EventContext.with_state(
storage=self._storage_controllers,
state_group_before_event=state_group_before_event,
state_group=state_group_before_event,
@@ -381,6 +444,29 @@ class StateHandler:
partial_state=partial_state,
)
state_for_auth_id_map = await event_context.get_prev_state_ids(
StateFilter.from_types(
event_auth.auth_types_for_event(event.room_version, event)
)
)
log_kv(
{
"message": "Done creating context for non-state event",
"state_for_auth_id_map from event_context": str(
state_for_auth_id_map
),
}
)
logger.info(
"compute_event_context(event=%s) Done creating context=%s for non-state event - %s (%s)",
event.event_id,
event_context,
event.content.get("body", event.type),
getattr(event, "state_key", None),
)
return event_context
#
# otherwise, we'll need to create a new state group for after the event
#
@@ -421,6 +507,7 @@ class StateHandler:
)
@measure_func()
@trace
async def resolve_state_groups_for_events(
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
) -> _StateCacheEntry:
@@ -448,6 +535,13 @@ class StateHandler:
state_group_ids = state_groups.values()
logger.info(
"resolve_state_groups_for_events: state_group_ids=%s state_groups=%s",
state_group_ids,
state_groups,
)
log_kv({"state_group_ids": state_group_ids, "state_groups": state_groups})
# check if each event has same state group id, if so there's no state to resolve
state_group_ids_set = set(state_group_ids)
if len(state_group_ids_set) == 1:
@@ -458,6 +552,18 @@ class StateHandler:
) = await self._state_storage_controller.get_state_group_delta(
state_group_id
)
logger.info(
"resolve_state_groups_for_events: Returning state_group_id=%s prev_group=%s",
state_group_id,
prev_group,
)
log_kv(
{
"message": "Returning state_group_id",
"state_group_id": state_group_id,
"prev_group": prev_group,
}
)
return _StateCacheEntry(
state=None,
state_group=state_group_id,
@@ -465,6 +571,14 @@ class StateHandler:
delta_ids=delta_ids,
)
elif len(state_group_ids_set) == 0:
logger.info(
"resolve_state_groups_for_events: Returning empty state group since there are no state_group_ids"
)
log_kv(
{
"message": "Returning empty state group since there are no state_group_ids",
}
)
return _StateCacheEntry(state={}, state_group=None)
room_version = await self.store.get_room_version_id(room_id)
@@ -480,6 +594,18 @@ class StateHandler:
None,
state_res_store=StateResolutionStore(self.store),
)
logger.info(
"resolve_state_groups_for_events: Resolving state groups and returning result state_to_resolve=%s result=%s",
state_to_resolve,
result,
)
log_kv(
{
"message": "Resolving state groups and returning result",
"state_to_resolve": state_to_resolve,
"result": result,
}
)
return result
async def update_current_state(self, room_id: str) -> None:
+62 -26
@@ -43,14 +43,15 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import (
from synapse.logging.tracing import (
Link,
SynapseTags,
active_span,
set_tag,
start_active_span_follows_from,
get_active_span,
set_attribute,
start_active_span,
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -124,7 +125,7 @@ times_pruned_extremities = Counter(
class _PersistEventsTask:
"""A batch of events to persist."""
name: ClassVar[str] = "persist_event_batch" # used for opentracing
name: ClassVar[str] = "persist_event_batch" # used for tracing
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
@@ -145,7 +146,7 @@ class _PersistEventsTask:
class _UpdateCurrentStateTask:
"""A room whose current state needs recalculating."""
name: ClassVar[str] = "update_current_state" # used for opentracing
name: ClassVar[str] = "update_current_state" # used for tracing
def try_merge(self, task: "_EventPersistQueueTask") -> bool:
"""Deduplicates consecutive recalculations of current state."""
@@ -160,11 +161,11 @@ class _EventPersistQueueItem:
task: _EventPersistQueueTask
deferred: ObservableDeferred
parent_opentracing_span_contexts: List = attr.ib(factory=list)
"""A list of opentracing spans waiting for this batch"""
parent_tracing_span_contexts: List = attr.ib(factory=list)
"""A list of tracing spans waiting for this batch"""
opentracing_span_context: Any = None
"""The opentracing span under which the persistence actually happened"""
tracing_span_context: Any = None
"""The tracing span under which the persistence actually happened"""
_PersistResult = TypeVar("_PersistResult")
@@ -228,10 +229,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
)
queue.append(end_item)
# also add our active opentracing span to the item so that we get a link back
span = active_span()
# also add our active tracing span to the item so that we get a link back
span = get_active_span()
if span:
end_item.parent_opentracing_span_contexts.append(span.context)
end_item.parent_tracing_span_contexts.append(span.get_span_context())
# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
@@ -239,9 +240,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
# wait for the queue item to complete
res = await make_deferred_yieldable(end_item.deferred.observe())
# add another opentracing span which links to the persist trace.
with start_active_span_follows_from(
f"{task.name}_complete", (end_item.opentracing_span_context,)
# add another tracing span which links to the persist trace.
with start_active_span(
f"{task.name}_complete",
links=[Link(end_item.tracing_span_context)],
):
pass
@@ -272,13 +274,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
with start_active_span_follows_from(
with start_active_span(
item.task.name,
item.parent_opentracing_span_contexts,
inherit_force_tracing=True,
) as scope:
if scope:
item.opentracing_span_context = scope.span.context
links=[
Link(span_context)
for span_context in item.parent_tracing_span_contexts
],
) as span:
if span:
item.tracing_span_context = span.get_span_context()
ret = await self._per_item_callback(room_id, item.task)
except Exception:
@@ -392,15 +396,15 @@ class EventsPersistenceStorageController:
partitioned.setdefault(event.room_id, []).append((event, ctx))
event_ids.append(event.event_id)
set_tag(
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids",
str(event_ids),
)
set_tag(
set_attribute(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
async def enqueue(
item: Tuple[str, List[Tuple[EventBase, EventContext]]]
@@ -431,6 +435,22 @@ class EventsPersistenceStorageController:
else:
events.append(event)
# We expect events to be persisted by this point and this makes
# mypy happy about `stream_ordering` not being optional below
assert event.internal_metadata.stream_ordering
# Invalidate related caches after we persist a new event
relation = relation_from_event(event)
self.main_store._invalidate_caches_for_event(
stream_ordering=event.internal_metadata.stream_ordering,
event_id=event.event_id,
room_id=event.room_id,
etype=event.type,
state_key=event.state_key if hasattr(event, "state_key") else None,
redacts=event.redacts,
relates_to=relation.parent_id if relation else None,
backfilled=backfilled,
)
return (
events,
self.main_store.get_room_max_token(),
@@ -463,6 +483,22 @@ class EventsPersistenceStorageController:
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
event = await self.main_store.get_event(replaced_event)
else:
# We expect events to be persisted by this point and this makes
# mypy happy about `stream_ordering` not being optional below
assert event.internal_metadata.stream_ordering
# Invalidate related caches after we persist a new event
relation = relation_from_event(event)
self.main_store._invalidate_caches_for_event(
stream_ordering=event.internal_metadata.stream_ordering,
event_id=event.event_id,
room_id=event.room_id,
etype=event.type,
state_key=event.state_key if hasattr(event, "state_key") else None,
redacts=event.redacts,
relates_to=relation.parent_id if relation else None,
backfilled=backfilled,
)
event_stream_id = event.internal_metadata.stream_ordering
# stream ordering should have been assigned by now
@@ -29,7 +29,7 @@ from typing import (
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.logging.tracing import tag_args, trace
from synapse.storage.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.storage.util.partial_state_events_tracker import (
@@ -47,7 +47,7 @@ from twisted.internet.interfaces import IReactorCore
from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
from synapse.logging import opentracing
from synapse.logging import tracing
from synapse.logging.context import (
LoggingContext,
current_context,
@@ -432,11 +432,11 @@ class LoggingTransaction:
start = time.time()
try:
with opentracing.start_active_span(
with tracing.start_active_span(
"db.query",
tags={
opentracing.tags.DATABASE_TYPE: "sql",
opentracing.tags.DATABASE_STATEMENT: one_line_sql,
attributes={
tracing.SpanAttributes.DB_SYSTEM: "sql",
tracing.SpanAttributes.DB_STATEMENT: one_line_sql,
},
):
return func(sql, *args, **kwargs)
@@ -710,15 +710,15 @@ class DatabasePool:
exception_callbacks=exception_callbacks,
)
try:
with opentracing.start_active_span(
with tracing.start_active_span(
"db.txn",
tags={
opentracing.SynapseTags.DB_TXN_DESC: desc,
opentracing.SynapseTags.DB_TXN_ID: name,
attributes={
tracing.SynapseTags.DB_TXN_DESC: desc,
tracing.SynapseTags.DB_TXN_ID: name,
},
):
r = func(cursor, *args, **kwargs)
opentracing.log_kv({"message": "commit"})
tracing.log_kv({"message": "commit"})
conn.commit()
return r
except self.engine.module.OperationalError as e:
@@ -734,7 +734,7 @@ class DatabasePool:
if i < N:
i += 1
try:
with opentracing.start_active_span("db.rollback"):
with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1)
@@ -748,7 +748,7 @@ class DatabasePool:
if i < N:
i += 1
try:
with opentracing.start_active_span("db.rollback"):
with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning(
@@ -854,7 +854,7 @@ class DatabasePool:
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
with opentracing.start_active_span(f"db.{desc}"):
with tracing.start_active_span(f"db.{desc}"):
result = await self.runWithConnection(
self.new_transaction,
desc,
@@ -937,9 +937,7 @@ class DatabasePool:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
with opentracing.start_active_span(
operation_name="db.connection",
):
with tracing.start_active_span("db.connection"):
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
@@ -953,15 +951,13 @@ class DatabasePool:
"Reconnecting database connection over transaction limit"
)
conn.reconnect()
opentracing.log_kv(
{"message": "reconnected due to txn limit"}
)
tracing.log_kv({"message": "reconnected due to txn limit"})
self._txn_counters[tid] = 1
if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
opentracing.log_kv({"message": "reconnected"})
tracing.log_kv({"message": "reconnected"})
if self._txn_limit > 0:
self._txn_counters[tid] = 1
@@ -223,6 +223,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))
self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
@@ -27,7 +27,7 @@ from typing import (
)
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -436,7 +436,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(user_id, device_id), None
)
set_tag("last_deleted_stream_id", str(last_deleted_stream_id))
set_attribute("last_deleted_stream_id", str(last_deleted_stream_id))
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -485,10 +485,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
A list of messages for the device and where in the stream the messages got to.
"""
set_tag("destination", destination)
set_tag("last_stream_id", last_stream_id)
set_tag("current_stream_id", current_stream_id)
set_tag("limit", limit)
set_attribute("destination", destination)
set_attribute("last_stream_id", last_stream_id)
set_attribute("current_stream_id", current_stream_id)
set_attribute("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
@@ -30,11 +30,11 @@ from typing import (
from typing_extensions import Literal
from synapse.api.constants import EduTypes
from synapse.api.constants import EduTypes, EventContentFields
from synapse.api.errors import Codes, StoreError
from synapse.logging.opentracing import (
from synapse.logging.tracing import (
get_active_span_text_map,
set_tag,
set_attribute,
trace,
whitelisted_homeserver,
)
@@ -334,12 +334,12 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
# maps (user_id, device_id) -> (stream_id, opentracing_context)
# maps (user_id, device_id) -> (stream_id, tracing_context)
#
# opentracing_context contains the opentracing metadata for the request
# tracing_context contains the opentelemetry metadata for the request
# that created the poke
#
# The most recent request's opentracing_context is used as the
# The most recent request's tracing_context is used as the
# context which created the Edu.
# This is the stream ID that we will return for the consumer to resume
@@ -402,8 +402,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
if update_stream_id > previous_update_stream_id:
# FIXME If this overwrites an older update, this discards the
# previous OpenTracing context.
# It might make it harder to track down issues using OpenTracing.
# previous tracing context.
# It might make it harder to track down issues using tracing.
# If there's a good reason why it doesn't matter, a comment here
# about that would not hurt.
query_map[key] = (update_stream_id, update_context)
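The dedup rule described in the comments above — keep the highest `stream_id` per `(user_id, device_id)`, and with it the tracing context of that most recent poke — can be sketched in isolation (a toy stand-in with illustrative names, not the actual Synapse code):

```python
from typing import Dict, List, Tuple

def dedupe_device_pokes(
    rows: List[Tuple[str, str, int, str]],
) -> Dict[Tuple[str, str], Tuple[int, str]]:
    """Collapse duplicate (user_id, device_id) rows, keeping the max
    stream_id and the tracing context belonging to that same row."""
    query_map: Dict[Tuple[str, str], Tuple[int, str]] = {}
    for user_id, device_id, stream_id, tracing_context in rows:
        key = (user_id, device_id)
        previous = query_map.get(key)
        # Only overwrite when this row is strictly newer, so the most
        # recent request's tracing context wins.
        if previous is None or stream_id > previous[0]:
            query_map[key] = (stream_id, tracing_context)
    return query_map
```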
@@ -469,11 +469,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
- user_id
- device_id
- stream_id
- opentracing_context
- tracing_context
"""
# get the list of device updates that need to be sent
sql = """
SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
SELECT user_id, device_id, stream_id, tracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
@@ -494,7 +494,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
destination: The host the device updates are intended for
from_stream_id: The minimum stream_id to filter updates by, exclusive
query_map: Dictionary mapping (user_id, device_id) to
(update stream_id, the relevant json-encoded opentracing context)
(update stream_id, the relevant json-encoded tracing context)
Returns:
List of objects representing a device update EDU.
@@ -532,13 +532,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
for device_id in device_ids:
device = user_devices[device_id]
stream_id, opentracing_context = query_map[(user_id, device_id)]
stream_id, tracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
"org.matrix.opentracing_context": opentracing_context,
EventContentFields.TRACING_CONTEXT: tracing_context,
}
prev_id = stream_id
@@ -708,8 +708,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
else:
results[user_id] = await self.get_cached_devices_for_user(user_id)
set_tag("in_cache", str(results))
set_tag("not_in_cache", str(user_ids_not_in_cache))
set_attribute("in_cache", str(results))
set_attribute("not_in_cache", str(user_ids_not_in_cache))
return user_ids_not_in_cache, results
@@ -1805,7 +1805,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"device_id",
"sent",
"ts",
"opentracing_context",
"tracing_context",
),
values=[
(
@@ -1850,7 +1850,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"room_id",
"stream_id",
"converted_to_destinations",
"opentracing_context",
"tracing_context",
),
values=[
(
@@ -1874,11 +1874,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
written to `device_lists_outbound_pokes`.
Returns:
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
A list of user ID, device ID, room ID, stream ID and optional opentelemetry context.
"""
sql = """
SELECT user_id, device_id, room_id, stream_id, opentracing_context
SELECT user_id, device_id, room_id, stream_id, tracing_context
FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations
ORDER BY stream_id
@@ -1896,9 +1896,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
device_id,
room_id,
stream_id,
db_to_json(opentracing_context),
db_to_json(tracing_context),
)
for user_id, device_id, room_id, stream_id, opentracing_context in txn
for user_id, device_id, room_id, stream_id, tracing_context in txn
]
return await self.db_pool.runInteraction(
@@ -18,7 +18,7 @@ from typing import Dict, Iterable, Mapping, Optional, Tuple, cast
from typing_extensions import Literal, TypedDict
from synapse.api.errors import StoreError
from synapse.logging.opentracing import log_kv, trace
from synapse.logging.tracing import log_kv, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict, JsonSerializable, StreamKeyType
@@ -36,7 +36,7 @@ from synapse.appservice import (
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
@@ -148,7 +148,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
key data. The key data will be a dict in the same format as the
DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
"""
set_tag("query_list", str(query_list))
set_attribute("query_list", str(query_list))
if not query_list:
return {}
@@ -231,8 +231,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
Dict mapping from user-id to dict mapping from device_id to
key data.
"""
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)
set_attribute("include_all_devices", include_all_devices)
set_attribute("include_deleted_devices", include_deleted_devices)
result = await self.db_pool.runInteraction(
"get_e2e_device_keys",
@@ -419,9 +419,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"""
def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("new_keys", str(new_keys))
set_attribute("user_id", user_id)
set_attribute("device_id", device_id)
set_attribute("new_keys", str(new_keys))
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -1161,10 +1161,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool:
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", str(device_keys))
set_attribute("user_id", user_id)
set_attribute("device_id", device_id)
set_attribute("time_now", time_now)
set_attribute("device_keys", str(device_keys))
old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
@@ -34,7 +34,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.logging.opentracing import tag_args, trace
from synapse.logging.tracing import tag_args, trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
@@ -726,22 +726,43 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
async def get_backfill_points_in_room(
self,
room_id: str,
) -> List[Tuple[str, int]]:
current_depth: int,
) -> List[Tuple[str, int, int]]:
"""
Gets the oldest events (backwards extremities) in the room along with the
approximate depth. Sorted by depth, highest to lowest (descending).
approximate depth. Sorted by depth, highest to lowest (descending) so the closest
events to the `current_depth` are first in the list.
We use this function so that we can compare and see if a client's
`current_depth` at their current scrollback is within pagination range
of the event extremities. If the `current_depth` is close to the depth
of given oldest event, we can trigger a backfill.
We ignore extremities that have a greater depth than our `current_depth`
as:
1. we don't really care about getting events that have happened
after our current position; and
2. by the nature of paginating and scrolling back, we have likely
previously tried and failed to backfill from that extremity, so
to avoid getting "stuck" requesting the same backfill repeatedly
we drop those extremities.
Args:
room_id: Room where we want to find the oldest events
current_depth: The depth at the user's current scrollback position
because we only care about finding events older than the given
`current_depth` when scrolling and paginating backwards.
Returns:
List of (event_id, depth) tuples. Sorted by depth, highest to lowest
(descending)
List of (event_id, depth, stream_ordering) tuples. Sorted by depth,
highest to lowest (descending) so the closest events to the
`current_depth` are first in the list. Tie-broken with `stream_ordering`,
then `event_id` to get a stable sort.
"""
def get_backfill_points_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
) -> List[Tuple[str, int, int]]:
# Assemble a tuple lookup of event_id -> depth for the oldest events
# we know of in the room. Backwards extremities are the oldest
# events we know of in the room but we only know of them because
@@ -750,7 +771,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremities.
sql = """
SELECT backward_extrem.event_id, event.depth FROM events AS event
SELECT backward_extrem.event_id, event.depth, event.stream_ordering FROM events AS event
/**
* Get the edge connections from the event_edges table
* so we can see whether this event's prev_events points
@@ -784,6 +805,17 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
* necessarily safe to assume that it will have been completed.
*/
AND edge.is_state is ? /* False */
/**
* We only want backwards extremities that are older than or at
* the same position of the given `current_depth` (where older
* means less than the given depth) because we're looking backwards
* from the `current_depth` when backfilling.
*
* current_depth (ignore events that come after this, ignore 2-4)
* |
* <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
*/
AND event.depth <= ? /* current_depth */
/**
* Exponential back-off (up to the upper bound) so we don't retry the
* same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
@@ -802,7 +834,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
* alphabetical order of the event_ids so we get a consistent
* ordering which is nice when asserting things in tests.
*/
ORDER BY event.depth DESC, backward_extrem.event_id DESC
ORDER BY event.depth DESC, event.stream_ordering DESC, backward_extrem.event_id DESC
"""
if isinstance(self.database_engine, PostgresEngine):
@@ -817,13 +849,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
(
room_id,
False,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
),
)
return cast(List[Tuple[str, int]], txn.fetchall())
return cast(List[Tuple[str, int, int]], txn.fetchall())
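The exponential back-off the SQL comments refer to can be sketched as follows; the constant values here are illustrative assumptions, only the doubling-with-upper-bound shape matters:

```python
# Illustrative values; the real constants live in the events store module.
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS = 60 * 60  # assumed 1 hour step
BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS = 60 * 60 * 24  # assumed 24 hour cap

def next_retry_delay_seconds(num_failed_attempts: int) -> int:
    """Delay doubles per failed backfill attempt (2h, 4h, 8h, 16h, ...),
    capped at the upper bound so a backfill point is never deferred forever."""
    return min(
        BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS * 2**num_failed_attempts,
        BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
    )
```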
return await self.db_pool.runInteraction(
"get_backfill_points_in_room",
@@ -835,26 +868,47 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
async def get_insertion_event_backward_extremities_in_room(
self,
room_id: str,
) -> List[Tuple[str, int]]:
current_depth: int,
) -> List[Tuple[str, int, int]]:
"""
Get the insertion events we know about that we haven't backfilled yet
along with the approximate depth. Sorted by depth, highest to lowest
(descending).
(descending) so the closest events to the `current_depth` are first
in the list.
We use this function so that we can compare and see if someone's
`current_depth` at their current scrollback is within pagination range
of the insertion event. If the `current_depth` is close to the depth
of the given insertion event, we can trigger a backfill.
We ignore insertion events that have a greater depth than our `current_depth`
as:
1. we don't really care about getting events that have happened
after our current position; and
2. by the nature of paginating and scrolling back, we have likely
previously tried and failed to backfill from that insertion event, so
to avoid getting "stuck" requesting the same backfill repeatedly
we drop those insertion events.
Args:
room_id: Room where we want to find the oldest events
current_depth: The depth at the user's current scrollback position because
we only care about finding events older than the given
`current_depth` when scrolling and paginating backwards.
Returns:
List of (event_id, depth) tuples. Sorted by depth, highest to lowest
(descending)
List of (event_id, depth, stream_ordering) tuples. Sorted by depth,
highest to lowest (descending) so the closest events to the
`current_depth` are first in the list. Tie-broken with `stream_ordering`,
then `event_id` to get a stable sort.
"""
def get_insertion_event_backward_extremities_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
) -> List[Tuple[str, int, int]]:
sql = """
SELECT
insertion_event_extremity.event_id, event.depth
insertion_event_extremity.event_id, event.depth, event.stream_ordering
/* We only want insertion events that are also marked as backwards extremities */
FROM insertion_event_extremities AS insertion_event_extremity
/* Get the depth of the insertion event from the events table */
@@ -869,6 +923,17 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
WHERE
insertion_event_extremity.room_id = ?
/**
* We only want extremities that are older than or at
* the same position of the given `current_depth` (where older
* means less than the given depth) because we're looking backwards
* from the `current_depth` when backfilling.
*
* current_depth (ignore events that come after this, ignore 2-4)
* |
* <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
*/
AND event.depth <= ? /* current_depth */
/**
* Exponential back-off (up to the upper bound) so we don't retry the
* same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
@@ -887,7 +952,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
* alphabetical order of the event_ids so we get a consistent
* ordering which is nice when asserting things in tests.
*/
ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
ORDER BY event.depth DESC, event.stream_ordering DESC, insertion_event_extremity.event_id DESC
"""
if isinstance(self.database_engine, PostgresEngine):
@@ -901,12 +966,13 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
sql % (least_function,),
(
room_id,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
),
)
return cast(List[Tuple[str, int]], txn.fetchall())
return cast(List[Tuple[str, int, int]], txn.fetchall())
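Both backfill-point queries apply the same filter and sort; modelled in plain Python with hypothetical rows, the `depth <= current_depth` filter and the `ORDER BY depth DESC, stream_ordering DESC, event_id DESC` tie-break look like:

```python
from typing import List, Tuple

def pick_backfill_points(
    points: List[Tuple[str, int, int]], current_depth: int
) -> List[Tuple[str, int, int]]:
    """Keep only points at or older than current_depth, closest first.

    Mirrors the SQL: drop anything newer than our scrollback position,
    then sort by depth, stream_ordering, event_id (all descending) so
    the closest points come first with a stable tie-break.
    """
    candidates = [
        (event_id, depth, stream_ordering)
        for event_id, depth, stream_ordering in points
        if depth <= current_depth
    ]
    candidates.sort(key=lambda p: (p[1], p[2], p[0]), reverse=True)
    return candidates
```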
return await self.db_pool.runInteraction(
"get_insertion_event_backward_extremities_in_room",
@@ -40,7 +40,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -209,7 +209,9 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
# Don't overwrite a stream ordering that has already been assigned
if event.internal_metadata.stream_ordering is None:
event.internal_metadata.stream_ordering = stream
await self.db_pool.runInteraction(
"persist_events",
@@ -54,7 +54,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.logging.opentracing import start_active_span, tag_args, trace
from synapse.logging.tracing import start_active_span, tag_args, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -1474,32 +1474,38 @@ class EventsWorkerStore(SQLBaseStore):
# the batches as big as possible.
results: Set[str] = set()
for chunk in batch_iter(event_ids, 500):
r = await self._have_seen_events_dict(
[(room_id, event_id) for event_id in chunk]
for event_ids_chunk in batch_iter(event_ids, 500):
events_seen_dict = await self._have_seen_events_dict(
room_id, event_ids_chunk
)
results.update(
eid for (eid, have_event) in events_seen_dict.items() if have_event
)
results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
return results
@cachedList(cached_method_name="have_seen_event", list_name="keys")
@cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(
self, keys: Collection[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
self,
room_id: str,
event_ids: Collection[str],
) -> Dict[str, bool]:
"""Helper for have_seen_events
Returns:
a dict {(room_id, event_id)-> bool}
a dict {event_id -> bool}
"""
# if the event cache contains the event, obviously we've seen it.
cache_results = {
(rid, eid)
for (rid, eid) in keys
if await self._get_event_cache.contains((eid,))
event_id
for event_id in event_ids
if await self._get_event_cache.contains((event_id,))
}
results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
remaining = [
event_id for event_id in event_ids if event_id not in cache_results
]
if not remaining:
return results
@@ -1511,23 +1517,21 @@ class EventsWorkerStore(SQLBaseStore):
sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
txn.database_engine, "e.event_id", remaining
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
# ... and then we can update the results for each key
results.update(
{(rid, eid): (eid in found_events) for (rid, eid) in remaining}
)
results.update({eid: (eid in found_events) for eid in remaining})
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
return results
@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
res = await self._have_seen_events_dict(((room_id, event_id),))
return res[(room_id, event_id)]
res = await self._have_seen_events_dict(room_id, [event_id])
return res[event_id]
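The chunk-then-check pattern used by `have_seen_events` — consult the cache first, then fall back to one batched query per chunk of 500 — can be sketched with a toy in-memory stand-in (the set-based "database" replaces the SQL IN-clause query; names are illustrative, not the real store API):

```python
from typing import Collection, Iterable, Iterator, List, Set

def batch_iter(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive fixed-size chunks, like synapse.util.batch_iter."""
    batch: List[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def have_seen_events(
    cache: Set[str], db_rows: Set[str], event_ids: Collection[str]
) -> Set[str]:
    results: Set[str] = set()
    for chunk in batch_iter(event_ids, 500):
        # Cache hits need no database query at all.
        cached = {eid for eid in chunk if eid in cache}
        remaining = [eid for eid in chunk if eid not in cached]
        # One batched lookup per chunk stands in for the IN-clause query.
        found = db_rows.intersection(remaining)
        results.update(cached | found)
    return results
```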
def _get_current_state_event_counts_txn(
self, txn: LoggingTransaction, room_id: str
@@ -367,6 +367,10 @@ class RelationsWorkerStore(SQLBaseStore):
def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
raise NotImplementedError()
# TODO: What's the proper way to fix this so we can stack @trace on top of
# @cachedList
#
# @trace
@cachedList(cached_method_name="get_applicable_edit", list_name="event_ids")
async def get_applicable_edits(
self, event_ids: Collection[str]
@@ -23,7 +23,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -58,7 +58,7 @@ from twisted.internet import defer
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -85,6 +85,8 @@ Changes in SCHEMA_VERSION = 73;
events over federation.
- Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`,
`batch_events`) to make it easy to delete all associated rows when purging a room.
- Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
from `opentracing_context` to generalized `tracing_context`.
"""
@@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Rename to generalized `tracing_context` since we're moving from opentracing to opentelemetry
ALTER TABLE device_lists_outbound_pokes RENAME COLUMN opentracing_context TO tracing_context;
ALTER TABLE device_lists_changes_in_room RENAME COLUMN opentracing_context TO tracing_context;
@@ -20,7 +20,7 @@ from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import trace_with_opname
from synapse.logging.tracing import trace_with_opname
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.util import unwrapFirstError
@@ -21,7 +21,7 @@ from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.receipts import ReceiptEventSource
from synapse.handlers.room import RoomEventSource
from synapse.handlers.typing import TypingNotificationEventSource
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.streams import EventSource
from synapse.types import StreamToken
@@ -431,6 +431,12 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
cache: DeferredCache[CacheKey, Any] = cached_method.cache
num_args = cached_method.num_args
if num_args != self.num_args:
raise Exception(
"Number of args (%s) does not match underlying cache_method_name=%s (%s)."
% (self.num_args, self.cached_method_name, num_args)
)
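The arity check added above can be exercised on its own; this is a simplified stand-in using `inspect` over function signatures, not the descriptor's actual `num_args` bookkeeping:

```python
import inspect

def check_batch_signature(cached_method, batch_method) -> None:
    """Raise if a @cachedList batch method doesn't take the same number of
    arguments as the @cached method it wraps (simplified stand-in)."""
    cached_args = [
        p for p in inspect.signature(cached_method).parameters if p != "self"
    ]
    batch_args = [
        p for p in inspect.signature(batch_method).parameters if p != "self"
    ]
    if len(batch_args) != len(cached_args):
        raise Exception(
            "Number of args (%s) does not match underlying cached method (%s)."
            % (len(batch_args), len(cached_args))
        )

# e.g. have_seen_event(room_id, event_id) pairs with
# _have_seen_events_dict(room_id, event_ids): both take two arguments.
```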
@functools.wraps(self.orig)
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
# If we're passed a cache_context then we'll want to call its
@@ -29,11 +29,7 @@ import attr
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import (
active_span,
start_active_span,
start_active_span_follows_from,
)
from synapse.logging.tracing import Link, get_active_span, start_active_span
from synapse.util import Clock
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import register_cache
@@ -41,7 +37,7 @@ from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
import opentracing
import opentelemetry
# the type of the key in the cache
KV = TypeVar("KV")
@@ -82,8 +78,8 @@ class ResponseCacheEntry:
easier to cache Failure results.
"""
opentracing_span_context: "Optional[opentracing.SpanContext]"
"""The opentracing span which generated/is generating the result"""
tracing_span_context: Optional["opentelemetry.trace.SpanContext"]
"""The tracing span which generated/is generating the result"""
class ResponseCache(Generic[KV]):
@@ -141,7 +137,7 @@ class ResponseCache(Generic[KV]):
self,
context: ResponseCacheContext[KV],
deferred: "defer.Deferred[RV]",
opentracing_span_context: "Optional[opentracing.SpanContext]",
tracing_span_context: Optional["opentelemetry.trace.SpanContext"],
) -> ResponseCacheEntry:
"""Set the entry for the given key to the given deferred.
@@ -152,14 +148,14 @@ class ResponseCache(Generic[KV]):
Args:
context: Information about the cache miss
deferred: The deferred which resolves to the result.
opentracing_span_context: An opentracing span wrapping the calculation
tracing_span_context: A tracing span wrapping the calculation
Returns:
The cache entry object.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
key = context.cache_key
entry = ResponseCacheEntry(result, opentracing_span_context)
entry = ResponseCacheEntry(result, tracing_span_context)
self._result_cache[key] = entry
def on_complete(r: RV) -> RV:
@@ -234,15 +230,15 @@ class ResponseCache(Generic[KV]):
if cache_context:
kwargs["cache_context"] = context
span_context: Optional[opentracing.SpanContext] = None
span_context: Optional["opentelemetry.trace.SpanContext"] = None
async def cb() -> RV:
# NB it is important that we do not `await` before setting span_context!
nonlocal span_context
with start_active_span(f"ResponseCache[{self._name}].calculate"):
span = active_span()
span = get_active_span()
if span:
span_context = span.context
span_context = span.get_span_context()
return await callback(*args, **kwargs)
d = run_in_background(cb)
@@ -257,9 +253,9 @@ class ResponseCache(Generic[KV]):
"[%s]: using incomplete cached result for [%s]", self._name, key
)
span_context = entry.opentracing_span_context
with start_active_span_follows_from(
span_context = entry.tracing_span_context
with start_active_span(
f"ResponseCache[{self._name}].wait",
contexts=(span_context,) if span_context else (),
links=[Link(span_context)] if span_context else None,
):
return await make_deferred_yieldable(result)
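The rewrite above swaps OpenTracing's `start_active_span_follows_from(contexts=...)` for an OpenTelemetry-style `start_active_span(..., links=...)`: a waiter links back to the span that is producing the cached result rather than parenting under it. A rough stdlib-only mock of that wait-path pattern (the `SpanContext`/`Link`/`Span` classes here are stand-ins, not the real opentelemetry API):

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class SpanContext:
    # Stand-in for opentelemetry.trace.SpanContext
    trace_id: int
    span_id: int


@dataclass(frozen=True)
class Link:
    # Stand-in for opentelemetry.trace.Link: a causal reference to another
    # span, replacing OpenTracing's follows_from relationship.
    context: SpanContext


@dataclass
class Span:
    name: str
    links: List[Link] = field(default_factory=list)


def start_active_span(name: str, links: Optional[List[Link]] = None) -> Span:
    # The waiter records a link to the producer's context instead of
    # becoming its child, mirroring the ResponseCache change above.
    return Span(name=name, links=links or [])


producer_ctx = SpanContext(trace_id=1, span_id=42)
wait_span = start_active_span(
    "ResponseCache[room_state].wait",
    links=[Link(producer_ctx)] if producer_ctx else None,
)
```

Links suit this case because the producing calculation may have started long before, and possibly on behalf of, a different request; a parent/child edge would misrepresent that causality.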
+1 -1
@@ -42,7 +42,7 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.opentracing import start_active_span
from synapse.logging.tracing import start_active_span
from synapse.metrics import Histogram, LaterGauge
from synapse.util import Clock
+1 -1
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.opentracing import trace
from synapse.logging.tracing import trace
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
from synapse.storage.state import StateFilter
+479 -4
@@ -11,16 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import json
from typing import Dict, List, Optional, Tuple
from unittest import mock
from unittest.mock import Mock, patch
from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import AuthError
from synapse.api.room_versions import RoomVersion
from synapse.appservice import ApplicationService
from synapse.event_auth import (
check_state_dependent_auth_rules,
check_state_independent_auth_rules,
)
from synapse.events import make_event_from_dict
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.federation.transport.client import StateRequestResponse
from synapse.logging.context import LoggingContext
@@ -28,9 +32,15 @@ from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.state.v2 import _mainline_sort, _reverse_topological_power_sort
from synapse.types import JsonDict
from synapse.util.stringutils import random_string
from tests import unittest
from tests.test_utils import event_injection, make_awaitable
from tests.test_utils.event_injection import create_event, inject_event
import logging
logger = logging.getLogger(__name__)
class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
@@ -45,10 +55,24 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
self.mock_federation_transport_client = mock.Mock(
spec=["get_room_state_ids", "get_room_state", "get_event"]
)
return super().setup_test_homeserver(
federation_transport_client=self.mock_federation_transport_client
self.appservice = ApplicationService(
token="i_am_an_app_service",
id="1234",
namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]},
# Note: this user does not have to match the regex above
sender="@as_main:test",
)
mock_load_appservices = Mock(return_value=[self.appservice])
with patch(
"synapse.storage.databases.main.appservice.load_appservices",
mock_load_appservices,
):
return super().setup_test_homeserver(
federation_transport_client=self.mock_federation_transport_client
)
def test_process_pulled_event_with_missing_state(self) -> None:
"""Ensure that we correctly handle pulled events with lots of missing state
@@ -848,3 +872,454 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
bert_member_event.event_id,
"Rejected kick event unexpectedly became part of room state.",
)
def test_process_pulled_events_asdf(self) -> None:
main_store = self.hs.get_datastores().main
state_storage_controller = self.hs.get_storage_controllers().state
def _debug_event_string(event: EventBase) -> str:
debug_body = event.content.get("body", event.type)
maybe_state_key = getattr(event, "state_key", None)
return f"event_id={event.event_id},depth={event.depth},stream_ordering={event.internal_metadata.stream_ordering},body={debug_body}({maybe_state_key}),prevs={event.prev_event_ids()}"
known_event_dict: Dict[str, Tuple[EventBase, List[EventBase]]] = {}
def _add_to_known_event_list(
event: EventBase, state_events: Optional[List[EventBase]] = None
) -> None:
if state_events is None:
state_map = self.get_success(
state_storage_controller.get_state_for_event(event.event_id)
)
state_events = list(state_map.values())
known_event_dict[event.event_id] = (event, state_events)
async def get_room_state_ids(
destination: str, room_id: str, event_id: str
) -> JsonDict:
self.assertEqual(destination, self.OTHER_SERVER_NAME)
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_room_state_ids: Event ({event_id}) not part of our known events list"
)
known_event, known_event_state_list = known_event_info
logger.info(
"stubbed get_room_state_ids: destination=%s event_id=%s auth_event_ids=%s",
destination,
event_id,
known_event.auth_event_ids(),
)
# self.assertEqual(event_id, missing_event.event_id)
return {
"pdu_ids": [
state_event.event_id for state_event in known_event_state_list
],
"auth_chain_ids": known_event.auth_event_ids(),
}
async def get_room_state(
room_version: RoomVersion, destination: str, room_id: str, event_id: str
) -> StateRequestResponse:
self.assertEqual(destination, self.OTHER_SERVER_NAME)
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_room_state: Event ({event_id}) not part of our known events list"
)
known_event, known_event_state_list = known_event_info
logger.info(
"stubbed get_room_state: destination=%s event_id=%s auth_event_ids=%s",
destination,
event_id,
known_event.auth_event_ids(),
)
auth_event_ids = known_event.auth_event_ids()
auth_events = []
for auth_event_id in auth_event_ids:
known_event_info = known_event_dict.get(auth_event_id)
if known_event_info is None:
self.fail(
f"stubbed get_room_state: Auth event ({auth_event_id}) is not part of our known events list"
)
known_auth_event, _ = known_event_info
auth_events.append(known_auth_event)
return StateRequestResponse(
state=known_event_state_list,
auth_events=auth_events,
)
async def get_event(
destination: str, event_id: str, timeout: Optional[int] = None
) -> JsonDict:
self.assertEqual(destination, self.OTHER_SERVER_NAME)
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_event: Event ({event_id}) not part of our known events list"
)
known_event, _ = known_event_info
return {"pdus": [known_event.get_pdu_json()]}
self.mock_federation_transport_client.get_room_state_ids.side_effect = (
get_room_state_ids
)
self.mock_federation_transport_client.get_room_state.side_effect = (
get_room_state
)
self.mock_federation_transport_client.get_event.side_effect = get_event
# create the room
room_creator = self.appservice.sender
room_id = self.helper.create_room_as(
room_creator=self.appservice.sender, tok=self.appservice.token
)
room_version = self.get_success(main_store.get_room_version(room_id))
event_before = self.get_success(
inject_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.Message,
content={"body": "eventBefore0", "msgtype": "m.text"},
)
)
_add_to_known_event_list(event_before)
event_after = self.get_success(
inject_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.Message,
content={"body": "eventAfter0", "msgtype": "m.text"},
)
)
_add_to_known_event_list(event_after)
state_map = self.get_success(
state_storage_controller.get_state_for_event(event_before.event_id)
)
room_create_event = state_map.get((EventTypes.Create, ""))
pl_event = state_map.get((EventTypes.PowerLevels, ""))
as_membership_event = state_map.get((EventTypes.Member, room_creator))
assert room_create_event is not None
assert pl_event is not None
assert as_membership_event is not None
for state_event in state_map.values():
_add_to_known_event_list(state_event)
# This should be the successor of the event we want to insert next to
# (the successor of event_before is event_after).
inherited_depth = event_after.depth
historical_base_auth_event_ids = [
room_create_event.event_id,
pl_event.event_id,
]
historical_state_events = list(state_map.values())
historical_state_event_ids = [
state_event.event_id for state_event in historical_state_events
]
maria_mxid = "@maria:test"
maria_membership_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=maria_mxid,
state_key=maria_mxid,
type=EventTypes.Member,
content={
"membership": "join",
},
# It all works when I add a prev_event for the floating
# insertion event but the event no longer floats.
# It's able to resolve state at the prev_events though.
prev_event_ids=[event_before.event_id],
# allow_no_prev_events=True,
# prev_event_ids=[],
# auth_event_ids=historical_base_auth_event_ids,
#
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events. For member events, we do it this way.
state_event_ids=historical_state_event_ids,
depth=inherited_depth,
)
)
_add_to_known_event_list(maria_membership_event, historical_state_events)
logger.info("maria_membership_event=%s", maria_membership_event.event_id)
historical_state_events.append(maria_membership_event)
historical_state_event_ids.append(maria_membership_event.event_id)
batch_id = random_string(8)
next_batch_id = random_string(8)
insertion_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.MSC2716_INSERTION,
content={
EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
# The difference from the actual room /batch_send is that this is normally
# floating as well. But seems to work once we connect it to the
# floating historical state chain.
prev_event_ids=[maria_membership_event.event_id],
# allow_no_prev_events=True,
# prev_event_ids=[],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
as_membership_event.event_id,
],
# state_event_ids=historical_state_event_ids,
depth=inherited_depth,
)
)
_add_to_known_event_list(insertion_event, historical_state_events)
historical_message_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=maria_mxid,
type=EventTypes.Message,
content={"body": "Historical message", "msgtype": "m.text"},
prev_event_ids=[insertion_event.event_id],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
maria_membership_event.event_id,
],
depth=inherited_depth,
)
)
_add_to_known_event_list(historical_message_event, historical_state_events)
batch_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.MSC2716_BATCH,
content={
EventContentFields.MSC2716_BATCH_ID: batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
prev_event_ids=[historical_message_event.event_id],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
as_membership_event.event_id,
],
depth=inherited_depth,
)
)
_add_to_known_event_list(batch_event, historical_state_events)
base_insertion_event, base_insertion_event_context = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.MSC2716_INSERTION,
content={
EventContentFields.MSC2716_NEXT_BATCH_ID: batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
prev_event_ids=[event_before.event_id],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
as_membership_event.event_id,
],
# state_event_ids=historical_state_event_ids,
depth=inherited_depth,
)
)
_add_to_known_event_list(base_insertion_event, historical_state_events)
# Chronological
pulled_events: List[EventBase] = [
# Beginning of room (oldest messages)
# *list(state_map.values()),
room_create_event,
pl_event,
as_membership_event,
state_map.get((EventTypes.JoinRules, "")),
state_map.get((EventTypes.RoomHistoryVisibility, "")),
event_before,
# HISTORICAL MESSAGE END
insertion_event,
historical_message_event,
batch_event,
base_insertion_event,
# HISTORICAL MESSAGE START
event_after,
# Latest in the room (newest messages)
]
# pulled_events: List[EventBase] = [
# # Beginning of room (oldest messages)
# # *list(state_map.values()),
# room_create_event,
# pl_event,
# as_membership_event,
# state_map.get((EventTypes.JoinRules, "")),
# state_map.get((EventTypes.RoomHistoryVisibility, "")),
# event_before,
# # HISTORICAL MESSAGE END
# insertion_event,
# historical_message_event,
# batch_event,
# base_insertion_event,
# # HISTORICAL MESSAGE START
# event_after,
# # Latest in the room (newest messages)
# ]
# The order that we get after passing events in reverse-chronological
# order, which mostly passes. Only the insertion event is rejected, but
# the historical messages do appear in /messages scrollback.
# pulled_events: List[EventBase] = [
# # Beginning of room (oldest messages)
# # *list(state_map.values()),
# room_create_event,
# pl_event,
# as_membership_event,
# state_map.get((EventTypes.JoinRules, "")),
# state_map.get((EventTypes.RoomHistoryVisibility, "")),
# event_before,
# event_after,
# base_insertion_event,
# batch_event,
# historical_message_event,
# insertion_event,
# # Latest in the room (newest messages)
# ]
logger.info(
"pulled_events=%s",
json.dumps(
[_debug_event_string(event) for event in pulled_events],
indent=4,
),
)
for event, _ in known_event_dict.values():
if event.internal_metadata.outlier:
self.fail("Our pristine events should not be marked as an outlier")
# TODO: We currently don't set the `stream_ordering` on `pulled_events` here
# like we normally would via `backfill(...)` before passing it off to
# `_process_pulled_events(...)`
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_events(
self.OTHER_SERVER_NAME,
[
# Make copies of events since Synapse modifies the
# internal_metadata in place and we want to keep our
# pristine copies
make_event_from_dict(pulled_event.get_pdu_json(), room_version)
for pulled_event in pulled_events
],
backfilled=True,
)
)
from_token = self.get_success(
self.hs.get_event_sources().get_current_token_for_pagination(room_id)
)
actual_events_in_room_reverse_chronological, _ = self.get_success(
main_store.paginate_room_events(
room_id, from_key=from_token.room_key, limit=100, direction="b"
)
)
# We have to reverse the list to make it chronological.
actual_events_in_room_chronological = list(
reversed(actual_events_in_room_reverse_chronological)
)
expected_event_order = [
# Beginning of room (oldest messages)
# *list(state_map.values()),
room_create_event,
as_membership_event,
pl_event,
state_map.get((EventTypes.JoinRules, "")),
state_map.get((EventTypes.RoomHistoryVisibility, "")),
event_before,
# HISTORICAL MESSAGE END
insertion_event,
historical_message_event,
batch_event,
base_insertion_event,
# HISTORICAL MESSAGE START
event_after,
# Latest in the room (newest messages)
]
event_id_diff = {event.event_id for event in expected_event_order} - {
event.event_id for event in actual_events_in_room_chronological
}
event_diff_ordered = [
event for event in expected_event_order if event.event_id in event_id_diff
]
event_id_extra = {
event.event_id for event in actual_events_in_room_chronological
} - {event.event_id for event in expected_event_order}
event_extra_ordered = [
event
for event in actual_events_in_room_chronological
if event.event_id in event_id_extra
]
assertion_message = (
"Debug info:\nActual events missing from expected list: %s\nActual events contain %d additional events compared to expected: %s\nExpected event order: %s\nActual event order: %s"
% (
json.dumps(
[_debug_event_string(event) for event in event_diff_ordered],
indent=4,
),
len(event_extra_ordered),
json.dumps(
[_debug_event_string(event) for event in event_extra_ordered],
indent=4,
),
json.dumps(
[_debug_event_string(event) for event in expected_event_order],
indent=4,
),
json.dumps(
[
_debug_event_string(event)
for event in actual_events_in_room_chronological
],
indent=4,
),
)
)
# assert (
# actual_events_in_room_chronological == expected_event_order
# ), assertion_message
self.assertEqual(
[event.event_id for event in actual_events_in_room_chronological],
[event.event_id for event in expected_event_order],
assertion_message,
)
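The missing/extra bookkeeping feeding the assertion message above reduces to order-preserving set differences between the expected and actual event lists. A minimal standalone sketch over plain event IDs:

```python
from typing import List, Tuple


def diff_ordered(
    expected: List[str], actual: List[str]
) -> Tuple[List[str], List[str]]:
    """Return (missing, extra): expected IDs absent from actual, and actual
    IDs absent from expected, each preserved in their source order."""
    expected_set, actual_set = set(expected), set(actual)
    missing = [e for e in expected if e not in actual_set]
    extra = [a for a in actual if a not in expected_set]
    return missing, extra


missing, extra = diff_ordered(
    ["create", "member", "before", "insertion", "after"],
    ["create", "member", "before", "after", "stray"],
)
# missing == ["insertion"], extra == ["stray"]
```

Computing both directions separately, as the test does, distinguishes "an expected event was rejected" from "an unexpected event leaked into the timeline", which point at different failure modes.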
-279
@@ -1,279 +0,0 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import cast
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactorClock
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.opentracing import (
start_active_span,
start_active_span_follows_from,
tag_args,
trace_with_opname,
)
from synapse.util import Clock
try:
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
LogContextScopeManager = None # type: ignore
try:
import jaeger_client
except ImportError:
jaeger_client = None # type: ignore
import logging
from tests.unittest import TestCase
logger = logging.getLogger(__name__)
class LogContextScopeManagerTestCase(TestCase):
"""
Test logging contexts and active opentracing spans.
There are casts throughout this file from generic opentracing objects (e.g.
opentracing.Span) to the ones specific to Jaeger, since they have additional
properties that these tests depend on. This is safe since the only supported
opentracing backend is Jaeger.
"""
if LogContextScopeManager is None:
skip = "Requires opentracing" # type: ignore[unreachable]
if jaeger_client is None:
skip = "Requires jaeger_client" # type: ignore[unreachable]
def setUp(self) -> None:
# since this is a unit test, we don't really want to mess around with the
# global variables that power opentracing. We create our own tracer instance
# and test with it.
scope_manager = LogContextScopeManager()
config = jaeger_client.config.Config(
config={}, service_name="test", scope_manager=scope_manager
)
self._reporter = jaeger_client.reporter.InMemoryReporter()
self._tracer = config.create_tracer(
sampler=jaeger_client.ConstSampler(True),
reporter=self._reporter,
)
def test_start_active_span(self) -> None:
# the scope manager assumes a logging context of some sort.
with LoggingContext("root context"):
self.assertIsNone(self._tracer.active_span)
# start_active_span should start and activate a span.
scope = start_active_span("span", tracer=self._tracer)
span = cast(jaeger_client.Span, scope.span)
self.assertEqual(self._tracer.active_span, span)
self.assertIsNotNone(span.start_time)
# entering the context doesn't actually do a whole lot.
with scope as ctx:
self.assertIs(ctx, scope)
self.assertEqual(self._tracer.active_span, span)
# ... but leaving it unsets the active span, and finishes the span.
self.assertIsNone(self._tracer.active_span)
self.assertIsNotNone(span.end_time)
# the span should have been reported
self.assertEqual(self._reporter.get_spans(), [span])
def test_nested_spans(self) -> None:
"""Starting two spans off inside each other should work"""
with LoggingContext("root context"):
with start_active_span("root span", tracer=self._tracer) as root_scope:
self.assertEqual(self._tracer.active_span, root_scope.span)
root_context = cast(jaeger_client.SpanContext, root_scope.span.context)
scope1 = start_active_span(
"child1",
tracer=self._tracer,
)
self.assertEqual(
self._tracer.active_span, scope1.span, "child1 was not activated"
)
context1 = cast(jaeger_client.SpanContext, scope1.span.context)
self.assertEqual(context1.parent_id, root_context.span_id)
scope2 = start_active_span_follows_from(
"child2",
contexts=(scope1,),
tracer=self._tracer,
)
self.assertEqual(self._tracer.active_span, scope2.span)
context2 = cast(jaeger_client.SpanContext, scope2.span.context)
self.assertEqual(context2.parent_id, context1.span_id)
with scope1, scope2:
pass
# the root scope should be restored
self.assertEqual(self._tracer.active_span, root_scope.span)
span2 = cast(jaeger_client.Span, scope2.span)
span1 = cast(jaeger_client.Span, scope1.span)
self.assertIsNotNone(span2.end_time)
self.assertIsNotNone(span1.end_time)
self.assertIsNone(self._tracer.active_span)
# the spans should be reported in order of their finishing.
self.assertEqual(
self._reporter.get_spans(), [scope2.span, scope1.span, root_scope.span]
)
def test_overlapping_spans(self) -> None:
"""Overlapping spans which are not neatly nested should work"""
reactor = MemoryReactorClock()
clock = Clock(reactor)
scopes = []
async def task(i: int):
scope = start_active_span(
f"task{i}",
tracer=self._tracer,
)
scopes.append(scope)
self.assertEqual(self._tracer.active_span, scope.span)
await clock.sleep(4)
self.assertEqual(self._tracer.active_span, scope.span)
scope.close()
async def root():
with start_active_span("root span", tracer=self._tracer) as root_scope:
self.assertEqual(self._tracer.active_span, root_scope.span)
scopes.append(root_scope)
d1 = run_in_background(task, 1)
await clock.sleep(2)
d2 = run_in_background(task, 2)
# because we did run_in_background, the active span should still be the
# root.
self.assertEqual(self._tracer.active_span, root_scope.span)
await make_deferred_yieldable(
defer.gatherResults([d1, d2], consumeErrors=True)
)
self.assertEqual(self._tracer.active_span, root_scope.span)
with LoggingContext("root context"):
# start the test off
d1 = defer.ensureDeferred(root())
# let the tasks complete
reactor.pump((2,) * 8)
self.successResultOf(d1)
self.assertIsNone(self._tracer.active_span)
# the spans should be reported in order of their finishing: task 1, task 2,
# root.
self.assertEqual(
self._reporter.get_spans(),
[scopes[1].span, scopes[2].span, scopes[0].span],
)
def test_trace_decorator_sync(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with sync functions
"""
with LoggingContext("root context"):
@trace_with_opname("fixture_sync_func", tracer=self._tracer)
@tag_args
def fixture_sync_func() -> str:
return "foo"
result = fixture_sync_func()
self.assertEqual(result, "foo")
# the span should have been reported
self.assertEqual(
[span.operation_name for span in self._reporter.get_spans()],
["fixture_sync_func"],
)
def test_trace_decorator_deferred(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with functions that return deferreds
"""
reactor = MemoryReactorClock()
with LoggingContext("root context"):
@trace_with_opname("fixture_deferred_func", tracer=self._tracer)
@tag_args
def fixture_deferred_func() -> "defer.Deferred[str]":
d1: defer.Deferred[str] = defer.Deferred()
d1.callback("foo")
return d1
result_d1 = fixture_deferred_func()
# let the tasks complete
reactor.pump((2,) * 8)
self.assertEqual(self.successResultOf(result_d1), "foo")
# the span should have been reported
self.assertEqual(
[span.operation_name for span in self._reporter.get_spans()],
["fixture_deferred_func"],
)
def test_trace_decorator_async(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with async functions
"""
reactor = MemoryReactorClock()
with LoggingContext("root context"):
@trace_with_opname("fixture_async_func", tracer=self._tracer)
@tag_args
async def fixture_async_func() -> str:
return "foo"
d1 = defer.ensureDeferred(fixture_async_func())
# let the tasks complete
reactor.pump((2,) * 8)
self.assertEqual(self.successResultOf(d1), "foo")
# the span should have been reported
self.assertEqual(
[span.operation_name for span in self._reporter.get_spans()],
["fixture_async_func"],
)
+254
@@ -0,0 +1,254 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactorClock
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.tracing import start_active_span, tag_args, trace_with_opname
from synapse.util import Clock
from tests.unittest import TestCase
try:
import opentelemetry
import opentelemetry.sdk.trace
import opentelemetry.sdk.trace.export
import opentelemetry.sdk.trace.export.in_memory_span_exporter
import opentelemetry.trace
import opentelemetry.trace.propagation
except ImportError:
opentelemetry = None # type: ignore[assignment]
class TracingTestCase(TestCase):
"""
Test logging contexts and active opentelemetry spans.
"""
if opentelemetry is None:
skip = "Requires opentelemetry" # type: ignore[unreachable]
def setUp(self) -> None:
# since this is a unit test, we don't really want to mess around with the
# global variables that power opentelemetry. We create our own tracer instance
# and test with it.
self._tracer_provider = opentelemetry.sdk.trace.TracerProvider()
self._exporter = (
opentelemetry.sdk.trace.export.in_memory_span_exporter.InMemorySpanExporter()
)
processor = opentelemetry.sdk.trace.export.SimpleSpanProcessor(self._exporter)
self._tracer_provider.add_span_processor(processor)
self._tracer = self._tracer_provider.get_tracer(__name__)
def test_start_active_span(self) -> None:
# This means no current span
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# start_active_span should start and activate a span.
with start_active_span("new-span", tracer=self._tracer) as span:
self.assertEqual(opentelemetry.trace.get_current_span(), span)
# ... but leaving it unsets the active span, and finishes the span.
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# the span should have been reported
self.assertListEqual(
[span.name for span in self._exporter.get_finished_spans()], ["new-span"]
)
def test_nested_spans(self) -> None:
"""Starting two spans off inside each other should work"""
with start_active_span("root_span", tracer=self._tracer) as root_span:
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
with start_active_span(
"child_span1",
tracer=self._tracer,
) as child_span1:
self.assertEqual(
opentelemetry.trace.get_current_span(),
child_span1,
"child_span1 was not activated",
)
with start_active_span(
"child_span2",
tracer=self._tracer,
) as child_span2:
self.assertEqual(
opentelemetry.trace.get_current_span(), child_span2
)
# the root scope should be restored
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
# Active span is unset now that we're outside of the `with` scopes
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# the spans should be reported in order of their finishing.
self.assertListEqual(
[span.name for span in self._exporter.get_finished_spans()],
["child_span2", "child_span1", "root_span"],
)
def test_side_by_side_spans(self) -> None:
with start_active_span("span1", tracer=self._tracer), start_active_span(
"span2", tracer=self._tracer
) as span2:
# We expect the last span in `with` list to be active
self.assertEqual(opentelemetry.trace.get_current_span(), span2)
# Active span is unset now that we're outside of the `with` scopes
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# the spans should be reported in order of their finishing.
self.assertListEqual(
[span.name for span in self._exporter.get_finished_spans()],
["span2", "span1"],
)
def test_overlapping_spans(self) -> None:
"""Overlapping spans which are not neatly nested should work"""
reactor = MemoryReactorClock()
clock = Clock(reactor)
async def task(i: int):
with start_active_span(
f"task{i}",
tracer=self._tracer,
) as span:
self.assertEqual(opentelemetry.trace.get_current_span(), span)
await clock.sleep(4)
self.assertEqual(opentelemetry.trace.get_current_span(), span)
async def root():
with start_active_span("root_span", tracer=self._tracer) as root_span:
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
d1 = run_in_background(task, 1)
await clock.sleep(2)
d2 = run_in_background(task, 2)
# because we did run_in_background, the active span should still be the
# root.
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
await make_deferred_yieldable(
defer.gatherResults([d1, d2], consumeErrors=True)
)
self.assertEqual(opentelemetry.trace.get_current_span(), root_span)
# start the test off
root_deferred = defer.ensureDeferred(root())
# let the tasks complete
reactor.pump((2,) * 8)
self.successResultOf(root_deferred)
# Active span is unset now that we're outside of the `with` scopes
self.assertEqual(
opentelemetry.trace.get_current_span(), opentelemetry.trace.INVALID_SPAN
)
# the spans should be reported in order of their finishing: task 1, task 2,
# root.
self.assertListEqual(
[span.name for span in self._exporter.get_finished_spans()],
["task1", "task2", "root_span"],
)
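The overlapping-spans test above relies on `run_in_background` forking the task into its own context so the child span never leaks back into the caller. A minimal sketch of that context-propagation idea using stdlib `contextvars` (assumption: `current_span`, `start_span`, `end_span`, and `run_in_background` here are simplified stand-ins, not Synapse's or OpenTelemetry's real APIs):

```python
import contextvars

# Hypothetical stand-in for "the currently active span".
current_span: contextvars.ContextVar[str] = contextvars.ContextVar(
    "current_span", default="INVALID_SPAN"
)

finished: list[str] = []

def start_span(name: str):
    """Make the span active; keep a token to restore the previous one."""
    return name, current_span.set(name)

def end_span(span_and_token) -> None:
    name, token = span_and_token
    finished.append(name)      # spans are reported when they finish
    current_span.reset(token)  # the previous span becomes active again

def run_in_background(fn):
    # A "background task" runs in a copy of the caller's context, so
    # activating a child span inside it does not leak into the caller.
    ctx = contextvars.copy_context()
    return ctx.run(fn)

root = start_span("root_span")

def task():
    child = start_span("task1")  # active only inside the copied context
    end_span(child)
    return current_span.get()

run_in_background(task)
# Back in the caller, the root span is still active, as in the test above.
assert current_span.get() == "root_span"
end_span(root)
assert finished == ["task1", "root_span"]
```

As in the test, spans are reported in finishing order: the background task's span first, the root span last.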
def test_trace_decorator_sync(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with sync functions
"""
@trace_with_opname("fixture_sync_func", tracer=self._tracer)
@tag_args
def fixture_sync_func() -> str:
return "foo"
result = fixture_sync_func()
self.assertEqual(result, "foo")
# the span should have been reported
self.assertEqual(
[span.name for span in self._exporter.get_finished_spans()],
["fixture_sync_func"],
)
def test_trace_decorator_deferred(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with functions that return deferreds
"""
reactor = MemoryReactorClock()
@trace_with_opname("fixture_deferred_func", tracer=self._tracer)
@tag_args
def fixture_deferred_func() -> "defer.Deferred[str]":
d1: defer.Deferred[str] = defer.Deferred()
d1.callback("foo")
return d1
result_d1 = fixture_deferred_func()
# let the tasks complete
reactor.pump((2,) * 8)
self.assertEqual(self.successResultOf(result_d1), "foo")
# the span should have been reported
self.assertEqual(
[span.name for span in self._exporter.get_finished_spans()],
["fixture_deferred_func"],
)
def test_trace_decorator_async(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with async functions
"""
reactor = MemoryReactorClock()
@trace_with_opname("fixture_async_func", tracer=self._tracer)
@tag_args
async def fixture_async_func() -> str:
return "foo"
d1 = defer.ensureDeferred(fixture_async_func())
# let the tasks complete
reactor.pump((2,) * 8)
self.assertEqual(self.successResultOf(d1), "foo")
# the span should have been reported
self.assertEqual(
[span.name for span in self._exporter.get_finished_spans()],
["fixture_async_func"],
)
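The three decorator tests above exercise the same tracing decorator against sync, Deferred-returning, and async functions. A minimal sketch of a decorator that handles both the sync and async shapes by inspecting the wrapped function (assumption: `trace_with_name` is a simplified stand-in for `trace_with_opname`, and Twisted Deferreds are not handled here):

```python
import asyncio
import functools
import inspect

finished_spans: list[str] = []

def trace_with_name(name: str):
    """Report a span named `name` when the wrapped function finishes,
    whether the function is sync or async."""
    def decorator(fn):
        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                try:
                    return await fn(*args, **kwargs)
                finally:
                    # report the span only once the coroutine completes
                    finished_spans.append(name)
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                finished_spans.append(name)
        return sync_wrapper
    return decorator

@trace_with_name("fixture_sync_func")
def fixture_sync_func() -> str:
    return "foo"

@trace_with_name("fixture_async_func")
async def fixture_async_func() -> str:
    return "foo"

assert fixture_sync_func() == "foo"
assert asyncio.run(fixture_async_func()) == "foo"
assert finished_spans == ["fixture_sync_func", "fixture_async_func"]
```

Branching on `inspect.iscoroutinefunction` matters: a plain sync wrapper around a coroutine function would report the span when the coroutine object is *created*, not when it finishes.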
@@ -35,66 +35,45 @@ from synapse.util import Clock
from synapse.util.async_helpers import yieldable_gather_results
from tests import unittest
from tests.test_utils.event_injection import create_event, inject_event
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.hs = hs
self.store: EventsWorkerStore = hs.get_datastores().main
-        # insert some test data
-        for rid in ("room1", "room2"):
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "rooms",
-                    {"room_id": rid, "room_version": 4},
-                )
-            )
+        self.user = self.register_user("user", "pass")
+        self.token = self.login(self.user, "pass")
+        self.room_id = self.helper.create_room_as(self.user, tok=self.token)
         self.event_ids: List[str] = []
-        for idx, rid in enumerate(
-            (
-                "room1",
-                "room1",
-                "room1",
-                "room2",
-            )
-        ):
-            event_json = {"type": f"test {idx}", "room_id": rid}
-            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
-            event_id = event.event_id
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "events",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "topological_ordering": idx,
-                        "stream_ordering": idx,
-                        "type": event.type,
-                        "processed": True,
-                        "outlier": False,
-                    },
-                )
-            )
-            self.get_success(
-                self.store.db_pool.simple_insert(
-                    "event_json",
-                    {
-                        "event_id": event_id,
-                        "room_id": rid,
-                        "json": json.dumps(event_json),
-                        "internal_metadata": "{}",
-                        "format_version": 3,
-                    },
-                )
-            )
-            self.event_ids.append(event_id)
+        for i in range(3):
+            event = self.get_success(
+                inject_event(
+                    hs,
+                    room_version=RoomVersions.V7.identifier,
+                    room_id=self.room_id,
+                    sender=self.user,
+                    type="test_event_type",
+                    content={"body": f"foobarbaz{i}"},
+                )
+            )
+            self.event_ids.append(event.event_id)
def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
)
self.assertEqual(res, {self.event_ids[0]})
@@ -104,7 +83,9 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
+                self.store.have_seen_events(
+                    self.room_id, [self.event_ids[0], "eventdoesnotexist"]
+                )
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
@@ -116,11 +97,86 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(
-                self.store.have_seen_events("room1", [self.event_ids[0]])
+                self.store.have_seen_events(self.room_id, [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
def test_persisting_event_invalidates_cache(self):
"""
Test to make sure that the `have_seen_event` cache
is invalidated after we persist an event and returns
the updated value.
"""
event, event_context = self.get_success(
create_event(
self.hs,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": "garply"},
)
)
with LoggingContext(name="test") as ctx:
# First, check `have_seen_event` for an event we have not seen yet
# to prime the cache with a `false` value.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, set())
# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
# Persist the event which should invalidate or prefill the
# `have_seen_event` cache so we don't return stale values.
persistence = self.hs.get_storage_controllers().persistence
self.get_success(
persistence.persist_event(
event,
event_context,
)
)
with LoggingContext(name="test") as ctx:
# Check `have_seen_event` again and we should see the updated fact
# that we have now seen the event after persisting it.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, {event.event_id})
# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
def test_invalidate_cache_by_room_id(self):
"""
Test to make sure that all events associated with the given `(room_id,)`
are invalidated in the `have_seen_event` cache.
"""
with LoggingContext(name="test") as ctx:
# Prime the cache with some values
res = self.get_success(
self.store.have_seen_events(self.room_id, self.event_ids)
)
self.assertEqual(res, set(self.event_ids))
# That should result in a single db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
# Clear the cache with any events associated with the `room_id`
self.store.have_seen_event.invalidate((self.room_id,))
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events(self.room_id, self.event_ids)
)
self.assertEqual(res, set(self.event_ids))
# Since we cleared the cache, it should result in another db query to lookup
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
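The test above invalidates every cached entry for a room with a single `invalidate((self.room_id,))` call, even though the cache is keyed by `(room_id, event_id)`. A minimal sketch of that prefix-invalidation idea (assumption: a simplified model, not Synapse's actual `TreeCache` implementation):

```python
# A "tree" cache keyed by tuples can drop every entry under a key
# prefix in one call.
class TreeCache:
    def __init__(self) -> None:
        self._entries: dict[tuple, object] = {}

    def get(self, key: tuple):
        return self._entries.get(key)

    def set(self, key: tuple, value) -> None:
        self._entries[key] = value

    def invalidate(self, key_prefix: tuple) -> None:
        # Drop every entry whose key starts with the given prefix.
        stale = [k for k in self._entries if k[: len(key_prefix)] == key_prefix]
        for key in stale:
            del self._entries[key]

cache = TreeCache()
cache.set(("room1", "event_a"), True)
cache.set(("room1", "event_b"), True)
cache.set(("room2", "event_c"), True)

# Invalidate everything cached for room1, as in the test above.
cache.invalidate(("room1",))
assert cache.get(("room1", "event_a")) is None
assert cache.get(("room1", "event_b")) is None
assert cache.get(("room2", "event_c")) is True
```

After the prefix invalidation, the next lookup for that room has to go back to the database, which is exactly what the `db_txn_count` assertion at the end of the test verifies.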
class EventCacheTestCase(unittest.HomeserverTestCase):
"""Test that the various layers of event cache works."""
+16 -1
@@ -82,6 +82,11 @@ async def create_event(
hs: synapse.server.HomeServer,
room_version: Optional[str] = None,
prev_event_ids: Optional[List[str]] = None,
*,
allow_no_prev_events: Optional[bool] = False,
auth_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
**kwargs,
) -> Tuple[EventBase, EventContext]:
if room_version is None:
@@ -89,11 +94,21 @@ async def create_event(
kwargs["room_id"]
)
import logging
logger = logging.getLogger(__name__)
logger.info("kwargs=%s", kwargs)
builder = hs.get_event_builder_factory().for_room_version(
KNOWN_ROOM_VERSIONS[room_version], kwargs
)
event, context = await hs.get_event_creation_handler().create_new_client_event(
-        builder, prev_event_ids=prev_event_ids
+        builder,
+        # Why does this need another default to pass: `Argument "allow_no_prev_events" to "create_new_client_event" of "EventCreationHandler" has incompatible type "Optional[bool]"; expected "bool"`
+        allow_no_prev_events=allow_no_prev_events or False,
+        prev_event_ids=prev_event_ids,
+        auth_event_ids=auth_event_ids,
+        state_event_ids=state_event_ids,
+        depth=depth,
)
return event, context
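The inline comment in the hunk above asks why `allow_no_prev_events or False` is needed. The parameter is declared `Optional[bool] = False`, so mypy types it `Optional[bool]`, which is not assignable to a plain `bool` parameter; `or False` collapses the `None` case. A minimal illustration with hypothetical `caller`/`callee` names (declaring the parameter as `bool = False` in the first place would avoid the coercion entirely):

```python
from typing import Optional

def callee(allow_no_prev_events: bool) -> bool:
    return allow_no_prev_events

def caller(allow_no_prev_events: Optional[bool] = False) -> bool:
    # `allow_no_prev_events` is typed Optional[bool], so passing it
    # straight through fails mypy's check against the `bool` parameter.
    # `or False` collapses None to False, yielding a plain bool.
    return callee(allow_no_prev_events or False)

assert caller() is False
assert caller(None) is False
assert caller(True) is True
```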
+1 -1
@@ -743,7 +743,7 @@ class HomeserverTestCase(TestCase):
"""
Inject a membership event into a room.
-    Deprecated: use event_injection.inject_room_member directly
+    Deprecated: use event_injection.inject_member_event directly
Args:
room: Room ID to inject the event into.
+32 -1
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Set
+from typing import Iterable, Set, Tuple
from unittest import mock
from twisted.internet import defer, reactor
@@ -1008,3 +1008,34 @@ class CachedListDescriptorTestCase(unittest.TestCase):
obj.inner_context_was_finished, "Tried to restart a finished logcontext"
)
self.assertEqual(current_context(), SENTINEL_CONTEXT)
def test_num_args_mismatch(self):
"""
        Make sure someone does not accidentally use @cachedList on a method with
        a mismatch in the number of args to the underlying single-item cached method.
"""
class Cls:
@descriptors.cached(tree=True)
def fn(self, room_id, event_id):
pass
# This is wrong ❌. `@cachedList` expects to be given the same number
# of arguments as the underlying cached function, just with one of
# the arguments being an iterable
@descriptors.cachedList(cached_method_name="fn", list_name="keys")
def list_fn(self, keys: Iterable[Tuple[str, str]]):
pass
# Corrected syntax ✅
#
# @cachedList(cached_method_name="fn", list_name="event_ids")
# async def list_fn(
# self, room_id: str, event_ids: Collection[str],
# )
obj = Cls()
# Make sure this raises an error about the arg mismatch
with self.assertRaises(Exception):
obj.list_fn([("foo", "bar")])
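The test above expects the batched-cache decorator to raise when the list method's arguments don't line up with the single-item cached method's. A minimal sketch of how such a check can be done with `inspect.signature` (assumption: `cached_list` is a simplified stand-in, not Synapse's `@cachedList` internals):

```python
import inspect

def cached_list(cached_method, list_name: str):
    """Validate that the wrapped list method takes the same number of
    args as the single-item cached method, with `list_name` being the
    arg that holds the iterable of keys."""
    def decorator(list_fn):
        cached_args = list(inspect.signature(cached_method).parameters)
        list_args = list(inspect.signature(list_fn).parameters)
        if list_name not in list_args:
            raise Exception(
                f"{list_fn.__name__} has no argument named {list_name!r}"
            )
        if len(list_args) != len(cached_args):
            raise Exception(
                f"Number of args mismatch: {cached_method.__name__} takes "
                f"{cached_args} but {list_fn.__name__} takes {list_args}"
            )
        return list_fn
    return decorator

def fn(self, room_id, event_id):  # the single-item cached method
    pass

caught = False
try:
    @cached_list(fn, list_name="keys")
    def list_fn(self, keys):  # wrong ❌: 2 args vs fn's 3
        pass
except Exception:
    caught = True
assert caught, "expected the arg-count mismatch to raise"
```

Raising at decoration time (rather than on first call) surfaces the mistake as soon as the class is defined, which is the behaviour the test asserts.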